Implement configurable and scalable IOT services based on Vertx

The goal of building the framework

I believe that partners who have written IOT services should know that faced with various strange communication protocols, especially the hexadecimal message protocols, some protocols are indeed a bit troublesome to read. But there are also many commonalities among these protocols, and there is no need to write out some business-irrelevant code again for each protocol.

The purpose of building this project is to abstract and standardize common device communication protocols based on TCP connections to reduce some development costs. The goal is to implement a configurable framework that facilitates the expansion of various protocols.

Introduction to Vertx

Vert.x is an open source project under the Eclipse Foundation. The basic positioning of Vert.x is an event-driven programming framework. Through Vert.x users can enjoy the high performance brought by NIO at a relatively low cost. Netty is the communication component used at the bottom of Vert.

The main insight of this article is to build a configurable and scalable IOT service. It will not explain Vertx in detail. Vertx related content can be viewed on the official website “vertx official website”

Common concepts in IoT communication

1.logicAddress

Logical communication address. In common device protocols, there is the concept of logical communication address, which is used to identify the current connection to a specific device. With this logical address, you can easily find this connection.

Use this address when creating a file in the business system. You can also use this communication address to issue specified commands to the specified device later.

2.messageType

Message type. In TCP communication, the device reports more than one type of message, but in common device communication protocols, different messages are marked differently to distinguish the meaning of each uploaded message. Therefore, in the message reported by the device, we find the identification bit of the message according to the definition of the communication protocol, and then perform corresponding processing.

3.session

Conversation. session is mainly used to manage connections. After the device and the server establish a connection, a socket will be generated, but many times this socket lacks some semantics and description, so we will make some packaging for this socket, such as abstracting some methods and binding the logical address of the device for subsequent search and call.

Code architecture process

The overall core process is as follows:

Core code analysis

yaml file configuration

Configure the protocol names and protocol communication port numbers of multiple protocols. Multiple ports are used to distinguish different protocols to avoid parsing errors when the protocol contents are similar.

protocols:
  - name: ZHONGXING #中兴
    port: 8898
  - name: HUAWEI # Huawei
    port: 9666

ProtocolServerBootstrap

The main thing here is to load the yaml configuration, then start the corresponding TcpServer service listening port, find the codec of the corresponding protocol according to the configuration definition, and forward the message to the corresponding codec.

Two custom annotations are used here:

@CodecScan: Identifies which packets the codec should scan.

@Protocol: annotation to identify the communication protocol corresponding to the codec.

/**
 * @author yan
 * @date 2023/9/12
 */
@Slf4j
public class ProtocolServerBootstrap extends AbstractVerticle {

    private Class<?> starter;
    private static Map<String, ProtocolConfig> protocols = new ConcurrentHashMap<>();
    private static Map<String, AbstractProtocolCodec> codecMap = new ConcurrentHashMap<>();

    public ProtocolServerBootstrap(Class<?> starter) {
        this.starter = starter;
    }

    @Override
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        loadProfile();
        loadProtocolCodec();
    }

    public void loadProfile() {
        InputStream inputStream = null;
        try {
            inputStream = this.getClass().getClassLoader().getResourceAsStream("protocol.yml");
            Yaml yaml = new Yaml();
            Map<String, List<Object>> map = yaml.load(inputStream);
            List<Object> protocolConfigs = map.get("protocols");
            String host = NetUtil.getLocalhost().getHostAddress();
            protocolConfigs.stream()
                    .map(item -> JSONUtil.toBean(JSONUtil.toJsonStr(item), ProtocolConfig.class))
                    .forEach(config -> {
                        protocols.put(config.getName(), new ProtocolConfig().setName(config.getName()).setHost(host).setPort(config.getPort()));
                    });
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Configuration file parsing failed:" + e);
        } finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void loadProtocolCodec() {
        try {
            CodecScan codecScan = starter.getAnnotation(CodecScan.class);
            if(codecScan == null){
                this.protocols.clear();
                return;
            }
            String[] packages = codecScan.value();
            for (String p : packages) {
                Reflections reflection = new Reflections(p);
                Set<Class<?>> classes = reflection.getTypesAnnotatedWith(Protocol.class);
                for (Class<?> aClass : classes) {
                    Protocol annotation = aClass.getAnnotation(Protocol.class);
                    codecMap.put(annotation.value(), (AbstractProtocolCodec) aClass.newInstance());
                    log.info("Loading codec: " + aClass.getName());
                }
            }
        } catch (Exception e) {
            log.error("Loading codec failed:" + e);
        }
    }

    @Override
    public void start() {
        protocols.forEach((name, protocol) -> {
            AbstractProtocolCodec codec = codecMap.get(name);
            vertx.deployVerticle(codec);
            SocketAddress address = new SocketAddressImpl(protocol.getPort(), protocol.getHost());
            NetServer server = vertx.createNetServer();
            server.connectHandler(codec)
                    .listen(address)
                    .onComplete(res -> {
                        if (res.succeeded()) {
                            log.info("{} service started successfully, bound/{}", protocol.getName(), address);
                        } else {
                            if (res.cause() != null) {
                                log.error("Service startup failed, cause:" + res.cause());
                            }
                        }
                    });
        });
    }

    public AbstractProtocolCodec getProtocolCodec(String protocolName) {
        return codecMap.get(protocolName);
    }
}

