[Network programming] LineEncoder and DelimiterBasedFrameDecoder fixed collocation of codec in Netty

Author: Cheng Jiusheng
Link: https://www.jianshu.com/p/025c62891ab4
Source: Jianshu
Copyright belongs to the author. For commercial reprint, please contact the author for authorization, for non-commercial reprint, please indicate the source.

How to add special symbols at the end of the sentence to solve the problem of sticking/unpacking?

There are two more classes in the netty class library, the LineEncoder encoding class and the DelimiterBasedFrameDecoder decoding class.

Open the source code of DelimiterBasedFrameDecoder, which inherits from ByteToMessageDecoder and is responsible for splitting data packets according to the special symbols at the end of the data packets.

In the example on line 56, a newline is added as a delimiter for each packet.

In this class, 7 member variables are provided:

  1. delimiters: one or more delimiters of ByteBuf array type;

  2. maxFrameLength: the maximum length of the data packet;

  3. stripDelimiter: whether to skip the delimiter;

  4. failFast: When true, an error will be reported when the entire data packet is not read, otherwise an error will be reported after reading;

  5. discardingTooLongFrame: Whether to discard too long data;

  6. tooLongFrameLength: the maximum length of the data packet;

  7. LineBasedFrameDecoder: newline delimiter decoder, only newline can be set as a delimiter.

Among the 7 parameters, 5 members can be set separately through the 6 constructors of this class, and finally the assignment of member variables is completed in the constructor of line 166.

In this constructor, the maximum length of the data packet is judged on line 168, and the maximum length cannot be set to a number less than 0. On line 169, the delimiter cannot be set to null. On line 172, the delimiter array cannot be an empty array. In line 176, determine whether the separator is a newline character, and if it is a newline character, use the LineBasedFrameDecoder newline character decoding class to decode. In line 182, traverse the separator array, and in line 183, verify that the ByteBuf where the separator is located cannot be null and must be readable. Add the delimiter to the member variable delimiters array on line 184.

The isLineBased method is a validation of whether the delimiter is a newline.

The core logic of decoding is in the decode method.

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    if (lineBasedDecoder != null) {
        return lineBasedDecoder.decode(ctx, buffer);
    }
    // Try all delimiters and choose the delimiter which yields the shortest frame.
    int minFrameLength = Integer.MAX_VALUE;
    ByteBuf minDelim = null;
    for (ByteBuf delim: delimiters) {
        int frameLength = indexOf(buffer, delim);
        if (frameLength >= 0 & amp; & amp; frameLength < minFrameLength) {
            minFrameLength = frameLength;
            minDelim = delim;
        }
    }

    if (minDelim != null) {
        int minDelimLength = minDelim. capacity();
        ByteBuf frame;

        if (discardingTooLongFrame) {
            // We've just finished discarding a very large frame.
            // Go back to the initial state.
            discardingTooLongFrame = false;
            buffer.skipBytes(minFrameLength + minDelimLength);

            int tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0;
            if (!failFast) {
                fail(tooLongFrameLength);
            }
            return null;
        }

        if (minFrameLength > maxFrameLength) {
            // Discard read frame.
            buffer.skipBytes(minFrameLength + minDelimLength);
            fail(minFrameLength);
            return null;
        }

        if (stripDelimiter) {
            frame = buffer. readRetainedSlice(minFrameLength);
            buffer.skipBytes(minDelimLength);
        } else {
            frame = buffer. readRetainedSlice(minFrameLength + minDelimLength);
        }

        return frame;
    } else {
        if (!discardingTooLongFrame) {
            if (buffer. readableBytes() > maxFrameLength) {
                // Discard the content of the buffer until a delimiter is found.
                tooLongFrameLength = buffer. readableBytes();
                buffer.skipBytes(buffer.readableBytes());
                discardingTooLongFrame = true;
                if (failFast) {
                    fail(tooLongFrameLength);
                }
            }
        } else {
            // Still discarding the buffer since a delimiter is not found.
            tooLongFrameLength += buffer. readableBytes();
            buffer.skipBytes(buffer.readableBytes());
        }
        return null;
    }
}

/**
 * Get the packet length by special symbols
 */
