Netty in practice: handling TCP sticky packets and packet splitting

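TCP sticky packets and packet splitting

TCP is a byte-stream protocol: it delivers bytes in order, but it does not preserve the boundaries of the messages the application writes.

Unpacking (packet splitting): one complete application message may be split across several TCP segments, so the receiver gets it in more than one read.

Sticky packets: several small messages may be coalesced by TCP and arrive at the receiver as one larger block of data.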
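The effect can be imitated without a network. The sketch below is only a simulation under stated assumptions: StreamChunkDemo, sendAll and readInChunks are hypothetical names, and the fixed chunk size stands in for whatever amount of data a real socket read happens to return.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simulates a receiver whose reads are unrelated to the sender's message boundaries. */
final class StreamChunkDemo {

    /** Concatenates count copies of message, the way TCP sees them: as one byte stream. */
    static byte[] sendAll(String message, int count) {
        byte[] one = message.getBytes(StandardCharsets.UTF_8);
        byte[] stream = new byte[one.length * count];
        for (int i = 0; i < count; i++) {
            System.arraycopy(one, 0, stream, i * one.length, one.length);
        }
        return stream;
    }

    /** Cuts the stream into reads of at most chunkSize bytes, like successive socket reads. */
    static List<String> readInChunks(byte[] stream, int chunkSize) {
        List<String> reads = new ArrayList<>();
        for (int from = 0; from < stream.length; from += chunkSize) {
            int to = Math.min(from + chunkSize, stream.length);
            reads.add(new String(Arrays.copyOfRange(stream, from, to), StandardCharsets.UTF_8));
        }
        return reads;
    }

    public static void main(String[] args) {
        // Three 6-byte messages, read 8 bytes at a time.
        for (String read : readInChunks(sendAll("hello;", 3), 8)) {
            System.out.println("[" + read + "]");
        }
        // prints [hello;he], [llo;hell] and [o;] on separate lines
    }
}
```

The first read contains one whole message stuck to the start of the next, and the second message is split across two reads. Both effects come from the same cause: the receiver sees bytes, not messages.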

Add the Netty dependency:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.29.Final</version>
</dependency>

Example that reproduces sticky packets and packet splitting:

  • Server EchoServer:
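import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder; // used once the decoder line is uncommented
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Server that logs each message received from the client and sends a reply.
 */
public final class EchoServer {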
    /**
     * port
     */
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        //Configure EventLoopGroup
        // Master-slave Reactor multi-thread mode, bossGroup is the master Reactor, workerGroup is the slave Reactor
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //Initialize the server's boot class ServerBootstrap
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //Specify EventLoopGroup
            serverBootstrap.group(bossGroup, workerGroup)
                //Specify channel
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
                //Specify ChannelHandler for processing Channel
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     //ChannelPipeline, based on the responsibility chain model, can add multiple ChannelHandlers
                     ChannelPipeline channelPipeline = ch.pipeline();
                     //ChannelHandler, used to process channels, process received data, and implement business logic.
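                     // Fixed-length frame decoder (FixedLengthFrameDecoder). The frame length must equal
                     // the byte length of one message: the test message is 29 bytes in UTF-8.
                     // channelPipeline.addLast(new FixedLengthFrameDecoder(29));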
                     channelPipeline.addLast(new EchoServerHandler());
                 }
             });

            // Start the server and bind the server to the port on which it will listen for connection requests.
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();

            // Wait until the server socket is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            //Close all eventLoops and terminate threads
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • Server-side EchoServerHandler:
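import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ChannelHandler on the server side.
 *
 * A ChannelHandler processes channel events and received data and implements the business logic.
 * Extending ChannelInboundHandlerAdapter lets the class override the callbacks for inbound events.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    // The original listing uses `log` without declaring it. An SLF4J logger is assumed here;
    // it needs slf4j-api and a binding such as Logback on the classpath.
    private static final Logger log = LoggerFactory.getLogger(EchoServerHandler.class);

    /**
     * channelRead(): called for each message read from the channel.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        try {
            log.info("Message from client: " + buf.toString(CharsetUtil.UTF_8) + "\n");
        } finally {
            // The message is not passed further down the pipeline, so release the buffer here.
            ReferenceCountUtil.release(msg);
        }
    }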

    /**
     * channelReadComplete(): called when the last message of the current read operation
     * has been consumed by channelRead().
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //Return data to the client, writeAndFlush() can also be split into write(msg) and flush()
        ctx.writeAndFlush(Unpooled.copiedBuffer("I'm happy to see you too^_^",CharsetUtil.UTF_8));
    }

    /**
     * exceptionCaught(): Called when an exception is thrown during the read operation.
     *
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //Close the connection when an exception occurs
        cause.printStackTrace();
        ctx.close();
    }
}

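  • Client EchoClient:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder; // used once the decoder line is uncommented

/**
 * Client that sends messages to the server and receives the server's responses.
 */
public final class EchoClient {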

    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel socketChannel) throws Exception {
                     ChannelPipeline channelPipeline = socketChannel.pipeline();
                     //ChannelHandler, used to process channels, process received data, and implement business logic.
                     // Fixed-length frame decoder (FixedLengthFrameDecoder); on the client it frames
                     // the data received from the server.
                     // channelPipeline.addLast(new FixedLengthFrameDecoder(29));
                     channelPipeline.addLast(new ClientNoDecoderHandler());
                 }
             });

            //Open the client and connect to the port of the server
            ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();

            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  • Client ClientNoDecoderHandler:
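import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;

/**
 * Client handler that sends the test message 1000 times as soon as the connection is active.
 */
public class ClientNoDecoderHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for (int i = 0; i < 1000; i++) {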
            ByteBuf buffer = getByteBuf(ctx);
            ctx.channel().writeAndFlush(buffer);
        }
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        byte[] bytes = "sticky packet unpacking test.".getBytes(StandardCharsets.UTF_8);
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(bytes);
        return buffer;
    }
}

Test results:

The client sends the same message many times in a row, and the server may receive the messages stuck together or split apart.

In the run below, the client sent "sticky packet unpacking test." repeatedly. Several messages arrived stuck together in a single read, and some were split across two reads (a ? marks a point where a message was cut).

21:32:18.793 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - Message from client: Sticky packet unpacking test. Sticky packet unpacking test.

21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - Message from client: Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test.?

21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - Message from client: ? Packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Unpacking test. Sticky packet unpacking test. Sticky packet unpacking?

21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - Message from client: ? Test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test packet test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test.

21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - Message from client: Sticky packet unpacking test. Sticky packet unpacking test. Sticky packet unpacking test.

How Netty solves sticky packets and packet splitting

Netty ships several frame decoders (unpackers) that restore message boundaries on the receiving side.

  • FixedLengthFrameDecoder:

Fixed-length frame decoder. It cuts the byte stream into frames of a fixed number of bytes, for protocols in which every message has the same length.
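As a rough illustration of the idea, here is a plain-Java sketch that works on byte arrays instead of Netty's ByteBuf. FixedLengthFramer and feed are hypothetical names chosen for this example; this is not Netty's implementation, only the same technique in miniature: buffer incoming bytes and emit a frame each time a full frame's worth is available.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a fixed-length frame decoder. */
final class FixedLengthFramer {
    private final int frameLength;
    private byte[] pending = new byte[0]; // bytes received that do not yet fill a frame

    FixedLengthFramer(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Feeds one read from the stream and returns every complete frame now available. */
    List<String> feed(byte[] chunk) {
        // Append the new bytes to whatever was left over from earlier reads.
        byte[] all = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        List<String> frames = new ArrayList<>();
        int offset = 0;
        while (all.length - offset >= frameLength) {
            frames.add(new String(all, offset, frameLength, StandardCharsets.UTF_8));
            offset += frameLength;
        }
        // Keep the incomplete tail for the next read.
        pending = Arrays.copyOfRange(all, offset, all.length);
        return frames;
    }
}
```

Feeding the reads "hello;he", "llo;hell" and "o;" to a framer with frame length 6 yields exactly one "hello;" per call: the leftover bytes of each read are completed by the next one.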

  • LineBasedFrameDecoder:

Line-based frame decoder. It applies when the sender terminates every message with a line ending (\n or \r\n).
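The same buffering idea applies to line endings. The sketch below is an assumption-laden illustration in plain Java, not Netty code: LineFramer and feed are hypothetical names, and the line ending is dropped from each returned line, which is a choice made for this example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a line-based frame decoder. */
final class LineFramer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds one read from the stream and returns every complete line now available. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] line = pending.toByteArray();
                int length = line.length;
                // Treat "\r\n" like "\n" by dropping a trailing carriage return.
                if (length > 0 && line[length - 1] == '\r') {
                    length--;
                }
                lines.add(new String(line, 0, length, StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return lines;
    }
}
```

Scanning byte by byte is safe for UTF-8 text because the byte 0x0A never occurs inside a multi-byte character. A line that has not been terminated yet simply stays buffered until a later read completes it.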

  • DelimiterBasedFrameDecoder:

Delimiter-based frame decoder. DelimiterBasedFrameDecoder splits the byte stream into frames at user-specified delimiters.
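A delimiter may be longer than one byte, so it can itself be split across two reads. The plain-Java sketch below handles that case; DelimiterFramer and feed are hypothetical names, the "$_" delimiter in the usage note is an arbitrary choice, and the code is an illustration of the technique rather than Netty's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a delimiter-based frame decoder. */
final class DelimiterFramer {
    private final byte[] delimiter;
    private byte[] pending = new byte[0]; // bytes after the last complete frame

    DelimiterFramer(String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8);
    }

    /** Feeds one read and returns every complete frame, with the delimiter removed. */
    List<String> feed(byte[] chunk) {
        byte[] all = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        List<String> frames = new ArrayList<>();
        int start = 0; // start of the frame currently being scanned
        int i = 0;
        while (i + delimiter.length <= all.length) {
            if (matchesAt(all, i)) {
                frames.add(new String(all, start, i - start, StandardCharsets.UTF_8));
                i += delimiter.length;
                start = i;
            } else {
                i++;
            }
        }
        // Anything after the last delimiter, including half a delimiter, waits for more data.
        pending = Arrays.copyOfRange(all, start, all.length);
        return frames;
    }

    private boolean matchesAt(byte[] data, int index) {
        for (int j = 0; j < delimiter.length; j++) {
            if (data[index + j] != delimiter[j]) {
                return false;
            }
        }
        return true;
    }
}
```

With the delimiter "$_", feeding "a$" returns nothing because the delimiter is incomplete, and then feeding "_bc$_d" returns "a" and "bc" while "d" stays buffered.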

  • LengthFieldBasedFrameDecoder:

Length-field-based frame decoder. This is the most general of the four: as long as your custom protocol contains a length field, you can use this decoder to split frames at the application layer.
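To make the length-field approach concrete, the sketch below assumes a very simple protocol: a 4-byte big-endian length followed by a UTF-8 payload. That layout, and the names LengthPrefixFramer, encode and feed, are assumptions made for this example only. Netty's LengthFieldBasedFrameDecoder is configurable for other layouts and is not reproduced here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical framer for messages laid out as [4-byte length][payload]. */
final class LengthPrefixFramer {
    private static final int HEADER_SIZE = 4;
    private byte[] pending = new byte[0]; // bytes of a frame that is not complete yet

    /** Encodes a message as a big-endian length field followed by the UTF-8 payload. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Feeds one read from the stream and returns every complete payload now available. */
    List<String> feed(byte[] chunk) {
        byte[] all = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        ByteBuffer buffer = ByteBuffer.wrap(all);
        List<String> frames = new ArrayList<>();
        while (buffer.remaining() >= HEADER_SIZE) {
            buffer.mark();
            int length = buffer.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative length field: " + length);
            }
            if (buffer.remaining() < length) {
                buffer.reset(); // payload incomplete: rewind to the header and wait
                break;
            }
            byte[] payload = new byte[length];
            buffer.get(payload);
            frames.add(new String(payload, StandardCharsets.UTF_8));
        }
        // Whatever is left belongs to a frame that later reads will complete.
        pending = new byte[buffer.remaining()];
        buffer.get(pending);
        return frames;
    }
}
```

Unlike the fixed-length and delimiter approaches, messages may have any length and any content, because the receiver learns the size of each frame before reading it. A production decoder would also cap the accepted length so that a corrupt header cannot force unbounded buffering.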

Test:

Use FixedLengthFrameDecoder for unpacking.

Uncomment the decoder line that is commented out in both the EchoClient and EchoServer classes of the example, with the frame length set to the byte length of the test message (29 bytes in UTF-8):

channelPipeline.addLast(new FixedLengthFrameDecoder(29));

The server now logs exactly one complete message per entry, with no sticking and no splitting:

22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - Message from client: Sticky packet unpacking test.

22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - Message from client: Sticky packet unpacking test.

22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - Message from client: Sticky packet unpacking test.
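The frame length given to a fixed-length decoder has to match the encoded size of each message in bytes, not its length in characters. The sketch below shows one way to compute it, plus space padding as one possible convention for shorter messages. FrameLength, encodedLength and pad are hypothetical helper names, and padding is an assumption of this example, not something the original code does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helpers for choosing and filling a fixed frame length. */
final class FrameLength {

    /** Returns the number of bytes the message occupies in UTF-8. */
    static int encodedLength(String message) {
        return message.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Pads the message with trailing spaces so that it fills exactly one frame. */
    static byte[] pad(String message, int frameLength) {
        byte[] raw = message.getBytes(StandardCharsets.UTF_8);
        if (raw.length > frameLength) {
            throw new IllegalArgumentException(
                    "message needs " + raw.length + " bytes, frame holds " + frameLength);
        }
        byte[] frame = new byte[frameLength];
        Arrays.fill(frame, (byte) ' ');
        System.arraycopy(raw, 0, frame, 0, raw.length);
        return frame;
    }

    public static void main(String[] args) {
        System.out.println(encodedLength("sticky packet unpacking test.")); // prints 29
    }
}
```

For non-ASCII text the byte length is larger than the character count, so it is safer to compute the frame length from the encoded bytes than to count characters by hand. With padding, the receiver has to trim the padding after decoding.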
