FluxMQ Version 2.0 Update Content

Foreword

FluxMQ is a cloud-native, distributed IoT access platform written in Java and designed to support very large numbers of device connections. It is built on Netty and uses the Reactor 3 reactive model at its core, which gives it low latency, high throughput, and the capacity to scale to tens of millions or even billions of device connections. It helps enterprises quickly build their own IoT platforms and applications.

FluxMQ official website: https://www.fluxmq.com
FluxMQ demo system: http://demo.fluxmq.com/

Description of changes

Function                                          Description
Self-developed EventBus communication component   Provides large-scale data routing, compression, and related functions
Distributed publish-subscribe matching tree       Provides faster topic routing
Session messages                                  Web management of session messages; data persistence
Retained messages                                 Web management of retained messages; data persistence
Delayed messages                                  Web management of delayed messages; data persistence
Configuration persistence                         Settings made on the web configuration page are persisted
Rule engine                                       New LOG data source; data is written to a separate log file
Rule engine                                       New JSON function for processing nested JSON data
Rule engine                                       New protocol-extension data format; third-party extension protocols can be forwarded in a uniform way
Multi-protocol module                             Adds further protocol modules alongside FluxMQ's MQTT protocol, with the same connection, subscription, and management capabilities

Self-developed EventBus communication components

R&D background

Version 2.0 removes the Ignite communication component and replaces it with self-developed cluster communication which, combined with the distributed subscription tree, brings a significant performance improvement.

FluxMQ 1.0 used Ignite's messaging API for data routing. This approach had the following disadvantages:

  1. Communication between cluster nodes used broadcast mode, so communication performance in large clusters was very poor.

  2. Wildcards were not supported, so wildcard routing could not be resolved.

  3. Cluster consumption could not be implemented.

  4. The heavy dependence on Ignite made troubleshooting costly, and the component could not be swapped out.

EventBus features

Cluster communication

The EventBus is built on TCP (UDP multicast and other transports will be added later) and interconnects the nodes of a cluster. On startup the server scans for a free port: the default port is 48880, and if that port is occupied the port number is incremented one at a time, up to a maximum of 49000. Once a node has started, its client scans the same port range (48880 to 49000) on each cluster IP listed in the configuration file, and then maintains a heartbeat with each server to guard against node downtime.
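The startup port scan can be sketched as follows. This is a minimal Python illustration, not FluxMQ's own code: `can_bind`, `pick_port`, and the injectable `probe` parameter are hypothetical names, and only the two port numbers come from the text above.

```python
import socket

DEFAULT_PORT = 48880  # default EventBus port from the text
MAX_PORT = 49000      # highest port the scan will try


def can_bind(host, port):
    """Return True if a TCP socket can currently be bound to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
            return True
        except OSError:
            return False


def pick_port(host="0.0.0.0", start=DEFAULT_PORT, end=MAX_PORT, probe=can_bind):
    """Walk the range in order and return the first free port, or None."""
    for port in range(start, end + 1):
        if probe(host, port):
            return port
    return None
```

Passing a different `probe` makes the scan easy to exercise without opening real sockets. A client-side scan over the same range would follow the same loop, with a connection attempt in place of the bind.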

Message frame

Field          Size      Example
Fixed header   1 byte    (bit fields, see below)
Topic length   1 byte    9
Topic          n bytes   test/test
Body length    2 bytes   11
Body           n bytes   HELLO,WORLD

Fixed header bits:

Message type       2 bits
QoS                2 bits
Compression flag   1 bit
Batch flag         1 bit
Reserved           2 bits
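To make the frame layout concrete, here is a hedged Python sketch of an encoder and decoder. The field sizes follow the layout above; the bit order inside the fixed header (message type in the two most significant bits), the big-endian body length, and UTF-8 topic encoding are assumptions the text does not state, and the function names are illustrative only.

```python
import struct


def pack_header(msg_type, qos, compressed=False, batched=False):
    # Assumed bit order, most significant first:
    # type(2) | qos(2) | compressed(1) | batched(1) | reserved(2)
    return (((msg_type & 0b11) << 6) | ((qos & 0b11) << 4)
            | (int(compressed) << 3) | (int(batched) << 2))


def encode_frame(msg_type, qos, topic, body, compressed=False, batched=False):
    topic_bytes = topic.encode("utf-8")
    if len(topic_bytes) > 0xFF:
        raise ValueError("topic does not fit the 1-byte length field")
    if len(body) > 0xFFFF:
        raise ValueError("body does not fit the 2-byte length field")
    header = pack_header(msg_type, qos, compressed, batched)
    return (bytes([header, len(topic_bytes)]) + topic_bytes
            + struct.pack(">H", len(body)) + body)  # assumed big-endian


def decode_frame(data):
    header, topic_len = data[0], data[1]
    topic = data[2:2 + topic_len].decode("utf-8")
    (body_len,) = struct.unpack_from(">H", data, 2 + topic_len)
    body_start = 2 + topic_len + 2
    return {
        "msg_type": (header >> 6) & 0b11,
        "qos": (header >> 4) & 0b11,
        "compressed": bool(header & 0b1000),
        "batched": bool(header & 0b0100),
        "topic": topic,
        "body": data[body_start:body_start + body_len],
    }
```

With these sizes a frame carries 4 bytes of overhead on top of the topic and body, and a single frame is limited to a 255-byte topic and a 65535-byte body.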

Batch compression

FluxMQ automatically measures the TPS of cluster message routing. When the TPS of a single node exceeds 2000, batch compression is enabled automatically to improve transmission performance between cluster nodes. Batch compression can be turned off manually for deployments with very strict latency requirements.
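The switch between per-message sending and batch compression might look like the sketch below. Only the 2000 TPS threshold comes from the text; the batch format (a JSON array deflated with zlib) and the names `pack_for_cluster` and `unpack_from_cluster` are assumptions chosen to keep the example self-contained.

```python
import json
import zlib

TPS_THRESHOLD = 2000  # single-node threshold quoted in the text


def pack_for_cluster(messages, tps, batching_allowed=True):
    """Return a list of (batched, compressed, payload_bytes) frames to send."""
    if batching_allowed and tps > TPS_THRESHOLD:
        # Assumed batch format: a JSON array of strings, deflated with zlib.
        blob = zlib.compress(json.dumps(messages).encode("utf-8"))
        return [(True, True, blob)]
    # Below the threshold (or with batching switched off) send one frame each.
    return [(False, False, m.encode("utf-8")) for m in messages]


def unpack_from_cluster(batched, compressed, payload):
    """Reverse pack_for_cluster for a single frame."""
    data = zlib.decompress(payload) if compressed else payload
    text = data.decode("utf-8")
    return json.loads(text) if batched else [text]
```

The trade-off is the usual one: batching cuts the number of frames on the wire, at the cost of holding messages until a batch is assembled, which is why a manual off switch matters for latency-sensitive traffic.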

Distributed publish-subscribe matching tree

FluxMQ cluster nodes maintain a root-level subscription tree between them. There are two types of subscription:

  • Local subscription

  • Remote subscription

To make matching-tree lookups as fast as possible, the subscription information is maintained on every node. When a published topic matches in the tree, the message is written directly for a local subscription, and for a remote subscription it is sent to the remote node through the EventBus.
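A matching tree of this kind is commonly built as a trie keyed by topic level. The Python sketch below is one such structure under stated assumptions: it uses the standard MQTT wildcards `+` and `#`, tags each subscriber as `("local", clientId)` or `("remote", nodeId)`, and ignores special handling of `$`-prefixed topics. The class and function names are hypothetical and do not describe FluxMQ's internal data structure.

```python
class TopicTree:
    """Trie keyed by topic level; each node holds its subscribers."""

    def __init__(self):
        self.children = {}
        self.subscribers = set()

    def subscribe(self, topic_filter, target):
        node = self
        for level in topic_filter.split("/"):
            node = node.children.setdefault(level, TopicTree())
        node.subscribers.add(target)

    def match(self, topic):
        found = set()
        self._match(topic.split("/"), 0, found)
        return found

    def _match(self, levels, i, found):
        multi = self.children.get("#")
        if multi is not None:
            # '#' matches this level and everything below it.
            found |= multi.subscribers
        if i == len(levels):
            found |= self.subscribers
            return
        for key in (levels[i], "+"):  # exact level, then single-level wildcard
            child = self.children.get(key)
            if child is not None:
                child._match(levels, i + 1, found)


def route(tree, topic, payload, deliver_local, send_remote):
    """Write to local subscribers directly; hand remote ones to the bus."""
    for kind, target in tree.match(topic):
        if kind == "local":
            deliver_local(target, payload)
        else:
            send_remote(target, topic, payload)
```

Because every node holds the whole tree, a publish needs one local lookup and then only contacts the nodes that actually have matching subscribers, instead of broadcasting to the cluster.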

Data management

Version 2.0 adds delayed messages and session messages. Retained messages already existed in version 1.0, but the management page offered no visual management for them. Some of the changes are described below.

Session messages

Distributed session messages are provided. Session messages are stored persistently for the duration of the session, so no data is lost if the cluster goes down: the data is reloaded when the cluster restarts.

Retained messages

Messages are retained per topic, and only one message is retained for each topic. When the transmitted MQTT payload is empty, the retained message for that topic is cleared. The data is stored persistently and is reloaded after the cluster restarts.
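The retain rule described above (one message per topic, cleared by an empty payload) can be sketched in a few lines of Python. `RetainedStore` is a hypothetical in-memory illustration; persistence and cluster replication are deliberately left out.

```python
class RetainedStore:
    """Keeps at most one retained payload per topic (in memory only)."""

    def __init__(self):
        self._by_topic = {}

    def publish(self, topic, payload):
        if payload:
            # A new retained message replaces the previous one for the topic.
            self._by_topic[topic] = payload
        else:
            # An empty payload clears the retained message.
            self._by_topic.pop(topic, None)

    def get(self, topic):
        return self._by_topic.get(topic)
```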

Delayed messages

FluxMQ can issue large batches of topic instructions on a timer. A single machine supports millions of delayed message instructions. In cluster mode, FluxMQ automatically distributes the delayed instructions it receives to an execution node. If that execution node goes down, the tasks it has not yet executed are automatically taken over by other nodes, which provides distributed task coordination.

Delay topic format:

$DELAY/{delay in seconds}/{TOPIC}
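As an illustration of the delay topic format above (a `$DELAY` prefix, a delay in seconds, then the target topic), the following Python sketch splits such a topic into its parts. `parse_delay_topic` is a hypothetical helper, and treating the delay as a non-negative whole number of seconds is an assumption.

```python
def parse_delay_topic(topic):
    """Return (delay_seconds, target_topic), or None if not a delay topic."""
    prefix, sep, rest = topic.partition("/")
    if prefix != "$DELAY" or not sep:
        return None
    seconds, sep, target = rest.partition("/")
    if not sep or not target:
        return None
    # Assumption: the delay is a non-negative integer written in ASCII digits.
    if not (seconds.isascii() and seconds.isdigit()):
        return None
    return int(seconds), target
```

Under these assumptions, publishing to `$DELAY/30/device/1/cmd` would ask for a message to be delivered to `device/1/cmd` after 30 seconds.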

Configuration Persistence

Configuration data areas are persisted using Ignite. The data areas currently covered are as follows:

Data area                    Whether to enable persistence
Data source configuration
Rule configuration
ACL configuration
System configuration
Retained messages
Session messages
Delayed messages
Rule engine
Cloud client
Protocol extension

Rule Engine

LOG file printing

This function is used to debug messages. It generates a separate log file on each node of the cluster so that problems can be located quickly.

Database SQL templates support the json function

Input data:

{
    "msg": {
      "id": "id",
      "body": {
        "state": 1,
        "no": 2
      }
    },
    "messageId": 1,
    "topic": "test",
    "qos": 1,
    "retain": false,
    "time": "2022-12-22 12:00:00",
    "clientId": "A1212313"
}

Suppose only the body structure under msg needs to be inserted. A general INSERT statement template for this looks as follows:

insert into table (clientId,topic,msg) values ('${clientId}','${topic}','${json(msg.body)}')

Use json(variable name) to convert the structure into a JSON string, which then replaces the placeholder as the value of the inserted field.
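The substitution can be pictured with the following Python sketch, which expands `${name}` and `${json(path)}` placeholders against a message. It is only an approximation of the behaviour described above: `render` and `lookup` are hypothetical names, the compact JSON formatting is an assumption, and no SQL escaping is performed, which a real rule engine would have to handle.

```python
import json
import re

# Matches ${json(a.b)} (group 1) or ${a.b} (group 2).
PLACEHOLDER = re.compile(r"\$\{(?:json\(([\w.]+)\)|([\w.]+))\}")


def lookup(data, path):
    """Follow a dotted path such as 'msg.body' through nested dicts."""
    for key in path.split("."):
        data = data[key]
    return data


def render(template, message):
    def replace(match):
        json_path, plain_path = match.groups()
        if json_path is not None:
            # json(...) serialises the nested structure to a JSON string.
            return json.dumps(lookup(message, json_path), separators=(",", ":"))
        return str(lookup(message, plain_path))

    return PLACEHOLDER.sub(replace, template)
```

Applied to the template and input data above, `${json(msg.body)}` is replaced by the serialised `body` object, while `${clientId}` and `${topic}` are replaced by their plain values.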

Multi-protocol module

FluxMQ currently has built-in CoAP, WebSocket, and I1 protocol components, each of which is started by specifying a port. Once a component is started, it can be interacted with through MQTT. Every client must connect in accordance with the FluxMQ standard. The extension protocols share the following components with FluxMQ's MQTT implementation:

  • authentication module

  • rule engine

  • connection management

  • log management

  • monitoring management

Uplink commands

Extension protocol data is selected through the rule engine configuration:

select * from "$EVENT.EXTENSION"

The transmitted data format is as follows:

{
    "protocol": "I1",
    "cmd": "PUBLISH",
    "messageId": 0,
    "time": "2023-07-11 21:59:23",
    "clientId": "clientId",
    "nodeIp": "127.0.0.1",
    "clientIp": "127.0.0.1:19999",
    "body": "body"
}

Field       Description
protocol    Protocol name
cmd         Command type:
              PUBLISH   push message
              CONNECT   connect
              CLOSE     disconnect
messageId   Message ID
time        Time
clientIp    Client address
nodeIp      Cluster node IP
body        Message body. If the transmitted data is JSON, it is converted to JSON automatically; otherwise it is handled uniformly as a UTF-8 string.
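The handling of the body field can be sketched as below. This is an assumption-laden Python illustration of the rule just described, not FluxMQ's code: `decode_body` is a hypothetical name, and treating only JSON objects and arrays as "JSON" (so that a bare number stays a string) is a choice made for this sketch.

```python
import json


def decode_body(raw):
    """Return parsed JSON for object/array payloads, else a UTF-8 string."""
    text = raw.decode("utf-8", errors="replace")
    try:
        value = json.loads(text)
    except json.JSONDecodeError:
        return text
    # Assumption: scalars such as 42 or "x" are kept as plain text.
    return value if isinstance(value, (dict, list)) else text
```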

Forward all WebSocket protocol messages

The SQL is as follows:

select * from "$EVENT.EXTENSION" WHERE protocol='WEBSOCKET'

Forward only the messages reported over the WebSocket protocol

The SQL is as follows:

select * from "$EVENT.EXTENSION" WHERE protocol='WEBSOCKET' AND cmd='PUBLISH'

Downlink commands

A command sent to the FluxMQ cluster from an MQTT client can be written through to an extension protocol client. The topic format is as follows:

$PROTOCOL/{protocol name}/{clientId}
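For illustration, the downlink topic can be built and taken apart with the Python helpers below. Both function names are hypothetical, and the sketch assumes that neither the protocol name nor the client ID contains a `/`.

```python
PREFIX = "$PROTOCOL"


def downlink_topic(protocol, client_id):
    """Build the topic an MQTT client publishes to for a downlink command."""
    for part in (protocol, client_id):
        if not part or "/" in part:
            raise ValueError("protocol and client_id must be non-empty, without '/'")
    return f"{PREFIX}/{protocol}/{client_id}"


def parse_downlink_topic(topic):
    """Return (protocol, client_id), or None if the topic has another shape."""
    parts = topic.split("/")
    if len(parts) != 3 or parts[0] != PREFIX or not parts[1] or not parts[2]:
        return None
    return parts[1], parts[2]
```

With these helpers, a command for the WebSocket client `A1212313` would be published to `$PROTOCOL/WEBSOCKET/A1212313`.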

Connection management

Start the WebSocket protocol plug-in

WebSocket client connection

ws://123.249.9.130:7777/test

Connection management