flink accesses mqtt data source

Flink does not have a native mqtt data source, but you can add mqtt data sources through custom data sources.

package com.agioe.flink.source.mqtt;
 
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

 
public class MqttSource extends RichSourceFunction<String> {
    //storage service
    private static MqttClient client;
    //Storage subscription topic
    private static MqttTopic mqttTopic;
    //Blocking queue stores subscribed messages
    private BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    private StartupParams startupParams;

    public MqttSource(StartupParams startupParams) {
        this.startupParams = startupParams;
    }

    //Method to wrap the connection
    private void connect() throws MqttException {
        String username = startupParams.pro.getProperty("mqtt.username");
        String password = startupParams.pro.getProperty("mqtt.password");


        //Configure connection parameters
        MqttConfig mqttConfigBean = new MqttConfig(null, null, "tcp://192.168.15.13:1883", "mqtt-client", "testtopic/#");
        //Connect to mqtt server
        client = new MqttClient(mqttConfigBean.getHostUrl(), mqttConfigBean.getClientId(), new MemoryPersistence());
        MqttConnectionOptions options = new MqttConnectionOptions();

        options.setUserName(mqttConfigBean.getUsername());
        options.setPassword(mqttConfigBean.getPassword() == null ? null : mqttConfigBean.getPassword().getBytes(StandardCharsets.UTF_8));
// options.setCleanSession(false); //Whether to clear the session
// options.setSessionExpiryInterval();
        //Set the timeout
        options.setConnectionTimeout(30);
        //Set session heartbeat time
        options.setKeepAliveInterval(20);
        try {
            String[] msgtopic = mqttConfigBean.getMsgTopic();
            //Subscribe to messages
            int[] qos = new int[msgtopic.length];
            for (int i = 0; i < msgtopic.length; i + + ) {
                qos[i] = 0;
            }
            client.setCallback(new MsgCallback(client, options, msgtopic, qos) {
            });
            client.connect(options);
            client.subscribe(msgtopic, qos);
            System.out.println("MQTT connection successful:" + mqttConfigBean.getClientId() + ":" + client);
        } catch (Exception e) {
            System.out.println("MQTT connection exception:" + e);
        }
    }

    //Implement MqttCallback, internal functions can be called back
    class MsgCallback implements MqttCallback {
        private MqttClient client;
        private MqttConnectionOptions options;
        private String[] topic;
        private int[] qos;

        public MsgCallback() {
        }

        public MsgCallback(MqttClient client, MqttConnectionOptions options, String[] topic, int[] qos) {
            this.client = client;
            this.options = options;
            this.topic = topic;
            this.qos = qos;
        }

        //Connection failure callback function
        @Override
        public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
            System.out.println("MQTT connection is disconnected, initiate reconnection");
            while (true) {
                try {
                    Thread.sleep(1000);
                    client.connect(options);
                    //Subscribe to messages
                    client.subscribe(topic, qos);
                    System.out.println("MQTT reconnection successful:" + client);
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                    continue;
                }
            }
        }

        @Override
        public void mqttErrorOccurred(MqttException e) {

        }

        //Call back this function after receiving the message
        @Override
        public void messageArrived(String s, MqttMessage message) throws Exception {
            System.out.println();
            //Subscribe message characters
            String msg = new String(message.getPayload());
            byte[] bymsg = getBytesFromObject(msg);
            System.out.println("topic:" + topic);
            queue.put(msg);

        }

        @Override
        public void deliveryComplete(IMqttToken iMqttToken) {
        }

        @Override
        public void connectComplete(boolean b, String s) {
            System.out.println("MQTT reconnection successful:" + client);
        }

        @Override
        public void authPacketArrived(int i, MqttProperties mqttProperties) {

        }

        //Convert object to bytecode
        public byte[] getBytesFromObject(Serializable obj) throws Exception {
            if (obj == null) {
                return null;
            }
            ByteArrayOutputStream bo = new ByteArrayOutputStream();
            ObjectOutputStream oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            return bo.toByteArray();
        }
    }

    //flink thread startup function
    @Override
    public void run(final SourceContext<String> ctx) throws Exception {
        connect();
        //Use an infinite loop to make the program always monitor whether there are new messages on the topic
        while (true) {
            //The advantage of using a blocking queue is that when the queue is empty, the program will block until here without wasting CPU resources.
            ctx.collect(queue.take());
        }
    }


    @Override
    public void cancel() {

    }

    /**
     * Subscribe to a topic
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            System.out.println("topic:" + topic);
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public MqttClient getClient() {
        return client;
    }

    public void setClient(MqttClient client) {
        this.client = client;
    }

    public MqttTopic getMqttTopic() {
        return mqttTopic;
    }

    public void setMqttTopic(MqttTopic mqttTopic) {
        this.mqttTopic = mqttTopic;
    }


}
public class MqttConfig implements Serializable {

    public MqttConfig(String username, String password, String hostUrl, String clientId, String msgTopic) {
        this.username = username;
        this.password = password;
        this.hostUrl = hostUrl;
        this.clientId = clientId;
        this.msgTopic = msgTopic;
    }
    //Connection name
    private String username;
    //Connection password
    private String password;
    //ip address and port number
    private String hostUrl;
    //The server ID must not be repeated with other connections, otherwise the connection will fail.
    private String clientId;
    //Subscribed topic
    private String msgTopic;


    //Get username
    public String getUsername() {
        return username;
    }
    //get password
    public String getPassword() {
        return password;
    }
    //Get client id
    public String getClientId() {
        return clientId;
    }
    //Get the server url
    public String getHostUrl() {
        return hostUrl;
    }
    //Get subscription
    public String[] getMsgTopic() {
        String[] topic = msgTopic.split(",");
        return topic;
    }

}

Add in the main function of flink

 stream = env.addSource(new MqttSource(StartupParams.instance));