iot–mqtt gets data

iot–mqtt gets data

MQTT tool operations

package com.sc.equipmentiot.core.mqtt;
 
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
 
/**
 *MQTT tool operations
 */
@Slf4j
@Component
public class MQTTConnect {
 
    private String HOST = "tcp://IP:port"; //The address and port number of the mqtt server
    private final String clientId = "DC" + (int) (Math.random() * 100000000);
    private MqttClient mqttClient;
 
    /**
     * Client connect connects to mqtt server
     *
     * @param userName username
     * @param passWord password
     * @param mqttCallback callback function
     **/
    public void setMqttClient(String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
        MqttConnectOptions options = mqttConnectOptions(userName, passWord);
        if (mqttCallback == null) {
            mqttClient.setCallback(new Callback());
        } else {
            mqttClient.setCallback(mqttCallback);
        }
        mqttClient.connect(options);
    }
 
    /**
     * MQTT connection parameter settings
     */
    private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
        mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(10); ///Default: 30
        options.setAutomaticReconnect(true);//Default: false
        options.setCleanSession(false);//Default: true
        //options.setKeepAliveInterval(20);//Default: 60
        return options;
    }
 
    /**
     * Close the MQTT connection
     */
    public void close() throws MqttException {
        mqttClient.disconnect();
        mqttClient.close();
    }
 
    /**
     * Publish a message to a topic Default qos: 1
     *
     * @param topic: published topic
     * @param msg: published message
     */
    public void pub(String topic, String msg) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        //mqttMessage.setQos(2);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }
 
    /**
     * Post a message to a topic
     *
     * @param topic: published topic
     * @param msg: published message
     * @param qos: Message quality Qos: 0, 1, 2
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }
 
    /**
     * Subscribe to a topic. The default Qos level of this method is: 1
     *
     * @param topic topic
     */
    public void sub(String topic) throws MqttException {
        mqttClient.subscribe(topic);
    }
 
    /**
     * Subscribing to a topic can carry Qos
     *
     * @param topic The topic to be subscribed to
     * @param qos message quality: 0, 1, 2
     */
    public void sub(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
    }
 
    /**
     * The main function is used for testing by yourself
     */
    public static void main(String[] args) throws MqttException {
       MQTTConnect mqttConnect = new MQTTConnect();
       mqttConnect.setMqttClient("Account", "Password", new Callback());
       mqttConnect.sub("topic");
      // mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000)); //Send topic
   }
}

Project start listening topic

package com.sc.equipmentiot.core.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;


/**
 * Project startup monitoring topic
 */
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {
 
    private final MQTTConnect server;
 
    @Autowired
    public MQTTListener(MQTTConnect server) {
        this.server = server;
    }
 
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            server.setMqttClient("Account", "Password", new Callback());
            server.sub("topic");
        } catch (MqttException e) {
            log.error(e.getMessage(), e);
        }
    }
}
 
 

Regular MQTT callback functions

package com.sc.equipmentiot.core.mqtt;
 
import com.sc.equipmentiot.common.core.redis.RedisCache;
import com.sc.equipmentiot.common.core.text.Convert;
import com.sc.equipmentiot.common.utils.StringUtils;
import com.sc.equipmentiot.core.util.SpringJobBeanFactory;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Regular MQTT callback function
 */
@Slf4j
@Component
public class Callback implements MqttCallback {
 
    /**
     * MQTT disconnection will execute this method
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("Disconnected MQTT connection: {}", throwable.getMessage());
        log.error(throwable.getMessage(), throwable);
    }
 
    /**
     * publish will be executed here after successful release.
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("Publishing message successfully");
    }
 
    /**
     * The message received after subscribe will be executed here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // TODO Here you can perform business processing and data storage on the subscribed messages.
        log.info("Received message from " + topic + ": {}", new String(message.getPayload()));
    }
}

Notes
1. Be sure to determine whether there is an MQTT environment on the cloud server
2. Be sure to make sure the connected server port is open.
If the appeal code connection error reports that the connection times out or is disconnected, you can first use the MQTT client simulator to connect to the server to eliminate server problems.