RocketMQ-MQTT development example


1. RocketMQ MQTT Overview

Traditional message queues (MQ) are mainly used for communication between services on the server side, for example transaction, payment, and logistics messages in e-commerce. Within the broader category of messaging, however, there is another important and common area: messages from IoT terminal devices. In recent years, smart home and industrial interconnection have driven explosive growth in IoT device messages, and message volume from mobile apps, after more than a decade of mobile Internet development, remains enormous. Terminal devices generate orders of magnitude more messages than traditional server-side systems, and the volume is still growing rapidly.

A unified message system (product) that supports multiple computing scenarios (such as stream and event processing) and multiple access scenarios (IoT, app) would be very valuable, because messages are also important data. Keeping the data in a single system minimizes storage costs and avoids the consistency problems caused by synchronizing data between different systems.

Based on this, we introduced the RocketMQ-MQTT extension project, which gives RocketMQ unified access to messages from IoT devices and servers and provides integrated message storage and intercommunication.

1. MQTT protocol

In IoT terminal scenarios, the MQTT protocol is currently widely used in the industry. It is a standard open protocol defined by OASIS that originated in IoT scenarios. Because there are many types of IoT devices running in different environments, a standard access protocol is particularly critical.

The MQTT protocol defines a Pub/Sub communication model similar to RocketMQ's, but with more flexible subscriptions: it supports multi-level Topic subscriptions (such as “/t/t1/t2”) and wildcard subscriptions (such as “/t/t1/+”).
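
To make the wildcard rules concrete, here is a minimal sketch of MQTT topic filter matching. The class and method names are hypothetical and are not part of RocketMQ-MQTT; the sketch only covers the standard "+" (exactly one level) and "#" (all remaining levels) wildcards and ignores special cases such as topics that begin with "$".

```java
class TopicFilterMatcher {

    /** Returns true if the MQTT topic filter matches the concrete topic name. */
    static boolean matches(String filter, String topic) {
        // Keep empty levels (limit -1) so a leading "/" is compared like any other level.
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // "#" is only valid as the last level; it matches the parent and every level below it.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the topic has fewer levels than the filter requires
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level that differs
            }
        }
        // Without "#", the filter and the topic must have the same number of levels.
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("/t/t1/+", "/t/t1/t2"));    // true
        System.out.println(matches("/t/t1/+", "/t/t1/t2/t3")); // false
        System.out.println(matches("/t/#", "/t/t1/t2"));       // true
    }
}
```

A "+" stands in for a single level only, which is why the second call returns false.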

2. Model introduction

Queue storage model

We designed a topic queue model with multi-dimensional distribution. As shown in the figure above, messages can come from various access scenarios (such as MQ/AMQP on the server side and MQTT on the client side), but only one copy is written to the commitlog. The message is then dispatched to the queue indexes (ConsumeQueue) of the scenarios that need it. For example, the server-side scenario (MQ/AMQP) consumes in the traditional way from the first-level Topic queue, while the client-side MQTT scenario consumes messages through MQTT multi-level Topic and wildcard subscriptions.

Such a queue model supports access and message sending and receiving for server and terminal scenarios at the same time, achieving the goal of integration.
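
The "write once, index many times" idea can be illustrated with a toy in-memory model. This is only a sketch under simplifying assumptions: the class and method names are hypothetical, the commitlog is a plain list, and each queue index stores list positions instead of file offsets. It does not reflect RocketMQ's actual storage code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SingleWriteMultiIndexStore {
    // Stand-in for the commitlog: every message body is stored exactly once.
    private final List<String> commitLog = new ArrayList<>();
    // Stand-in for the queue indexes: queue name -> positions in the commitlog.
    private final Map<String, List<Integer>> queueIndexes = new HashMap<>();

    /** Appends the body once and registers its position in every named queue. */
    int append(String body, List<String> queueNames) {
        int position = commitLog.size();
        commitLog.add(body);
        for (String queue : queueNames) {
            queueIndexes.computeIfAbsent(queue, k -> new ArrayList<>()).add(position);
        }
        return position;
    }

    /** Reads a queue by following its index entries back into the shared log. */
    List<String> read(String queueName) {
        List<String> result = new ArrayList<>();
        for (int position : queueIndexes.getOrDefault(queueName, Collections.emptyList())) {
            result.add(commitLog.get(position));
        }
        return result;
    }

    int storedCopies() {
        return commitLog.size();
    }

    public static void main(String[] args) {
        SingleWriteMultiIndexStore store = new SingleWriteMultiIndexStore();
        // One message, visible to a server-side queue and to two MQTT-side queues.
        List<String> queues = new ArrayList<>();
        queues.add("t");
        queues.add("t/r/wc");
        queues.add("t/r/+");
        store.append("hello", queues);
        System.out.println(store.storedCopies()); // 1
        System.out.println(store.read("t/r/+"));  // [hello]
    }
}
```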

Push-pull model

The figure above shows the push-pull model. The P node in the figure is a protocol gateway or broker plug-in, and terminal devices connect to it over the MQTT protocol. Messages can be sent from a variety of scenarios (MQ/AMQP/MQTT). After a message is stored in the Topic queue, a notify logic module senses its arrival in real time and generates a message event (that is, the topic name of the message). The event is pushed to the gateway node, which matches it against the subscriptions of its connected terminal devices to find the devices that should receive the message, and then issues a pull request to the storage layer to read the message and push it to those devices.
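
The flow described above (store, notify with the topic name, match on the gateway, pull, push) can be sketched in a few lines. Everything here is hypothetical and in-memory: the class name, the exact-match subscription check, and the per-client read position are simplifications chosen for illustration, not the project's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PushPullGatewaySketch {
    // Stand-in for the storage layer: topic name -> stored messages in arrival order.
    private final Map<String, List<String>> storage = new HashMap<>();
    // Subscriptions of the devices connected to this gateway: client ID -> topic names.
    private final Map<String, Set<String>> subscriptions = new LinkedHashMap<>();
    // Next position to read for each (client, topic) pair.
    private final Map<String, Integer> readOffsets = new HashMap<>();
    // Records what was pushed to devices so the flow can be inspected.
    final List<String> pushed = new ArrayList<>();

    void subscribe(String clientId, String topic) {
        subscriptions.computeIfAbsent(clientId, k -> new LinkedHashSet<>()).add(topic);
    }

    /** Stores a message, then raises an event that carries only the topic name. */
    void store(String topic, String body) {
        storage.computeIfAbsent(topic, k -> new ArrayList<>()).add(body);
        onMessageEvent(topic);
    }

    /** Gateway side: match the event against local subscriptions, then pull and push. */
    void onMessageEvent(String topic) {
        for (Map.Entry<String, Set<String>> entry : subscriptions.entrySet()) {
            if (!entry.getValue().contains(topic)) {
                continue; // this device is not subscribed to the topic
            }
            String clientId = entry.getKey();
            String key = clientId + "@" + topic;
            List<String> queue = storage.getOrDefault(topic, Collections.emptyList());
            // Pull everything the device has not seen yet and push it.
            for (int i = readOffsets.getOrDefault(key, 0); i < queue.size(); i++) {
                pushed.add(clientId + " <- " + queue.get(i));
            }
            readOffsets.put(key, queue.size());
        }
    }

    public static void main(String[] args) {
        PushPullGatewaySketch gateway = new PushPullGatewaySketch();
        gateway.subscribe("recv01", "t/r1");
        gateway.store("t/r1", "hello");
        gateway.store("t/r2", "nobody is subscribed to this topic");
        gateway.store("t/r1", "world");
        gateway.pushed.forEach(System.out::println);
    }
}
```

Because the event carries only the topic name, the gateway decides what to read and for whom, which keeps the matching logic out of the storage layer.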

3. Architecture overview

Our goal is an integrated, self-contained loop built on RocketMQ, but we do not want the Broker to be invaded by more scenario logic. We therefore abstract a protocol computing layer, which can be a gateway or a broker plug-in. The Broker focuses on Queue concerns and makes some Queue storage adaptations or changes to meet the computing needs above. The protocol computing layer is responsible for protocol access and must support pluggable deployment.

This example is divided into four parts:

  • MqttConsumer.java // MQTT client that subscribes to messages

  • MqttProducer.java // MQTT client that publishes messages

  • RocketMQConsumer.java // RocketMQ client that subscribes to messages

  • RocketMQProducer.java // RocketMQ client that publishes messages

2. Introduce dependencies

<dependencies>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>mqtt-common</artifactId>
    </dependency>
</dependencies>

3. MQTT message producer: MqttProducer

import org.apache.rocketmq.mqtt.common.util.HmacSHA1Util;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MqttProducer {
    public static void main(String[] args) throws InterruptedException, MqttException, NoSuchAlgorithmException, InvalidKeyException {
        MemoryPersistence memoryPersistence = new MemoryPersistence();
        String brokerUrl = "tcp://" + System.getenv("host") + ":1883";
        String firstTopic = System.getenv("topic");
        String sendClientId = "send01";
        MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(sendClientId);
        MqttClient mqttClient = new MqttClient(brokerUrl, sendClientId, memoryPersistence);
        mqttClient.setTimeToWait(5000L);
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println(sendClientId + " connect success to " + serverURI);
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) {
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }
        });
        try {
            mqttClient.connect(mqttConnectOptions);
        } catch (Exception e) {
            e.printStackTrace();
        }
        long interval = 1000;
        for (int i = 0; i < 1000; i++) {
            // QoS 1 message on a topic the consumer subscribes to exactly.
            String msg = "r1_" + System.currentTimeMillis() + "_" + i;
            MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
            message.setQos(1);
            String mqttSendTopic = firstTopic + "/r1";
            mqttClient.publish(mqttSendTopic, message);
            System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);
            Thread.sleep(interval);

            // QoS 1 message on a topic the consumer reaches through its wildcard filter.
            mqttSendTopic = firstTopic + "/r/wc";
            msg = "wc_" + System.currentTimeMillis() + "_" + i;
            MqttMessage messageWild = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
            messageWild.setQos(1);
            mqttClient.publish(mqttSendTopic, messageWild);
            System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);
            Thread.sleep(interval);

            // QoS 2 message.
            mqttSendTopic = firstTopic + "/r2";
            msg = "msgQ2_" + System.currentTimeMillis() + "_" + i;
            message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
            message.setQos(2);
            mqttClient.publish(mqttSendTopic, message);
            System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);
            Thread.sleep(interval);
        }
    }

    private static MqttConnectOptions buildMqttConnectOptions(String clientId) throws NoSuchAlgorithmException, InvalidKeyException {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setKeepAliveInterval(60);
        connOpts.setAutomaticReconnect(true);
        connOpts.setMaxInflight(10000);
        connOpts.setUserName(System.getenv("username"));
        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, System.getenv("password")).toCharArray());
        return connOpts;
    }

    private static String now() {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\t";
    }
}
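
In buildMqttConnectOptions above, the MQTT password is not sent as-is: it is a signature computed from the client ID and the secret held in the "password" environment variable. The following JDK-only sketch shows what such a signature could look like, assuming that HmacSHA1Util.macSignature produces a Base64-encoded HMAC-SHA1 of the text keyed by the secret. That behavior is an assumption about the library's internals, and the class name and sample values are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class HmacSha1SignatureSketch {

    /** Signs the text with HMAC-SHA1 under the secret key and Base64-encodes the digest. */
    static String sign(String text, String secretKey) {
        try {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
            byte[] digest = mac.doFinal(text.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (GeneralSecurityException e) {
            // HmacSHA1 ships with every standard JDK, so this is not expected in practice.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // "send01" is the client ID used by MqttProducer; the secret is a placeholder.
        String password = sign("send01", "example-secret");
        System.out.println(password.length()); // 28: a 20-byte digest in Base64
    }
}
```

The point of the scheme is that each client ID gets its own password, so the shared secret itself never travels over the connection.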

4. MQTT message consumer: MqttConsumer

import org.apache.rocketmq.mqtt.common.util.HmacSHA1Util;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MqttConsumer {
    public static void main(String[] args) throws MqttException, NoSuchAlgorithmException, InvalidKeyException {
        String brokerUrl = "tcp://" + System.getenv("host") + ":1883";
        String firstTopic = System.getenv("topic");
        MemoryPersistence memoryPersistence = new MemoryPersistence();
        String recvClientId = "recv01";
        MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(recvClientId);
        MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
        mqttClient.setTimeToWait(5000L);
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println(recvClientId + " connect success to " + serverURI);
                try {
                    // Subscribe to two exact topics and one single-level wildcard filter.
                    final String[] topicFilter = {firstTopic + "/r1", firstTopic + "/r/+", firstTopic + "/r2"};
                    final int[] qos = {1, 1, 2};
                    mqttClient.subscribe(topicFilter, qos);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                try {
                    String payload = new String(mqttMessage.getPayload());
                    System.out.println(now() + "receive:" + topic + "," + payload);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }
        });

        try {
            mqttClient.connect(mqttConnectOptions);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("connect fail");
        }
    }

    private static MqttConnectOptions buildMqttConnectOptions(String clientId) throws NoSuchAlgorithmException, InvalidKeyException {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setKeepAliveInterval(60);
        connOpts.setAutomaticReconnect(true);
        connOpts.setMaxInflight(10000);
        connOpts.setUserName(System.getenv("username"));
        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, System.getenv("password")).toCharArray());
        return connOpts;
    }

    private static String now() {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\t";
    }
}

5. RocketMQ client that publishes messages: RocketMQProducer

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class RocketMQProducer {
    private static DefaultMQProducer producer;
    private static String firstTopic = System.getenv("firstTopic");
    private static String recvClientId = "recv01";

    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        producer = new DefaultMQProducer("PID_TEST");
        // Specify name server addresses.
        producer.setNamesrvAddr(System.getenv("namesrv"));
        //Launch the instance.
        producer.start();

        for (int i = 0; i < 1000; i++) {
            // Send one message to an exact MQTT topic, then one to a topic covered by a wildcard subscription.
            try {
                sendMessage(i);
                Thread.sleep(1000);
                sendWithWildcardMessage(i);
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }

    private static void setLmq(Message msg, Set<String> queues) {
        msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
                StringUtils.join(
                        queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
                        MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
    }

    private static void sendMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
        Message msg = new Message(firstTopic,
                "MQ2MQTT",
                ("MQ_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
        String secondTopic = "/r1";
        setLmq(msg, new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, secondTopic))));
        SendResult sendResult = producer.send(msg);
        System.out.println(now() + "sendMessage: " + new String(msg.getBody()));
    }

    private static void sendWithWildcardMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
        Message msg = new Message(firstTopic,
                "MQ2MQTT",
                ("MQwc_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
        String secondTopic = "/r/wc";
        Set<String> lmqSet = new HashSet<>();
        lmqSet.add(TopicUtils.wrapLmq(firstTopic, secondTopic));
        lmqSet.addAll(mapWildCardLmq(firstTopic, secondTopic));
        setLmq(msg, lmqSet);
        SendResult sendResult = producer.send(msg);
        System.out.println(now() + "sendWcMessage: " + new String(msg.getBody()));
    }

    private static Set<String> mapWildCardLmq(String firstTopic, String secondTopic) {
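        // TODO: implement your own mapping from a topic to the wildcard filters that match it.
        // This example hard-codes the single filter that MqttConsumer subscribes to.
        return new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, "/r/+")));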
    }

    private static String now() {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\t";
    }

}
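
The setLmq and mapWildCardLmq methods above carry the core of the MQ-to-MQTT bridge: each target MQTT topic is turned into a queue name by replacing "/" with "%" and adding a prefix, and the names are joined into one message property. The JDK-only sketch below mirrors that transformation and shows one possible way to fill in the wildcard mapping. The two constant values are assumptions standing in for MixAll.LMQ_PREFIX and MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, the class and method names are hypothetical, and the input topics are used as given rather than passed through TopicUtils.wrapLmq, whose exact output is not shown in this article.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

class LmqNameSketch {
    // Assumed values; check MixAll in your RocketMQ version before relying on them.
    static final String ASSUMED_LMQ_PREFIX = "%LMQ%";
    static final String ASSUMED_SPLITTER = ",";

    /** Builds the multi-dispatch property value from a collection of MQTT topics. */
    static String toDispatchProperty(Collection<String> mqttTopics) {
        return mqttTopics.stream()
                .map(t -> t.replace("/", "%"))    // "/" is not usable in a queue name
                .map(t -> ASSUMED_LMQ_PREFIX + t)
                .sorted()                          // sorted only to make the output deterministic
                .collect(Collectors.joining(ASSUMED_SPLITTER));
    }

    /** Lists every filter obtained by replacing one or more levels of the topic with "+". */
    static Set<String> singleLevelWildcards(String secondTopic) {
        // Assumes the topic starts with "/" as in the example ("/r/wc").
        String[] levels = secondTopic.substring(1).split("/");
        Set<String> filters = new TreeSet<>();
        for (int mask = 1; mask < (1 << levels.length); mask++) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < levels.length; i++) {
                sb.append('/').append((mask & (1 << i)) != 0 ? "+" : levels[i]);
            }
            filters.add(sb.toString());
        }
        return filters;
    }

    public static void main(String[] args) {
        System.out.println(singleLevelWildcards("/r/wc"));
        System.out.println(toDispatchProperty(Arrays.asList("t/r/wc", "t/r/+")));
    }
}
```

Enumerating every "+" combination grows exponentially with the number of levels, so a real implementation would more likely look up only the wildcard filters that devices have actually subscribed to.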

6. RocketMQ client that subscribes to messages: RocketMQConsumer

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.mqtt.common.model.Constants;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

public class RocketMQConsumer {

    public static void main(String[] args) throws MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_test01");

        // Specify name server addresses.
        consumer.setNamesrvAddr(System.getenv("namesrv"));

        // Subscribe to one or more topics to consume.
        String firstTopic = System.getenv("firstTopic");
        consumer.subscribe(firstTopic, Constants.MQTT_TAG);
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                MessageExt messageExt = msgs.get(0);
                System.out.println(now() + "Receive: " + new String(messageExt.getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

    private static String now() {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\t";
    }
}

Hello everyone, I am Sinbad from the Doker brand.