AbstractProtocolCodec

The abstract codec class mainly includes monitoring messages received by the server, session management, management processor, etc.

/**
 * @author yan
 * @date 2023/9/12
 */
@Slf4j
public abstract class AbstractProtocolCodec extends AbstractVerticle implements Handler<NetSocket> {

    private Map<String, BaseSession> logicAddressSessionMap = new ConcurrentHashMap<>();
    private Map<NetSocket, BaseSession> socketSessionMap = new ConcurrentHashMap<>();

    private Map<String, AbstractProtocolHandler> handlerMap = new ConcurrentHashMap<>();

    @Override
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        vertx.eventBus().registerDefaultCodec(BaseMessage.class, new GenericMessageCodec<BaseMessage>() {
        });
        vertx.eventBus().registerDefaultCodec(BaseSession.class, new GenericMessageCodec<BaseSession>() {
        });
        registerHandlers();
    }

    @Override
    public void handle(NetSocket socket) {
        log.info("New connection received: " + socket);
        activeSocket(socket);
        socket.closeHandler(handler -> {
            log.info("Connection disconnected: " + socket);
            afterCloseSocket(socket);
            removeSession(socket);
        });
        socket.handler(data -> {
            try {
                BaseMessage message = new BaseMessage().setSocket(socket).setBuffer(data);
                if(!socketSessionMap.containsKey(socket)){
                    String logicAddress = getLogicAddress(message);
                    registerSession(logicAddress, socket);
                }
                decode(message);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("Decoding processing failed, throw:" + e);
            }
        });
    }

    private BaseSession registerSession(String logicAddress, NetSocket socket) {
        BaseSession session = new BaseSession().setLogicAddress(logicAddress).setSocket(socket);
        logicAddressSessionMap.put(logicAddress, session);
        socketSessionMap.put(socket, session);
        return session;
    }

    private void removeSession(NetSocket socket) {
        BaseSession session = socketSessionMap.remove(socket);
        if(session != null){
            logicAddressSessionMap.remove(session.getLogicAddress());
        }
    }

    public BaseSession getSessionByLogicAddress(String logicAddress) {
        return logicAddressSessionMap.get(logicAddress);
    }

    protected abstract List<AbstractProtocolHandler> getHandlers();

    private void registerHandlers() {
        List<AbstractProtocolHandler> handlers = getHandlers();
        handlers.forEach(handler -> {
            handlerMap.put(handler.getMessageType(), handler);
            vertx.deployVerticle(handler);
        });
    }

    public AbstractProtocolHandler getHandlerByMessageType(String messageType) {
        return handlerMap.get(messageType);
    }

    protected abstract void decode(BaseMessage message);


    protected abstract String getLogicAddress(BaseMessage message);


    protected void activeSocket(NetSocket socket) {

    }

    protected void afterCloseSocket(NetSocket socket) {

    }
}

AbstractProtocolHandler

Processor abstract class

/**
 * @author yan
 * @date 2023/9/14
 */
public abstract class AbstractProtocolHandler<T, R> extends AbstractVerticle implements Handler<Message<T>>, InvokeHandler<R> {

    @Override
    public void start() throws Exception {
        vertx.eventBus().consumer(getTopic(), this::handle);
    }

    protected abstract String getTopic();

    protected abstract String getMessageType();

    @Override
    public void write(BaseSession session, Buffer buffer) {
        session.getSocket().write(buffer);
    }
}