private static int indexOf(ByteBuf haystack, ByteBuf needle) {
    for (int i = haystack. readerIndex(); i < haystack. writerIndex(); i ++ ) {
        int haystackIndex = i;
        int needleIndex;
        for (needleIndex = 0; needleIndex < needle. capacity(); needleIndex ++ ) {
            if (haystack. getByte(haystackIndex) != needle. getByte(needleIndex)) {
                break;
            } else {
                haystackIndex++;
                if (haystackIndex == haystack. writerIndex() & amp; & amp;
                    needleIndex != needle. capacity() - 1) {
                    return -1;
                }
            }
        }

        if (needleIndex == needle. capacity()) {
            // Found the needle from the haystack!
            return i - haystack. readerIndex();
        }
    }
    return -1;
}

In 2 lines of code, judge whether the member variable lineBasedDecoder is null, and if not, use this class to decode the data packet.

On line 8, the array of delimiters is iterated over. In line 9, the index of the buffer where the delimiter is located is obtained through the indexOf method. On line 10, if the packet size is greater than 0 and less than the minimum packet length. In lines 11-12, set the minimum packet length equal to the current packet length, and set the minimum delimiter equal to the current delimiter.

In line 16, when the minimum separator is not null, get the space occupied by the ByteBuf where the minimum separator is located. On line 20, if discardingTooLongFrame is true, the buffer discards minFrameLength + minDelimLength bytes.

In line 34, when the minimum packet length minFrameLength is greater than the maximum packet length maxFrameLength, the buffer will discard minFrameLength + minDelimLength bytes.

In line 41, if stripDelimiter is true, a new ByteBuf of the buffer sub-area is obtained from the buffer according to the minimum packet length minFrameLength in line 42, and bytes of the special character length minDelimLength in the buffer are skipped in line 43. The default value of stripDelimiter is true, that is, this logic will be followed when it is not set.

When stripDelimiter is false, get a new frame of the buffer sub-area from the buffer according to the minimum packet length plus the minimum delimiter length minFrameLength + minDelimLength in line 45 and return it. The minimum packet length has been set in line 6, which is the maximum value of the Integer type.

In a word, the delimiter of the minimum index in the packet is obtained on line 9, and the length of the minimum index delimiter. In line 42, a new ByteBuf is separated according to the delimiter of the smallest index, and the index is skipped, a new ByteBuf is returned, and the next round of segmentation is performed until the data in the buffer is processed.

The important source code has been read, and then look at the server-side code:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * Netty server side
 * @author Cheng on life
 * @date 06 January 2023
 * @Description
 *
 */
public class TestServer {

    public void bind(final int port){
        // Configure the Nio thread group on the server side, boosGroup is responsible for new client access
        EventLoopGroup boosGroup = new NioEventLoopGroup();
        // workerGroup is responsible for I/O message processing
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap. group(boosGroup, workerGroup)
            // Thread group is set to non-blocking
            .channel(NioServerSocketChannel.class)
            //The size of the connection buffer pool
          .option(ChannelOption.SO_BACKLOG, 1024)
          //Set the allocator of the channel Channel
          .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //Set up long connection
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            // Use anonymous inner class to declare hanlder
            .childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                  // 3.1, use special symbols as the end of a package
                    ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                  // ByteBuf to string
                  ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                  
                  // String to ByteBuf
                  ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                  // 3.2, every time the data sent out, add special symbols
                  ch.pipeline().addLast(new LineEncoder(new LineSeparator("$_"), CharsetUtil.UTF_8));
                    // event handler binding
                    ch.pipeline().addLast(new ServerHandler());
                }
            });
            // bind port
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // The server starts listening for events
        channelFuture. addListener(new GenericFutureListener<Future<? super Void>>() {
                public void operationComplete(Future<? super Void> future) throws Exception {
                  //Process after successful startup
                    if (future. isSuccess()) {
                       System.out.println("The server started successfully, Started Successful:" + port);
                    } else {
                      System.out.println("The server failed to start, Started Failed:" + port);
                    }
                }
            });
        // Wait for the server listening port to close
        channelFuture. channel(). closeFuture(). sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // exit gracefully
            boosGroup. shutdown Gracefully();
            workerGroup. shutdown Gracefully();
        }
    }
    
    public static void main(String[] argo){
        new TestServer().bind(8080);
    }
}

