Integrating MQTT into Spring Boot (Java) to receive and send messages, in a project with a separated front end and back end

This is just one of several possible approaches, and a basic one, for receiving and sending messages.
Whether a synchronous mechanism can be implemented needs to be discussed and confirmed with the colleagues working on the Internet of Things side. If there are many devices, a synchronous mechanism will probably not be used.
First, add the dependency to the pom file.

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

Second, add the MQTT settings to the configuration file. This example uses yml; for other configuration file formats, adapt the syntax accordingly.

mqtt:
    username: ****** # Username
    password: ****** # Password
    hostUrl: tcp://******:1883 # tcp://ip:port
    clientId: clientId # client id
    defaultTopic: electric/#,test # Topics to subscribe to, comma-separated; electric/# matches every topic under electric/
    timeout: 100 # Connection timeout (unit: seconds)
    keepalive: 60 # Keep-alive (heartbeat) interval (unit: seconds)
    enabled: true # Whether to use mqtt function
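To make the wildcard in electric/# more concrete, here is a minimal sketch of how an MQTT topic filter is matched against a topic name. It is plain Java written for illustration, not part of Paho or of this project; the class name TopicFilterSketch is hypothetical, and special cases such as topics starting with $ are left out.

```java
/** Illustrative MQTT topic-filter matcher (hypothetical helper, not a Paho class). */
final class TopicFilterSketch {

    /**
     * Returns true if topicName matches topicFilter.
     * '+' matches exactly one level; '#' matches all remaining levels, including none.
     */
    static boolean matches(String topicFilter, String topicName) {
        // Keep trailing empty levels so "a/" and "a" are treated as different topics
        String[] filter = topicFilter.split("/", -1);
        String[] topic = topicName.split("/", -1);
        for (int i = 0; i < filter.length; i++) {
            if (filter[i].equals("#")) {
                // '#' is only valid as the last level and matches everything from here on
                return i == filter.length - 1;
            }
            if (i >= topic.length) {
                return false; // the filter has more levels than the topic
            }
            if (!filter[i].equals("+") && !filter[i].equals(topic[i])) {
                return false; // literal level differs
            }
        }
        // No '#' seen: both must have the same number of levels
        return filter.length == topic.length;
    }
}
```

With the configuration above, matches("electric/#", "electric/room1/meter") is true, while matches("electric/#", "electricity/room1") is false, because matching works level by level and not by string prefix.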

Now for the code.
First, create a class MqttConfig that binds the settings from the yml file.
The prefix depends on your own configuration:
@ConfigurationProperties(prefix = "mqtt")


import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * username
     */
    private String username;
    /**
     * password
     */
    private String password;
    /**
     * Broker address, e.g. tcp://ip:port
     */
    private String hostUrl;
    /**
     * Client ID
     */
    private String clientId;
    /**
     * Default subscription topics, comma-separated
     */
    private String defaultTopic;
    /**
     * Connection timeout in seconds
     */
    private int timeout;
    /**
     * Keep-alive interval in seconds
     */
    private int keepalive;
    /**
     * Whether the MQTT function is enabled
     */
    private boolean enabled;
    /**
     * Retained flag (declared but not used in this example)
     */
    private boolean retained;
    /**
     * QoS level (0, 1, or 2)
     */
    private int qos;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getHostUrl() {
        return hostUrl;
    }

    public void setHostUrl(String hostUrl) {
        this.hostUrl = hostUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getDefaultTopic() {
        return defaultTopic;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getKeepalive() {
        return keepalive;
    }

    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
    public int getQos() {
        return qos;
    }

    public void setQos(int qos) {
        this.qos = qos;
    }



    @Bean
    public MqttPushClient getMqttPushClient() {
        if (enabled) {
            // defaultTopic is a comma-separated list, e.g. "electric/#,test"
            String[] mqttTopics = StringUtils.split(defaultTopic, ",");
            mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive); // Connect
            for (int i = 0; i < mqttTopics.length; i++) {
                mqttPushClient.subscribe(mqttTopics[i], 0); // Subscribe to the topic with QoS 0
            }
        }
        return mqttPushClient;
    }
}
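The StringUtils used above comes from the RuoYi framework (com.ruoyi.common.utils). If your project does not use RuoYi, the comma-separated topic list can be split with plain Java. The following is a minimal sketch; the class and method names (TopicListSketch, parseTopics) are made up for illustration, and trimming blanks and skipping empty entries is an added assumption, not something the original code does.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper for parsing the defaultTopic setting (hypothetical, not part of the project). */
final class TopicListSketch {

    /** Splits a comma-separated topic list, trimming whitespace and dropping empty entries. */
    static List<String> parseTopics(String defaultTopic) {
        List<String> topics = new ArrayList<>();
        if (defaultTopic == null) {
            return topics; // nothing configured
        }
        for (String part : defaultTopic.split(",")) {
            String topic = part.trim();
            if (!topic.isEmpty()) {
                topics.add(topic);
            }
        }
        return topics;
    }
}
```

For the yml above, parseTopics("electric/#,test") yields the two entries electric/# and test, which can then be passed to subscribe one by one.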

**Next, create the MqttPushClient class.
Connecting the client, publishing messages, and subscribing to topics are all handled here.**

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    private static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * Client connection
     *
     * @param host      broker address, e.g. tcp://ip:port
     * @param clientID  client ID
     * @param username  username
     * @param password  password
     * @param timeout   connection timeout in seconds
     * @param keepalive keep-alive interval in seconds
     */
    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        try {
            MqttClient client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            // Store the client before connecting so the static publish/subscribe methods can reach it
            MqttPushClient.setClient(client);
            client.setCallback(pushCallback);
            client.connect(options);
        } catch (Exception e) {
            logger.error("Failed to connect to MQTT broker {}", host, e);
        }
    }

    /**
     * Publish a message
     *
     * @param pubTopic topic to publish to
     * @param message  message content
     * @param qos      quality of service level (0, 1, or 2)
     */
    public static void publishMessage(String pubTopic, String message, int qos) {
        if (client == null) {
            logger.warn("MQTT client is not initialized; message not published");
            return;
        }
        logger.info("Publish message, connected: {}, client id: {}", client.isConnected(), client.getClientId());
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        MqttTopic topic = client.getTopic(pubTopic);
        if (topic != null) {
            try {
                MqttDeliveryToken token = topic.publish(mqttMessage);
                // publish() returns before delivery finishes; PushCallback.deliveryComplete reports completion
                logger.info("Message submitted for delivery, complete: {}", token.isComplete());
            } catch (MqttException e) {
                logger.error("Failed to publish message to topic {}", pubTopic, e);
            }
        }
    }

    /**
     * Subscribe to a topic
     *
     * @param topic topic filter to subscribe to
     * @param qos   quality of service level (0, 1, or 2)
     */
    public static void subscribe(String topic, int qos) {
        logger.info("Start subscribing to topic {}", topic);
        try {
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("Failed to subscribe to topic {}", topic, e);
        }
    }

}
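The connect method above makes a single attempt and only reports a failure. If the broker may be temporarily unreachable, one option is to wrap the attempt in a retry loop with exponential backoff. The sketch below is plain Java and only an illustration: ReconnectSketch and retryWithBackoff are hypothetical names, and the attempt argument stands for whatever code performs the connection and reports whether it succeeded. Paho's MqttConnectOptions also offers setAutomaticReconnect(true), which may be a simpler choice depending on your needs.

```java
import java.util.function.BooleanSupplier;

/** Illustrative retry helper with exponential backoff (hypothetical, not part of Paho). */
final class ReconnectSketch {

    /**
     * Runs attempt until it returns true or maxAttempts is reached.
     * The delay between attempts doubles each time, capped at maxDelayMs.
     *
     * @return the number of attempts used, or -1 if every attempt failed or the thread was interrupted
     */
    static int retryWithBackoff(BooleanSupplier attempt, int maxAttempts,
                                long initialDelayMs, long maxDelayMs) {
        long delay = initialDelayMs;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i; // succeeded on attempt i
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return -1;
                }
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
        return -1;
    }
}
```

A call could look like retryWithBackoff(() -> tryConnect(), 5, 1000, 30000), where tryConnect is a placeholder for a method that calls connect and then checks whether the client is connected.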

Next, create the PushCallback class, which implements the MqttCallback callback interface

package com.ruoyi.util.mqttUtil;

import com.alibaba.fastjson2.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);

    @Autowired
    private MqttConfig mqttConfig;

    private static MqttClient client;

    private static String _topic;
    private static String _qos;
    private static String _msg;

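    @Override
    public void connectionLost(Throwable throwable) {
        // Called when the connection to the broker is lost; reconnection is usually performed here.
        logger.info("The connection is disconnected, you can reconnect");
        // Note: the static client field in this class is never assigned, so this check is always true
        // and getMqttPushClient() reconnects and resubscribes every time.
        if (client == null || !client.isConnected()) {
            mqttConfig.getMqttPushClient();
        }
    }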

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // Messages received on subscribed topics are handled here
        logger.info("Receive message topic: " + topic);
        logger.info("Receive message Qos: " + mqttMessage.getQos());
        logger.info("Receive message content: " + new String(mqttMessage.getPayload()));

        _topic = topic;
        _qos = mqttMessage.getQos() + "";
        _msg = new String(mqttMessage.getPayload());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("Publish message successfully");
        //This will be called only after the message is published successfully. You can take a closer look at the token. The subsequent synchronization mechanism also uses this token to complete.
        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    // Controllers call this method to obtain the most recently received hardware data
    public String receive() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topic", _topic);
        jsonObject.put("qos", _qos);
        jsonObject.put("msg", _msg);
        return jsonObject.toString();
    }

}
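Note that PushCallback keeps only the single most recent message in static fields, so receive() returns whatever arrived last on any topic. If you need the latest message per topic, a small thread-safe store is one way to do it. This is a minimal sketch in plain Java; LatestMessageStore and its methods are hypothetical names and are not part of the original project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative per-topic store for the most recent payload (hypothetical helper). */
final class LatestMessageStore {

    // ConcurrentHashMap is safe to update from the MQTT callback thread while controllers read it
    private final Map<String, String> latestByTopic = new ConcurrentHashMap<>();

    /** Records the newest payload for a topic, replacing any earlier one. Arguments must not be null. */
    void record(String topic, String payload) {
        latestByTopic.put(topic, payload);
    }

    /** Returns the newest payload for the topic, or null if nothing has arrived yet. */
    String latest(String topic) {
        return latestByTopic.get(topic);
    }
}
```

In messageArrived you would call record(topic, new String(mqttMessage.getPayload())), and a controller would call latest("test") to read the newest message on the test topic.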

Next, download MQTTX so you can send messages to and receive messages from the broker directly.

Fill in the fields in the red box in the picture below as you like. For the server address, port, username, and password, use the values from the Java configuration file.

Then connect and add a subscription. The subscribed topic must be one that is in your configuration file. Any name works, and a trailing /# acts as a wildcard, similar to a fuzzy query.

At this point you can start the project. The console prints the topics we subscribe to, which means that messages published to those topics will be received by our application.

Since the configuration file subscribes to the topic test, I send a message directly to the topic test in MQTTX.

The framed area is where you choose the topic to send the message to.

The console automatically prints the message received on the subscribed test topic.

**That is all that is needed for receiving messages. Now it is time to send one.
Just pick any controller and add a request mapping.**

    @RequestMapping("/send")
    @ResponseBody
    public ResponseEntity<String> send() {
        System.out.println("I am the data sent by springboot");
        // Three parameters: the first is the topic, the second is the content to send, and the third is the QoS
        MqttPushClient.publishMessage("clientId1", "-===============", 1);
        return new ResponseEntity<>("OK", HttpStatus.OK);
    }

That is everything for now; this is only a very basic application. Over the next few days, if the project requires it, I will follow up with synchronous MQTT requests. If you cannot receive messages, be sure to check that the subscribed topics match the topics being published to.
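As a rough idea of what a synchronous request could look like, here is one possible shape. It is not the token-based approach mentioned in the callback comments and not the author's implementation: it assumes that each request and its reply carry a shared correlation ID, which is a hypothetical convention you would have to agree on with the device side. The class name SyncRequestSketch and its methods are made up for illustration, and the sketch uses only the JDK.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative request/reply correlation (hypothetical; assumes replies carry a correlation ID). */
final class SyncRequestSketch {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request; call this before publishing the request message. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /** Call this from messageArrived when a reply arrives; returns false if nobody is waiting for it. */
    boolean complete(String correlationId, String reply) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    /** Blocks until the reply arrives or the timeout expires; returns null if no reply was received. */
    String await(String correlationId, CompletableFuture<String> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        } finally {
            pending.remove(correlationId); // avoid leaking entries for requests that timed out
        }
    }
}
```

A controller would register an ID, publish the request, and then call await with a timeout; with many devices, blocking request threads this way can become costly, which is the concern raised at the start of this post.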