InvokeHandler

/**
 * @author yan
 * @date 2023/9/14
 */
public interface InvokeHandler<T> {

    /**
     * Get the buffer based on the incoming parameters
     * @param req
     * @return
     */
    Buffer getBuffer(T req);

    /**
     * Send message
     * @param session
     * @param buffer
     */
    void write(BaseSession session, Buffer buffer);
}

InvokeAdapter

/**
 * Service call adapter
 *
 * @author yan
 * @date 2023/9/14
 */
public class InvokeAdapter {

    private ProtocolServerBootstrap bootstrap;

    public InvokeAdapter(ProtocolServerBootstrap bootstrap){
        this.bootstrap = bootstrap;
    }

    public void send(String protocolName, String logicAddress, String messageType, Object param) {
        AbstractProtocolCodec codec = bootstrap.getProtocolCodec(protocolName);
        BaseSession session = codec.getSessionByLogicAddress(logicAddress);
        if (session == null || session.getSocket() == null) {
            throw new RuntimeException("session is not exist or closed");
        }
        AbstractProtocolHandler handler = codec.getHandlerByMessageType(messageType);
        Buffer buffer = handler.getBuffer(param);
        handler.write(session, buffer);
    }

}

Huawei protocol implementation

HuaweiCodec

Huawei protocol codec

/**
 * @author yan
 * @date 2023/9/12
 */
@Slf4j
@Protocol("HUAWEI")
public class HuaweiCodec extends AbstractProtocolCodec {

    @Override
    protected List<AbstractProtocolHandler> getHandlers() {
        return Arrays.asList(new HuaweiParamReadHandler(), new HuaweiParamWriteHandler(), new HuaweiParamWriteBatchHandler());
    }

    @Override
    protected void decode(BaseMessage message) {
        String dataStr = ByteUtils.hexToHexString(message.getBuffer().getBytes());
        String messageType = dataStr.substring(14, 16);
        vertx.eventBus().publish(HuaweiMessageTypeConstants.getMessageTopic(messageType), message);
    }

    @Override
    protected String getLogicAddress(BaseMessage message) {
        // Here the corresponding communication address is parsed based on the message
        return "001";
    }
}

Handler processor

/**
 * @author yan
 * @date 2023/9/13
 */
@Slf4j
public class HuaweiParamReadHandler extends AbstractProtocolHandler<BaseMessage, Object> {

    @Override
    protected String getTopic() {
        return HuaweiMessageTypeConstants.READ;
    }

    @Override
    public void handle(Message<BaseMessage> message) {
        BaseMessage baseMessage = message.body();
        log.info("received parameter read command return: " + ByteUtils.hexToHexString(baseMessage.getBuffer().getBytes()));
        baseMessage.getSocket().write(baseMessage.getBuffer());
    }

    @Override
    public String getMessageType() {
        return HuaweiMessageTypeConstants.READ;
    }

    @Override
    public Buffer getBuffer(Object req) {
        log.info("Send read message: " + req);
        return Buffer.buffer(new byte[]{0x11, 0x11, 0x11});
    }
}

Test call

Write the main class

/**
 * @author yan
 * @date 2023/9/11
 */
@CodecScan("com.cdw.pv.iot.modules")
public class PvApplication {

    public static void main(String[] args) {
        ProtocolServerBootstrap bootstrap = new ProtocolServerBootstrap(PvApplication.class);
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(bootstrap);
        // Start an http service and simulate external calls
        startHttpServer(vertx, bootstrap);
    }

    private static void startHttpServer(Vertx vertx, ProtocolServerBootstrap bootstrap) {
        InvokeAdapter adapter = new InvokeAdapter(bootstrap);
        HttpServer httpServer = vertx.createHttpServer();
        httpServer.requestHandler(handler -> {
            System.out.println("request request: " + handler);
            adapter.send("HUAWEI", "001", HuaweiMessageTypeConstants.READ, "123");
            handler.response().end(Buffer.buffer("success"));
        }).listen(8899).onComplete(handler -> {
            if (handler.succeeded()) {
                System.out.println("http server started successfully");
            }
        });
    }
}

Occurrence command

Use TCP connection to send instructions according to

Service call

Simulate sending request

Received the news

Summary

The above is the overall architecture of the general IOT service.

This is just a basic version and can be adjusted later according to the actual situation.