/**
 * server-side handler
 * @author Cheng on life
 * @date 06 January 2023
 * @Description
 *
 */
class ServerHandler extends ChannelInboundHandlerAdapter{
    // count the received messages
    private static int counter;
    // Receive and process I/O messages
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        try {
            // Output the received content to the console
            System.out.println("Here is the server console:" + msg + ", count:" + + + counter);
            // 1.2, send string
            String resp = "Message from the server~!";
            // Return information to the client
            ctx.writeAndFlush(resp);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       // Close the ChannelHandlerContext when an exception is encountered
       ctx. close();
    }
}

In lines 54-55, the DelimiterBasedFrameDecoder decoder is added, and $_ is set as the decoding delimiter for each packet.

In line 63, the LineEncoder encoder is added. Through this encoding class, a special delimiter $_ can be added at the end of each packet to be sent out, so that it can be controlled globally and the delimiter can be changed at any time. Manually added after the sent data packet.

Client code:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * netty client
 * @author Cheng on life
 * @date 06 January 2023
 * @Description
 *
 */
public class TestClient {
  
    public void connect(int port, String host){
        // Client Nio thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap. group(group)
            // Thread group is set to non-blocking
            .channel(NioSocketChannel.class)
          .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                  // 3.1, use special symbols as the end of a package
                    ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                  // ByteBuf to string
                  ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                  
                  // String to ByteBuf
                  ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                  // 3.2, every time the data sent out, add special symbols
                  ch.pipeline().addLast(new LineEncoder(new LineSeparator("$_"), CharsetUtil.UTF_8));
                    // event handler binding
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
            // establish connection
            ChannelFuture channelFuture = bootstrap. connect(host, port);
            // Wait for the server listening port to close
            channelFuture. channel(). closeFuture(). sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // exit gracefully
            group. shutdown Gracefully();
        }
    }
    
    public static void main(String[] argo){
        new TestClient().connect(8080, "localhost");
    }
}

/**
 * Client processing handler
 * @author Cheng on life
 * @date 06 January 2023
 * @Description
 *
 */
class ClientHandler extends ChannelInboundHandlerAdapter{
    // count the number of received messages
    private static int counter;
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
      // 1.2, send string
        String req = "Message from client~!";
        // After the connection is successful, send a message and send it 100 times in a row, simulating frequent data interaction
        for(int i = 0;i<100;i ++ ){
            ctx.writeAndFlush(req);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        try {
            System.out.println("Here is the client console:" + msg + ", count:" + + + counter);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //Release resources
        ctx. close();
    }
}

The encoding and decoding in lines 41-52 need to be consistent with the server. Next run the server-side and client-side code separately.

Server side console output:

Client console output:

Both the server-side console and the client-side console have printed data 100 times, and there is no sticking/unpacking problem.

What happens if there are two newlines?

The first part of the server-side encoding adjustment: when decoding, add a special separator @@.

ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ByteBuf delimiter1 = Unpooled.copiedBuffer("@@".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter, delimiter1));

The second part of the server-side adjustment: when responding to the client, add special characters and add text. Since the encoder LineEncoder only supports one set of characters, we can only add a second set of delimiters to the sent text.

// 1.2, send string
String resp = "Message from the server~!@@Message from the server~!";

Client modifications are consistent with the server.

As a result of the server-side operation, the client sent 100 messages, but there are two separators, and each message is split into two data packets, so the final count is 200, no problem.

Client running results: The server received 200 pieces of data, responded to the client with 200 pieces of data, and responded to the client with two special characters in one piece of data, so the number of data received by the client is 400, and there is no problem.

The above is the fixed collocation of LineEncoder encoder and DelimiterBasedFrameDecoder decoder.

Author: Cheng Jiusheng
Link: https://www.jianshu.com/p/025c62891ab4
Source: Jianshu
Copyright belongs to the author. For commercial reprint, please contact the author for authorization, for non-commercial reprint, please indicate the source.