TCP long connection communication scheme built with Netty+SpringBoot

From: blog.csdn.net/u013615903/article/details/129044283

Project Background

Recently, one of our company's Internet of Things projects needed long-lived socket connections for message communication. A first version of the code was hacked together and put into production, and a steady stream of bugs followed. I could neither sleep nor eat, so I turned to Baidu for help, and the project has now been running smoothly for several days. In the spirit of open-source sharing, I have distilled the project code into a demo project, stripping out as much of the messy business logic as possible, in the hope that we can learn and improve together.

Text

1. Project structure

This project uses Netty, Redis, and Spring Boot 2.2.0.

2. Project module

The directory structure of this project is as follows:

[Image: project directory structure]

netty-tcp-core is a shared module that mainly contains utility classes. netty-tcp-server is the Netty server, which exists only for testing; in the real project we use only the client. netty-tcp-client is the client and the focus of this article.

3. Business process

We use RocketMQ as the message queue in our actual project. Since this project is a demo project, it was changed to BlockingQueue. The data flow is:

Producer -> message queue -> consumer (client) -> tcp channel -> server -> tcp channel -> client.

When the consumer receives a message sent by a device, it checks the cache for an existing connection between that device and the server. If one exists and its channel is active, the message is sent over that channel. If none exists, a channel is created and the message is sent as soon as the channel becomes active. When the client receives a message from the server, it runs the corresponding business logic.

4. Detailed code explanation

1. Message queue

Since this demo project removes the message middleware, we need a local queue to simulate the real usage scenario.

package org.example.client;

import org.example.client.model.NettyMsgModel;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * This project uses local queues for demonstration. In actual production, message middleware should be used instead (rocketmq or rabbitmq)
 *
 * @author ReWind00
 * @date 2023/2/15 11:20
 */
public class QueueHolder {

    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);

    public static ArrayBlockingQueue<NettyMsgModel> get() {
        return queue;
    }
}

A holder class keeps a static instance of the queue so that any class can reference it quickly. Next, we start a thread that listens to the queue. As soon as a message is delivered to the queue, it is taken out and processed asynchronously by multiple threads.

public class LoopThread implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (true) {
                    // take() removes the head of the BlockingQueue; if the queue is empty, the thread blocks until a message arrives
                    try {
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                    }
                }
            });
        }
    }
}

The take method blocks the thread until a message is available in the queue; once the message has been processed, the loop moves on to the next iteration.
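The consumer loop can be tried out in isolation with a minimal, JDK-only sketch. QueueWorkerSketch and drain are hypothetical names, and String stands in for NettyMsgModel. Unlike the loop above, this version assumes the workers should stop when they are interrupted, so it restores the interrupt flag and leaves the loop instead of logging and continuing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for LoopThread; String replaces NettyMsgModel.
class QueueWorkerSketch {

    // Starts `workers` consumer threads, feeds them `messages`, and returns what was processed.
    static List<String> drain(List<String> messages, int workers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        String msg = queue.take(); // blocks while the queue is empty
                        processed.add(msg);        // stand-in for messageProcessor.process(...)
                        done.countDown();
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition ends the worker.
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        try {
            for (String m : messages) {
                queue.put(m);
            }
            done.await(5, TimeUnit.SECONDS); // wait until every message was handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // interrupts workers that are blocked in take()
        }
        return new ArrayList<>(processed);
    }
}
```

Calling QueueWorkerSketch.drain(Arrays.asList("a", "b", "c"), 2) returns the three messages in whatever order the two workers happened to pick them up.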

2. Execution class

The process method comes from the MessageProcessor class, which is a singleton, but it will be executed by multiple threads at the same time.

public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    boolean locked = false; // true only if this call acquired the lock, so finally never releases another thread's lock
    try {
        synchronized (this) { // lock so that several messages from the same device do not create duplicate clients
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) { // the previous message is still being processed
                log.info("imei={} message is being processed, re-enqueueing", imei);
                // Put the message back into the queue after a delay (a real project should use RocketMQ or RabbitMQ delayed consumption)
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                log.info("imei={} message is being processed, re-enqueue scheduled", imei);
                return;
            } else {
                // No message is being processed for this device, so take the lock
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
                locked = true;
            }
        }
        // Send the message if a client exists in the cache
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive()) { // the channel is active, send directly
                if (!nettyClient.getChannelFuture().channel().isWritable()) {
                    log.warn("Warning, the channel cannot be written, imei={}, channelId={}", nettyClient.getImei(),
                            nettyClient.getChannelFuture().channel().id());
                }
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.info("client imei={}, the channel is not active, close", nettyClient.getImei());
                nettyClient.close();
                // Recreate the client and send
                this.createClientAndSend(nettyMsgModel);
            }
        } else { // create a new client if none exists in the cache
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        // Release the lock after execution, but only if this call acquired it
        if (locked) {
            redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
        }
    }
}

Here imei is the unique identifier of a device, and we use it as the cache key to check whether a connection has already been created. Because message concurrency can be very high, one thread may receive a message for a device and start creating a connection while another thread is still creating a connection for that same device. We combine a synchronized block with a Redis distributed lock to prevent this. Once one message holds the lock, later messages for the same device are put back into the queue, and their consumption is delayed until the lock is released.
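The per-device lock can be illustrated without Redis. The following is a minimal in-memory sketch, not the project's implementation: DeviceLockSketch, tryLock, unlock, and the owner token are hypothetical names, and a ConcurrentHashMap stands in for Redis. putIfAbsent makes "check and set" a single atomic step, and the two-argument remove releases the lock only if the caller still owns it. The sketch assumes no expiry is needed, whereas the Redis lock in the article expires after 120 seconds.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for the Redis lock keyed by imei (all names are hypothetical).
class DeviceLockSketch {

    private final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<>();

    // Atomically takes the lock for this device; returns false if someone else holds it.
    boolean tryLock(String imei, String owner) {
        return locks.putIfAbsent(imei, owner) == null;
    }

    // Releases the lock only when `owner` is the current holder; returns whether it was released.
    boolean unlock(String imei, String owner) {
        return locks.remove(imei, owner);
    }
}
```

A message that fails tryLock would be re-enqueued, and because it never owned the lock, its later call to unlock leaves the real holder's lock untouched.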

The thread that acquires the lock uses the imei to check whether a connection exists in the cache. If it does, the message is sent directly; if not, the thread calls the method that creates the client.

private void createClientAndSend(NettyMsgModel nettyMsgModel) {
    log.info("imei={}", nettyMsgModel.getImei());
    //The DemoClientHandler here can be defined according to its own business
    NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
            this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
    executor.execute(nettyClient); //Execute client initialization
    try {
        // Use the client's monitor to wait for the channel to become active
        synchronized (nettyClient) {
            long c1 = System.currentTimeMillis();
            nettyClient.wait(5000); // Block for at most 5 seconds; if the client is still not active by then, the wait ends on its own
            long c2 = System.currentTimeMillis();
            log.info("Create client wait time={}ms", c2 - c1);
        }
        if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive()) { //The connection is successful
            // store in cache
            NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
            //Send a message after the client activates
            nettyClient.send(nettyMsgModel.getMsg());
        } else { //Connection failed
            log.warn("Failed to create client, imei={}", nettyMsgModel.getImei());
            nettyClient.close();
            // You can requeue the message for processing
        }
    } catch (Exception e) {
        log.error("Client initialized sending message exception===>{}", e.getMessage(), e);
    }
}

Once the Netty client instance has been created, its initialization runs on the thread pool. Because initialization is asynchronous, the client has very likely not finished connecting when we want to send the message, so the calling thread has to wait. Inside the synchronized block, wait releases the monitor on the client and suspends the thread until the handler calls notify after the channel becomes active. The argument 5000 is a timeout in milliseconds: if the client fails to connect, the channel is never activated, or something else goes wrong, the thread wakes up on its own after at most 5000 milliseconds.

This timeout can be adjusted to the actual situation. Under heavy concurrency, blocking for up to 5 seconds per message may exhaust the thread pool or cause an out-of-memory error. Once the client has been created and activated, the message is sent immediately.
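With wait and notify, a notification that fires before the waiting thread reaches wait is lost, and the waiter then sits out the full timeout. A CountDownLatch is one way to avoid that, because it remembers the signal. The sketch below is an alternative technique rather than the article's code; ActivationLatchSketch, markActive, and awaitActive are hypothetical names, with markActive playing the role of the notify call in channelActive.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical replacement for the wait/notify handshake between the queue thread and the handler.
class ActivationLatchSketch {

    private final CountDownLatch active = new CountDownLatch(1);

    // Would be called from the handler once the channel is active.
    void markActive() {
        active.countDown();
    }

    // Returns true if the channel became active within the timeout,
    // false on timeout or if the waiting thread was interrupted.
    boolean awaitActive(long timeoutMillis) {
        try {
            return active.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The boolean result also tells the caller directly whether it woke up because of activation or because of the timeout, which wait(5000) does not.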

3. Client

package org.example.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author ReWind00
 * @date 2023/2/15 9:59
 */
@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {

    @Value("${netty.server.port}")
    private int port;

    @Value("${netty.server.host}")
    private String host;
    // client unique ID
    private String imei;
    //custom business data
    private Map<String, Object> bizData;

    private EventLoopGroup workGroup;

    private Class<BaseClientHandler> clientHandlerClass;

    private ChannelFuture channelFuture;

    public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup, Class<BaseClientHandler> clientHandlerClass) {
        this.imei = imei;
        this.bizData = bizData;
        this.workGroup = workGroup;
        this.clientHandlerClass = clientHandlerClass;
    }

    @Override
    public void run() {
        try {
            this.init();
            log.info("Client start imei={}", imei);
        } catch (Exception e) {
            log.error("Client failed to start: {}", e.getMessage(), e);
        }
    }

    public void close() {
        if (null != this.channelFuture) {
            this.channelFuture.channel().close();
        }
        NettyClientHolder.get().remove(this.imei);
    }

    public void send(String message) {
        try {
            if (!this.channelFuture.channel().isActive()) {
                log.info("The channel is not active imei={}", this.imei);
                return;
            }
            if (!StringUtils.isEmpty(message)) {
                log.info("queue message sending===>{}", message);
                this.channelFuture.channel().writeAndFlush(message);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private void init() throws Exception {
        //Pass this instance to the handler
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        // Configure the client through the Bootstrap helper class
        b.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024 * 32)
                .option(ChannelOption.SO_SNDBUF, 1024 * 32)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("\r\n".getBytes())));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); // String encoding.
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); // String decoding.
                        // heartbeat settings
                        ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                        ch.pipeline().addLast(clientHandler);
                    }
                });
        this.connect(b);
    }

    private void connect(Bootstrap b) throws InterruptedException {
        long c1 = System.currentTimeMillis();
        final int maxRetries = 2; //Reconnect 2 times
        final AtomicInteger count = new AtomicInteger();
        final AtomicBoolean flag = new AtomicBoolean(false);
        try {
            this.channelFuture = b.connect(host, port).addListener(
                    new ChannelFutureListener() {
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                if (count.incrementAndGet() > maxRetries) {
                                    log.warn("imei={} reconnected more than {} times", imei, maxRetries);
                                } else {
                                    log.info("imei={} reconnect {} times", imei, count);
                                    b.connect(host, port).addListener(this);
                                }

                            } else {
                                log.info("imei={} connection is successful, connection IP: {} connection port: {}", imei, host, port);
                                flag.set(true);
                            }
                        }
                    }).sync(); //synchronous connection
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        log.info("device imei={}, channelId={} connection time={}ms", imei, channelFuture.channel().id(), System.currentTimeMillis() - c1);
        if (flag.get()) {
            channelFuture.channel().closeFuture().sync(); //The thread will continue to be blocked after the connection is successful
        }
    }
}

The Netty client is a multi-instance (prototype) bean. Each instance is bound to a thread, which stays blocked until the client is closed. Each client can hold its own business data for use in later exchanges with the server. When connecting, the client gets 2 retries; after 3 failed attempts in total it gives up, at which point you can choose to re-enqueue the message for later consumption. In our real project, a login message must first be sent to the server, and further communication is allowed only after the server confirms it; adjust this to your own situation.
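The retry budget described above (one initial attempt plus 2 retries, so 3 attempts in total) can be written down as a small JDK-only sketch. ConnectRetrySketch and connectWithRetry are hypothetical names, and a BooleanSupplier stands in for a single connection attempt; the real client counts failures inside a ChannelFutureListener instead of a loop.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper that mirrors the counting rule of the connect method.
class ConnectRetrySketch {

    // Runs `attempt` up to maxRetries + 1 times.
    // Returns the number of the attempt that succeeded, or -1 if every attempt failed.
    static int connectWithRetry(BooleanSupplier attempt, int maxRetries) {
        for (int tries = 1; tries <= maxRetries + 1; tries++) {
            if (attempt.getAsBoolean()) {
                return tries;
            }
        }
        return -1; // the caller may decide to re-enqueue the message here
    }
}
```

With maxRetries set to 2, an attempt that succeeds only on its third call makes the method return 3, and an attempt that always fails is called exactly three times before the method returns -1.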

Also note that the EventLoopGroup is passed in through the constructor rather than created inside the client. With a very large number of clients, having each one create its own thread group would consume a great deal of server resources, so in practice we create one shared thread group per business line for all of that line's clients. The size of the thread group should be configured according to business needs.

In the init method, we add a handler to the client to handle the interaction with the server. Let’s take a look at the specific implementation.

package org.example.client.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.example.client.NettyClient;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author ReWind00
 * @date 2023/2/15 10:09
 */
@Slf4j
@Component
@Scope("prototype")
public class DemoClientHandler extends BaseClientHandler {

    private final String imei;

    private final Map<String, Object> bizData;

    private final NettyClient nettyClient;

    private int allIdleCounter = 0;

    private static final int MAX_IDLE_TIMES = 3;

    public DemoClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.imei = nettyClient.getImei();
        this.bizData = nettyClient.getBizData();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("client imei={}, the channel is activated successfully", this.imei);
        synchronized (this.nettyClient) { //Unlock the queue thread when the channel is activated, and then send the message
            this.nettyClient.notify();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("client imei={}, the channel is disconnected", this.imei);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client imei={}, received message: {}", this.imei, msg);
        //Process business...
        if ("shutdown".equals(msg)) {
            this.nettyClient.close();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            boolean flag = false;
            if (e.state() == IdleState.ALL_IDLE) {
                this.allIdleCounter++;
                log.info("client imei={} triggers idle read or write {} times", this.imei, this.allIdleCounter);
                if (this.allIdleCounter >= MAX_IDLE_TIMES) {
                    flag = true;
                }
            }
            if (flag) {
                log.warn("Read and write timeout reaches {} times, actively disconnect", MAX_IDLE_TIMES);
                ctx.channel().close();
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("client imei={}, connection exception {}", imei, cause.getMessage(), cause);
    }
}

DemoClientHandler is also a multi-instance (prototype) bean, and each instance holds a reference to its own NettyClient for later business processing. In channelActive, the handler calls notify on the client instance; this is where the waiting thread is released once the client has been created and its channel activated. channelRead is where messages from the server are handled, so the actual business logic belongs there. It is not advisable to block the client's worker thread for long, so consider processing asynchronously.
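One way to keep the worker thread free is to hand each message to a separate business thread pool and return at once. The sketch below uses only the JDK; BusinessOffloadSketch, onMessage, and handle are hypothetical names, onMessage stands in for the body of channelRead, and the pool size of 4 is an arbitrary assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical handoff of business work away from the I/O thread.
class BusinessOffloadSketch {

    // Separate pool for business logic; the size is an assumption for illustration.
    private final ExecutorService businessPool = Executors.newFixedThreadPool(4);

    // Stand-in for channelRead: returns immediately, the work runs on businessPool.
    CompletableFuture<String> onMessage(String msg) {
        return CompletableFuture.supplyAsync(() -> handle(msg), businessPool);
    }

    // Placeholder for slow business processing.
    private String handle(String msg) {
        return "handled:" + msg;
    }

    void shutdown() {
        businessPool.shutdown();
    }
}
```

The returned future lets the caller attach follow-up work, such as sending a reply, without blocking the thread that received the message.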

Finally we look at the client cache class.

package org.example.client;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author ReWind00
 * @date 2023/2/15 11:01
 */
public class NettyClientHolder {

    private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();

    public static ConcurrentHashMap<String, NettyClient> get() {
        return clientMap;
    }

}

Since a Netty channel cannot be serialized, it cannot be stored in Redis and can only be cached in local memory; the cache is essentially a ConcurrentHashMap.
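One detail worth knowing when caching clients by imei: removing an entry by key alone can evict a newer client that has already replaced the one being closed. ConcurrentHashMap offers a two-argument remove that deletes the entry only if it still maps to the given value. The sketch below shows that call in isolation; ClientRegistrySketch and its methods are hypothetical names, and the type parameter C stands in for NettyClient.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical wrapper around the client cache; C stands in for NettyClient.
class ClientRegistrySketch<C> {

    private final ConcurrentHashMap<String, C> clients = new ConcurrentHashMap<>();

    // Stores the client and returns the one it replaced, or null.
    C register(String imei, C client) {
        return clients.put(imei, client);
    }

    C lookup(String imei) {
        return clients.get(imei);
    }

    // Removes the entry only if it still maps to this exact client.
    boolean unregister(String imei, C client) {
        return clients.remove(imei, client);
    }
}
```

If an old client closes after a new one was registered under the same imei, unregister returns false and the new client stays in the cache.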

5. Test

package org.example.client.controller;

import org.example.client.QueueHolder;
import org.example.client.model.NettyMsgModel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author ReWind00
 * @date 2023/2/15 13:48
 */
@RestController
@RequestMapping("/demo")
public class DemoController {

    /**
     * Send two messages at intervals
     */
    @GetMapping("testOne")
    public void testOne() {
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World!"));
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World Too!"));
    }

    /**
     * Send any message
     *
     * @param imei
     * @param msg
     */
    @GetMapping("testTwo")
    public void testTwo(@RequestParam String imei, @RequestParam String msg) {
        QueueHolder.get().offer(NettyMsgModel.create(imei, msg));
    }

    /**
     * Send two messages in a row. Because of the Redis lock, the second message is put back into the queue and consumed after a delay
     */
    @GetMapping("testThree")
    public void testThree() {
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World!"));
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World Too!"));
    }
}

The test endpoints are shown above. Calling testOne produces the following log:

[Image: log output of testOne]

It can be seen that the first message triggers the client creation process, and the message is sent after creation, while the second message after 5 seconds is sent directly through the existing channel.

Calling testTwo produces the following log:

[Image: log output of testTwo]

Sending the message shutdown makes the client actively close the existing connection.

Calling testThree produces the following log:

[Image: log output of testThree]

You can see that the second message was re-enqueued and consumed after a delay.

6. Source code

  • https://gitee.com/jaster/netty-tcp-demo

Postscript

This demo project is intended only for learning and discussion. It still has shortcomings that would need to be addressed before it is used in production. Readers with questions are welcome to leave a comment.
