Flink does not have a native mqtt data source, but you can add mqtt data sources through custom data sources.
package com.agioe.flink.source.mqtt; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.eclipse.paho.mqttv5.client.*; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class MqttSource extends RichSourceFunction<String> { //storage service private static MqttClient client; //Storage subscription topic private static MqttTopic mqttTopic; //Blocking queue stores subscribed messages private BlockingQueue<String> queue = new ArrayBlockingQueue<>(10); private StartupParams startupParams; public MqttSource(StartupParams startupParams) { this.startupParams = startupParams; } //Method to wrap the connection private void connect() throws MqttException { String username = startupParams.pro.getProperty("mqtt.username"); String password = startupParams.pro.getProperty("mqtt.password"); //Configure connection parameters MqttConfig mqttConfigBean = new MqttConfig(null, null, "tcp://192.168.15.13:1883", "mqtt-client", "testtopic/#"); //Connect to mqtt server client = new MqttClient(mqttConfigBean.getHostUrl(), mqttConfigBean.getClientId(), new MemoryPersistence()); MqttConnectionOptions options = new MqttConnectionOptions(); options.setUserName(mqttConfigBean.getUsername()); options.setPassword(mqttConfigBean.getPassword() == null ? null : mqttConfigBean.getPassword().getBytes(StandardCharsets.UTF_8)); // options.setCleanSession(false); //Whether to clear the session // options.setSessionExpiryInterval(); //Set the timeout options.setConnectionTimeout(30); //Set session heartbeat time options.setKeepAliveInterval(20); try { String[] msgtopic = mqttConfigBean.getMsgTopic(); //Subscribe to messages int[] qos = new int[msgtopic.length]; for (int i = 0; i < msgtopic.length; i + + ) { qos[i] = 0; } client.setCallback(new MsgCallback(client, options, msgtopic, qos) { }); client.connect(options); client.subscribe(msgtopic, qos); System.out.println("MQTT connection successful:" + mqttConfigBean.getClientId() + ":" + client); } catch (Exception e) { System.out.println("MQTT connection exception:" + e); } } //Implement MqttCallback, internal functions can be called back class MsgCallback implements MqttCallback { private MqttClient client; private MqttConnectionOptions options; private String[] topic; private int[] qos; public MsgCallback() { } public MsgCallback(MqttClient client, MqttConnectionOptions options, String[] topic, int[] qos) { this.client = client; this.options = options; this.topic = topic; this.qos = qos; } //Connection failure callback function @Override public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { System.out.println("MQTT connection is disconnected, initiate reconnection"); while (true) { try { Thread.sleep(1000); client.connect(options); //Subscribe to messages client.subscribe(topic, qos); System.out.println("MQTT reconnection successful:" + client); break; } catch (Exception e) { e.printStackTrace(); continue; } } } @Override public void mqttErrorOccurred(MqttException e) { } //Call back this function after receiving the message @Override public void messageArrived(String s, MqttMessage message) throws Exception { System.out.println(); //Subscribe message characters String msg = new String(message.getPayload()); byte[] bymsg = getBytesFromObject(msg); System.out.println("topic:" + topic); queue.put(msg); } @Override public void deliveryComplete(IMqttToken iMqttToken) { } @Override public void connectComplete(boolean b, String s) { System.out.println("MQTT reconnection successful:" + client); } @Override public void authPacketArrived(int i, MqttProperties mqttProperties) { } //Convert object to bytecode public byte[] getBytesFromObject(Serializable obj) throws Exception { if (obj == null) { return null; } ByteArrayOutputStream bo = new ByteArrayOutputStream(); ObjectOutputStream oo = new ObjectOutputStream(bo); oo.writeObject(obj); return bo.toByteArray(); } } //flink thread startup function @Override public void run(final SourceContext<String> ctx) throws Exception { connect(); //Use an infinite loop to make the program always monitor whether there are new messages on the topic while (true) { //The advantage of using a blocking queue is that when the queue is empty, the program will block until here without wasting CPU resources. ctx.collect(queue.take()); } } @Override public void cancel() { } /** * Subscribe to a topic * * @param topic * @param qos */ public void subscribe(String topic, int qos) { try { System.out.println("topic:" + topic); client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } public MqttClient getClient() { return client; } public void setClient(MqttClient client) { this.client = client; } public MqttTopic getMqttTopic() { return mqttTopic; } public void setMqttTopic(MqttTopic mqttTopic) { this.mqttTopic = mqttTopic; } }
public class MqttConfig implements Serializable { public MqttConfig(String username, String password, String hostUrl, String clientId, String msgTopic) { this.username = username; this.password = password; this.hostUrl = hostUrl; this.clientId = clientId; this.msgTopic = msgTopic; } //Connection name private String username; //Connection password private String password; //ip address and port number private String hostUrl; //The server ID must not be repeated with other connections, otherwise the connection will fail. private String clientId; //Subscribed topic private String msgTopic; //Get username public String getUsername() { return username; } //get password public String getPassword() { return password; } //Get client id public String getClientId() { return clientId; } //Get the server url public String getHostUrl() { return hostUrl; } //Get subscription public String[] getMsgTopic() { String[] topic = msgTopic.split(","); return topic; } }
Add in the main function of flink
stream = env.addSource(new MqttSource(StartupParams.instance));