Building a TCP Long-Connection Communication Solution with Netty and Spring Boot

Project background

Recently, an Internet of Things project at my company needed socket long connections for message communication. I hacked together a first version and put it online, and it produced a steady stream of bugs. I could hardly sleep or eat, so I turned to Baidu for help, and after several sleepless days the project finally ran smoothly. In the spirit of open source and sharing, I have distilled the project code into a demo project, stripping out the ugly business logic as far as possible, in the hope that we can learn and improve together.

Text

1. Project structure

This project uses Netty, Redis, and Spring Boot 2.2.0.

2. Project module

The directory structure of this project is as follows:

[Figure: project directory structure]

netty-tcp-core is a shared module that mainly contains utility classes. netty-tcp-server is the Netty server; it is used only for testing, since the actual project uses only the client. netty-tcp-client is the client and the focus of this article.

3. Business process

We use RocketMQ as the message queue in our actual project. Since this project is a demo project, it was changed to BlockingQueue. The data flow is:

Producer->Message queue->Consumer (client)->tcp channel->Server->tcp channel->Client.

When the consumer receives a message sent by a device, it checks the cache for a connection between that device and the server. If a connection exists and its channel is active, the message is sent over that channel. If not, a channel is created and the message is sent as soon as the channel becomes active. When the client receives a message from the server, it runs the corresponding business logic.

4. Detailed code explanation

1. Message Queue

Since this demo project removes the message middleware, you need to create a local queue to simulate real usage scenarios.

package org.example.client;

import org.example.client.model.NettyMsgModel;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * This project uses a local queue for demonstration. In actual production, message middleware should be used instead (rocketmq or rabbitmq)
 *
 * @author ReWind00
 * @date 2023/2/15 11:20
 */
public class QueueHolder {

    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);

    public static ArrayBlockingQueue<NettyMsgModel> get() {
        return queue;
    }
}

We use a class to hold a static instance of the queue so that it can be referenced quickly from any class. Next, we start a thread that monitors the queue. As soon as a message is delivered to the queue, we take it out and process it asynchronously on multiple threads.

public class LoopThread implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (true) {
                    //Take the first object in the BlockingQueue. If the BlockingQueue is empty, block and wait until an element becomes available
                    try {
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
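                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                        Thread.currentThread().interrupt(); //Restore the interrupt flag
                        return; //Stop this worker when it is interrupted, for example on executor shutdown
                    }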
                }
            });
        }
    }
}

The take method blocks the thread until a message arrives in the queue. After the message has been processed, the thread enters the next iteration of the loop.
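
The consumer loop can be tried in isolation with nothing but the JDK. The following is a minimal sketch under stated assumptions, not the project's code: QueueWorkerDemo, startWorkers, and runDemo are hypothetical names, plain strings stand in for NettyMsgModel, and the message processor is reduced to a callback.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class QueueWorkerDemo {

    /** Starts the given number of workers; each blocks on take() and hands messages to the processor. */
    static ExecutorService startWorkers(BlockingQueue<String> queue, int workers, Consumer<String> processor) {
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            executor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        processor.accept(queue.take()); // blocks while the queue is empty
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the flag so the loop exits
                    }
                }
            });
        }
        return executor;
    }

    /** Offers count messages (at most 100 fit the queue) and returns how many were processed. */
    static int runDemo(int count) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ExecutorService executor = startWorkers(queue, 4, msg -> {
            processed.add(msg);
            done.countDown();
        });
        try {
            for (int i = 0; i < count; i++) {
                queue.offer("msg-" + i);
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow(); // interrupts the workers that are blocked in take()
        }
        return processed.size();
    }

    public static void main(String[] args) {
        System.out.println("processed=" + runDemo(10));
    }
}
```

The workers exit cleanly because shutdownNow interrupts the threads blocked in take, and the loop condition checks the restored interrupt flag.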

2. Execution class

The process method comes from the MessageProcessor class, which is a singleton, but will be executed by multiple threads at the same time.

public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    boolean lockAcquired = false; //Only the thread that set the lock may release it
    try {
        synchronized (this) { //Lock so that concurrent messages from the same device do not create the client twice
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) { //The previous message is still being processed
                log.info("imei={} message processing, requeuing", imei);
                //Put it back into the queue after a 2-second delay (in actual projects, rocketmq or rabbitmq should be used to implement delayed consumption)
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                log.info("imei={} message processing, requeue completed", imei);
                return;
            } else {
                //No lock is held for this device, so acquire it
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
                lockAcquired = true;
            }
        }
        //If a client for this device exists in the cache, send the message through it
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive()) { //Send messages directly when the channel is active
                if (!nettyClient.getChannelFuture().channel().isWritable()) {
                    log.warn("Warning, channel cannot be written, imei={}, channelId={}", nettyClient.getImei(),
                            nettyClient.getChannelFuture().channel().id());
                }
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.info("client imei={}, channel is inactive and actively closed", nettyClient.getImei());
                nettyClient.close();
                //Recreate the client to send
                this.createClientAndSend(nettyMsgModel);
            }
        } else { //Create a new client if it does not exist in the cache
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        //Release the lock only if this thread acquired it; a requeued message must not delete another thread's lock
        if (lockAcquired) {
            redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
        }
    }
}

Here, imei is the unique identifier of a device, so we can use it as the cache key to check whether a connection has already been created. Because message concurrency can be very high, one thread may still be creating the connection for a device when another thread receives a message from the same device and starts creating a second connection. We use a synchronized block together with a Redis distributed lock to prevent this. Once a message has acquired the lock, later messages for the same device are put back into the queue for delayed consumption until the lock is released.
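
The lock-or-requeue idea can be sketched without Redis. In the example below, a concurrent set is an in-memory stand-in for the Redis lock keys (an assumption that only holds inside one JVM), and a ScheduledExecutorService replaces the delayed message queue. DeviceLockDemo and all of its methods are hypothetical names. With several service instances, the atomic "set only if absent, with expiry" step would have to happen in Redis itself, for example with the SET command and its NX and EX options.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DeviceLockDemo {
    // In-memory stand-in for the Redis lock keys (assumption: a single JVM).
    private final Set<String> locks = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Atomically acquires the lock for a device; false means another message holds it. */
    boolean tryLock(String imei) {
        return locks.add(imei);
    }

    void unlock(String imei) {
        locks.remove(imei);
    }

    /** Runs the task under the device lock, or requeues the imei after delayMillis if the lock is taken. */
    boolean processOrRequeue(String imei, Runnable task, long delayMillis) {
        if (!tryLock(imei)) {
            scheduler.schedule(() -> queue.offer(imei), delayMillis, TimeUnit.MILLISECONDS);
            return false; // this caller does not own the lock, so it must not release it
        }
        try {
            task.run();
            return true;
        } finally {
            unlock(imei); // only the owner releases the lock
        }
    }

    void shutdown() {
        scheduler.shutdown();
    }

    static String runDemo() {
        DeviceLockDemo demo = new DeviceLockDemo();
        boolean[] inner = {true};
        // The outer task simulates a second message for the same device arriving mid-processing.
        boolean outer = demo.processOrRequeue("12345678",
                () -> inner[0] = demo.processOrRequeue("12345678", () -> { }, 50), 50);
        String requeued = null;
        try {
            requeued = demo.queue.poll(2, TimeUnit.SECONDS); // wait for the delayed requeue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean again = demo.processOrRequeue("12345678", () -> { }, 50); // lock is free again
        demo.shutdown();
        return outer + "," + inner[0] + "," + requeued + "," + again;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The point of the sketch is the ownership rule: the message that finds the lock taken is requeued and never touches the lock, while the message that acquired it releases it in a finally block.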

The thread that acquires the lock will determine whether there is a connection in the cache based on imei. If it exists, it will send the message directly. If it does not exist, it will enter the method of creating the client.

private void createClientAndSend(NettyMsgModel nettyMsgModel) {
    log.info("Creating client execution imei={}", nettyMsgModel.getImei());
    //The DemoClientHandler here can be defined according to your own business
    NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
            this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
    executor.execute(nettyClient); //Execute client initialization
    try {
        //Use the lock to wait for client activation
        synchronized (nettyClient) {
            long c1 = System.currentTimeMillis();
            nettyClient.wait(5000); //Block for up to 5 seconds; if the client is still not activated by then, wait returns on its own
            long c2 = System.currentTimeMillis();
            log.info("Creating client wait takes time={}ms", c2 - c1);
        }
        if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive()) { //Connection successful
            //Save to cache
            NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
            //Send message after client activation
            nettyClient.send(nettyMsgModel.getMsg());
        } else { //Connection failed
            log.warn("Client creation failed, imei={}", nettyMsgModel.getImei());
            nettyClient.close();
            //You can requeue the message for processing
        }
    } catch (Exception e) {
        log.error("Client initialization message sending exception ===>{}", e.getMessage(), e);
    }
}

When the Netty client instance is created, its initialization runs on the thread pool. Because initialization is asynchronous, the client has most likely not finished connecting if we send a message right away, so the sending thread has to wait. It enters the synchronized block and calls wait, which returns once the client is activated and notify is called. The parameter 5000 is the timeout in milliseconds: if the client hits an error and never activates the channel, wait returns on its own after at most 5000 milliseconds.

This parameter can be adjusted as needed. Under high concurrency, blocking for 5 seconds may exhaust the thread pool or cause an out-of-memory error. Once the client has been created and activated, the message is sent immediately.
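
One caveat of plain wait and notify is that a notify fired before the waiter has entered wait is not remembered, so the waiter would then sleep for the full timeout. A java.util.concurrent.CountDownLatch does remember the signal. The sketch below is an alternative technique, not the article's code; ActivationLatchDemo, markActive, and awaitActive are hypothetical names, with markActive playing the role of channelActive.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ActivationLatchDemo {
    private final CountDownLatch activated = new CountDownLatch(1);

    /** Called by the I/O side when the channel becomes active. */
    void markActive() {
        activated.countDown();
    }

    /** Waits up to timeoutMillis; true if activation happened, even if it happened before this call. */
    boolean awaitActive(long timeoutMillis) {
        try {
            return activated.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static boolean[] runDemo() {
        // Case 1: activation fires before the waiter arrives; the latch remembers it.
        ActivationLatchDemo early = new ActivationLatchDemo();
        early.markActive();
        boolean earlyResult = early.awaitActive(100);

        // Case 2: activation fires from another thread while we wait.
        ActivationLatchDemo async = new ActivationLatchDemo();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(async::markActive);
        boolean asyncResult = async.awaitActive(5000);
        executor.shutdown();

        // Case 3: activation never fires; the wait gives up after the timeout.
        boolean neverResult = new ActivationLatchDemo().awaitActive(50);

        return new boolean[] {earlyResult, asyncResult, neverResult};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runDemo()));
    }
}
```

The return value of await also tells the caller whether activation really happened or the timeout expired, which wait alone does not.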

3. Client
package org.example.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author ReWind00
 * @date 2023/2/15 9:59
 */
@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {

    @Value("${netty.server.port}")
    private int port;

    @Value("${netty.server.host}")
    private String host;
    //Client unique identifier
    private String imei;
    //Customized business data
    private Map<String, Object> bizData;

    private EventLoopGroup workGroup;

    private Class<BaseClientHandler> clientHandlerClass;

    private ChannelFuture channelFuture;

    public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup, Class<BaseClientHandler> clientHandlerClass) {
        this.imei = imei;
        this.bizData = bizData;
        this.workGroup = workGroup;
        this.clientHandlerClass = clientHandlerClass;
    }

    @Override
    public void run() {
        try {
            this.init();
            log.info("Client starts imei={}", imei);
        } catch (Exception e) {
            log.error("Client startup failed: {}", e.getMessage(), e);
        }
    }

    public void close() {
        if (null != this.channelFuture) {
            this.channelFuture.channel().close();
        }
        NettyClientHolder.get().remove(this.imei);
    }

    public void send(String message) {
        try {
            if (!this.channelFuture.channel().isActive()) {
                log.info("Channel inactive imei={}", this.imei);
                return;
            }
            if (!StringUtils.isEmpty(message)) {
                log.info("Queue message sent ===>{}", message);
                this.channelFuture.channel().writeAndFlush(message);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private void init() throws Exception {
        //Pass this instance to handler
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        //Configure the client through the Bootstrap helper class
        b.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024 * 32)
                .option(ChannelOption.SO_SNDBUF, 1024 * 32)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("\r\n".getBytes())));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); // String encoding.
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); // String decoding.
                        // Heartbeat settings
                        ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                        ch.pipeline().addLast(clientHandler);
                    }
                });
        this.connect(b);
    }

    private void connect(Bootstrap b) throws InterruptedException {
        long c1 = System.currentTimeMillis();
        final int maxRetries = 2; //Retry up to 2 times after the first attempt
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            //Keep the future of the latest attempt so that getChannelFuture() never points at a stale channel
            this.channelFuture = b.connect(host, port);
            this.channelFuture.await(); //Wait for completion; unlike sync(), await() does not throw on failure
            if (this.channelFuture.isSuccess()) {
                log.info("imei={} connection successful, connection IP: {} connection port: {}", imei, host, port);
                break;
            }
            log.warn("imei={} connection attempt {} of {} failed: {}", imei, attempt + 1, maxRetries + 1,
                    String.valueOf(this.channelFuture.cause()));
        }
        log.info("Device imei={}, channelId={} connection time={}ms", imei, channelFuture.channel().id(), System.currentTimeMillis() - c1);
        if (channelFuture.isSuccess()) {
            channelFuture.channel().closeFuture().sync(); //Block this thread until the channel is closed
        }
    }
}

The Netty client is a multi-instance (prototype) bean. Each instance is bound to one thread, which stays blocked until the client is closed. Each client can hold its own business data for use in later interactions with the server. When connecting, the client gets two retries; if the connection has failed three times in total, it gives up, and you can choose to requeue the message for later consumption. In our actual project, a login message must be sent to the server first, and further communication is possible only after the server confirms it. Adjust this to your own situation.

Another point to note is that the EventLoopGroup is passed in through the constructor rather than created inside the client. When the number of clients is very large, having each client create its own thread group would consume a lot of server resources. In practice, we therefore create one thread group per business line, shared by all clients of that business. The size of the thread group should be configured according to business needs.
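
The effect of sharing one thread group can be illustrated with a plain JDK thread pool standing in for the EventLoopGroup. This is only an analogy under that assumption: SharedGroupDemo and its Client class are hypothetical, and a fixed thread pool is not a Netty event loop. The sketch shows that the number of threads stays bounded by the pool size no matter how many clients borrow it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SharedGroupDemo {

    /** Hypothetical client that borrows a shared pool instead of owning its own threads. */
    static final class Client {
        private final ExecutorService sharedGroup;

        Client(ExecutorService sharedGroup) {
            this.sharedGroup = sharedGroup;
        }

        /** Simulates one piece of I/O work and records which thread ran it. */
        void doIo(Set<String> threadsSeen, CountDownLatch done) {
            sharedGroup.execute(() -> {
                threadsSeen.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
    }

    /** Runs the given number of clients on one pool; returns the number of distinct threads used. */
    static int runDemo(int clients, int threads) {
        ExecutorService sharedGroup = Executors.newFixedThreadPool(threads);
        Set<String> threadsSeen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(clients);
        try {
            for (int i = 0; i < clients; i++) {
                new Client(sharedGroup).doIo(threadsSeen, done);
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sharedGroup.shutdown();
        }
        return threadsSeen.size();
    }

    public static void main(String[] args) {
        System.out.println("threads used by 50 clients: " + runDemo(50, 4));
    }
}
```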

In the init method, we add a handler to the client to handle the interaction with the server. Let’s take a look at the specific implementation.

package org.example.client.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.example.client.NettyClient;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author ReWind00
 * @date 2023/2/15 10:09
 */
@Slf4j
@Component
@Scope("prototype")
public class DemoClientHandler extends BaseClientHandler {

    private final String imei;

    private final Map<String, Object> bizData;

    private final NettyClient nettyClient;

    private int allIdleCounter = 0;

    private static final int MAX_IDLE_TIMES = 3;

    public DemoClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.imei = nettyClient.getImei();
        this.bizData = nettyClient.getBizData();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client imei={}, channel activation successful", this.imei);
        synchronized (this.nettyClient) { //Unlock the queue thread when the channel is activated, and then send the message
            this.nettyClient.notify();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("Client imei={}, channel disconnected", this.imei);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Client imei={}, received message: {}", this.imei, msg);
        this.allIdleCounter = 0; //Traffic arrived, so restart the idle count
        //Process business...
        if ("shutdown".equals(msg)) {
            this.nettyClient.close();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            boolean flag = false;
            if (e.state() == IdleState.ALL_IDLE) {
                this.allIdleCounter++;
                log.info("Client imei={} triggers idle reading or writing {} times", this.imei, this.allIdleCounter);
                if (this.allIdleCounter >= MAX_IDLE_TIMES) {
                    flag = true;
                }
            }
            if (flag) {
                log.warn("The read and write timeout reached {} times, the connection was actively disconnected", MAX_IDLE_TIMES);
                ctx.channel().close();
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Client imei={}, connection exception {}", imei, cause.getMessage(), cause);
    }
}

DemoClientHandler is also a multi-instance bean. Each instance holds a reference to its own NettyClient for later business processing. In channelActive, the handler calls notify on the client instance; this is where the waiting thread is released once the client has been created and the channel activated. The channelRead method handles messages sent by the server, and this is where the actual business logic belongs. Avoid blocking the client's worker thread for long, though; consider processing asynchronously.
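
One way to keep slow business logic off the I/O thread is to hand each message to a separate business thread pool. The following is a minimal JDK-only sketch under stated assumptions: OffloadDemo, onMessage, and handle are hypothetical names, onMessage plays the role of channelRead, and the 50 ms sleep simulates slow work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OffloadDemo {
    // Dedicated pool for business logic, separate from the I/O threads.
    private final ExecutorService businessPool = Executors.newFixedThreadPool(2);

    /** Stand-in for channelRead: schedules the work and returns without waiting for it. */
    CompletableFuture<String> onMessage(String msg) {
        return CompletableFuture.supplyAsync(() -> handle(msg), businessPool);
    }

    /** Simulated slow business logic; reports which thread executed it. */
    private static String handle(String msg) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName() + " handled " + msg;
    }

    void shutdown() {
        businessPool.shutdown();
    }

    /** Returns true if the message was handled, and on a thread other than the caller's. */
    static boolean runDemo() {
        OffloadDemo demo = new OffloadDemo();
        String callerThread = Thread.currentThread().getName();
        try {
            String result = demo.onMessage("hello").join();
            return result.endsWith(" handled hello") && !result.startsWith(callerThread + " ");
        } finally {
            demo.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handled off the calling thread: " + runDemo());
    }
}
```

In a real handler the caller would not join on the future; the demo does so only to inspect the result.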

Finally we look at the client cache class.

package org.example.client;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author ReWind00
 * @date 2023/2/15 11:01
 */
public class NettyClientHolder {

    private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();

    public static ConcurrentHashMap<String, NettyClient> get() {
        return clientMap;
    }

}

Since a Netty channel cannot be serialized, it cannot be stored in Redis and can only be cached in local memory. The cache is essentially a ConcurrentHashMap.
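
A detail worth considering with such a cache is removal. NettyClient.close() removes the entry by imei alone, so a late close of an old client could in principle remove a newer client registered under the same imei. ConcurrentHashMap.remove(key, value) removes the entry only if it still maps to the given instance. The sketch below illustrates this with hypothetical names (ClientRegistryDemo, Client, register, unregister); it is not the project's holder class.

```java
import java.util.concurrent.ConcurrentHashMap;

final class ClientRegistryDemo {

    /** Minimal stand-in for a cached client; equality is object identity. */
    static final class Client {
        final String imei;

        Client(String imei) {
            this.imei = imei;
        }
    }

    private final ConcurrentHashMap<String, Client> clients = new ConcurrentHashMap<>();

    /** Registers a client, replacing any previous one for the same device. */
    Client register(Client client) {
        return clients.put(client.imei, client);
    }

    Client find(String imei) {
        return clients.get(imei);
    }

    /** Removes the entry only if it still maps to this exact instance. */
    boolean unregister(Client client) {
        return clients.remove(client.imei, client);
    }

    static String runDemo() {
        ClientRegistryDemo registry = new ClientRegistryDemo();
        Client stale = new Client("87654321");
        Client fresh = new Client("87654321");
        registry.register(stale);
        registry.register(fresh); // a reconnect replaced the stale client
        boolean removedStale = registry.unregister(stale); // late close of the stale client
        boolean freshStillCached = registry.find("87654321") == fresh;
        boolean removedFresh = registry.unregister(fresh);
        return removedStale + "," + freshStillCached + "," + removedFresh;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```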

5. Test

package org.example.client.controller;

import org.example.client.QueueHolder;
import org.example.client.model.NettyMsgModel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author ReWind00
 * @date 2023/2/15 13:48
 */
@RestController
@RequestMapping("/demo")
public class DemoController {

    /**
     * Send two messages at intervals
     */
    @GetMapping("testOne")
    public void testOne() {
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World!"));
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World Too!"));
    }

    /**
     * Send any message
     *
     * @param imei
     * @param msg
     */
    @GetMapping("testTwo")
    public void testTwo(@RequestParam String imei, @RequestParam String msg) {
        QueueHolder.get().offer(NettyMsgModel.create(imei, msg));
    }

    /**
     * Send two messages in a row. The second message will be put back into the queue to delay consumption due to the redis lock.
     */
    @GetMapping("testThree")
    public void testThree() {
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World!"));
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World Too!"));
    }
}

The test endpoints are shown above. Calling testOne produces the following log:

[Figure: log output of testOne]

You can see that the first message triggered the client creation process, and the message was sent after creation, while the second message 5 seconds later was sent directly through the existing channel.

Calling testTwo produces the following log:

[Figure: log output of testTwo]

Sending shutdown can actively disconnect existing connections.

Calling testThree produces the following log:

[Figure: log output of testThree]

You can see that the second message is requeued and its consumption is deferred.

6. Source code

https://gitee.com/jaster/netty-tcp-demo

Postscript

This demo project is intended only for learning and discussion. It still has shortcomings that would need to be addressed before production use. If you have questions, feel free to leave a comment.

Copyright statement: This article is an original article by CSDN blogger “ReWinD00” and follows the CC 4.0 BY-SA copyright agreement. Please attach the original source link and this statement when reprinting.

https://blog.csdn.net/u013615903/article/details/129044283



