Netty, from basics to depth: protocol design and analysis

Table of Contents

  • 1 Why do you need a protocol?
  • 2 Redis protocol example
  • 3 HTTP protocol example
  • 4 Custom protocol elements
    • 4.1 Codec
    • 4.2 When can @Sharable be added

1 Why do you need a protocol?

Message transmission in TCP/IP is stream-based and has no boundaries.

The purpose of a protocol is to mark the boundaries of each message and to set the communication rules that both parties must follow.

For example, suppose the following is transmitted over the network:

下雨天留客天留我不留 (word for word: "rainy day keep guest heaven keep I not keep")

This is a famous Chinese sentence written without punctuation. It can be split in several ways, each with a completely different meaning, so it is often used to show why punctuation matters.

One interpretation:

A rainy day keeps the guest. Heaven keeps him, but I do not.

Another interpretation:

A rainy day is a day for keeping guests. Will you keep me? Yes.

How do you design a protocol? In effect, you add “punctuation” to the data sent over the network. Splitting on a delimiter is not ideal, though, because the delimiter has to be escaped whenever it appears in the payload itself. The following scheme is therefore more common:

Fixed-length bytes indicate content length + actual content

For example, assuming each Chinese character takes 3 bytes (as it does in UTF-8), the sentence can be sent as follows under this scheme, so the receiver cannot misread it:

0f下雨天留客06天留09我不留

Here 0f, 06, and 09 are the hexadecimal byte lengths of the three parts, which contain 5, 2, and 3 characters.
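The scheme above can be sketched in plain Java. This is a minimal illustration rather than anything from Netty: the class name `LengthPrefixedText` and the choice of a single length byte (which caps each part at 255 bytes) are assumptions made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: each part is sent as [1 length byte][that many UTF-8 bytes].
final class LengthPrefixedText {

    // Encode every part with its byte length in front of it.
    static byte[] encode(String... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String part : parts) {
            byte[] body = part.getBytes(StandardCharsets.UTF_8);
            if (body.length > 255) {
                throw new IllegalArgumentException("part too long for a 1-byte length prefix");
            }
            out.write(body.length);
            out.write(body, 0, body.length);
        }
        return out.toByteArray();
    }

    // Split the stream by trusting the length prefix instead of searching for a delimiter.
    static List<String> decode(byte[] stream) {
        ByteBuffer in = ByteBuffer.wrap(stream);
        List<String> parts = new ArrayList<>();
        while (in.hasRemaining()) {
            int length = in.get() & 0xff;   // read the prefix as an unsigned byte
            byte[] body = new byte[length];
            in.get(body);                   // throws BufferUnderflowException on a truncated stream
            parts.add(new String(body, StandardCharsets.UTF_8));
        }
        return parts;
    }
}
```

Calling `LengthPrefixedText.encode("下雨天留客", "天留", "我不留")` yields a first byte of 0x0f, and `decode` returns the same three parts, whatever characters the payload contains.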

A short story

Long ago, a private tutor came to teach for a family, and the two parties signed an agreement written without punctuation: “无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束脩金”. From then on, although the tutor taught conscientiously, the host served him only cabbage and tofu, never chicken, duck, fish, or meat. The tutor was puzzled at first, but then reasoned that the host would surely add the money saved on chicken, duck, fish, and meat to his tuition fee. So the two got along in peace.

As the year and the school term drew to a close and the tutor prepared to leave, the host showed no sign of paying the tuition fee, so the tutor confronted him. The host had his argument ready: “The agreement proves it: having no chicken or duck is acceptable; having no fish or meat is also acceptable; cabbage and tofu are indispensable; no tuition fee is to be paid. It is written in black and white. What more is there to say?”

The tutor argued back: “The agreement reads like this: if there is no chicken, duck will do; if there is no fish, meat will do; cabbage and tofu will not do; and the tuition fee is indispensable.”

The two went back and forth, and the quarrel grew quite lively.

The “tuition fee” here is shuxiu (束脩), a traditional term for the remuneration owed to a teacher.

2 Redis protocol example

NioEventLoopGroup worker = new NioEventLoopGroup();
// CR LF, the line terminator used by the Redis protocol
byte[] LINE = {13, 10};
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.group(worker);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new LoggingHandler());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                // channelActive fires once the connection has been established
                @Override
                public void channelActive(ChannelHandlerContext ctx) {
                    set(ctx);
                    get(ctx);
                }

                // Sends "get aaa": an array of 2 elements, each prefixed with its byte length
                private void get(ChannelHandlerContext ctx) {
                    ByteBuf buf = ctx.alloc().buffer();
                    buf.writeBytes("*2".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("get".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("aaa".getBytes());
                    buf.writeBytes(LINE);
                    ctx.writeAndFlush(buf);
                }

                // Sends "set aaa bbb": an array of 3 elements
                private void set(ChannelHandlerContext ctx) {
                    ByteBuf buf = ctx.alloc().buffer();
                    buf.writeBytes("*3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("set".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("aaa".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("bbb".getBytes());
                    buf.writeBytes(LINE);
                    ctx.writeAndFlush(buf);
                }

                // Print the server's reply, then release the buffer because this handler consumes it
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    try {
                        System.out.println(buf.toString(Charset.defaultCharset()));
                    } finally {
                        buf.release();
                    }
                }
            });
        }
    });
    ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
    channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
    log.error("client error", e);
} finally {
    worker.shutdownGracefully();
}
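The set and get methods above repeat one pattern: an element count, then each element preceded by its byte length. A minimal sketch of that pattern as a reusable helper follows; `RespCommand` is a hypothetical name introduced here and is not part of Netty or of any Redis client library.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds the bytes the client above writes by hand.
final class RespCommand {

    private static final byte[] CRLF = {13, 10};

    // Layout: *<element count>\r\n, then $<byte length>\r\n<bytes>\r\n for each element.
    static byte[] encode(String... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLine(out, ("*" + args.length).getBytes(StandardCharsets.US_ASCII));
        for (String arg : args) {
            byte[] body = arg.getBytes(StandardCharsets.UTF_8);
            // The length counts bytes, not characters.
            writeLine(out, ("$" + body.length).getBytes(StandardCharsets.US_ASCII));
            writeLine(out, body);
        }
        return out.toByteArray();
    }

    private static void writeLine(ByteArrayOutputStream out, byte[] line) {
        out.write(line, 0, line.length);
        out.write(CRLF, 0, CRLF.length);
    }
}
```

Inside the handler, the result could be sent with `ctx.writeAndFlush(Unpooled.wrappedBuffer(RespCommand.encode("set", "aaa", "bbb")))`, which produces the same bytes as the hand-written set method.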

3 HTTP protocol example

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.group(boss, worker);
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            // HttpServerCodec decodes requests and encodes responses
            ch.pipeline().addLast(new HttpServerCodec());
            // Only messages of type HttpRequest (request line and headers) reach this handler
            ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                    // Log the requested URI
                    log.debug(msg.uri());

                    // Build the response
                    DefaultFullHttpResponse response =
                            new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);

                    byte[] bytes = "<h1>Hello, world!</h1>".getBytes();

                    // CONTENT_LENGTH is statically imported from HttpHeaderNames;
                    // it tells the client where the body ends
                    response.headers().setInt(CONTENT_LENGTH, bytes.length);
                    response.content().writeBytes(bytes);

                    // Write the response back
                    ctx.writeAndFlush(response);
                }
            });
            /*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    log.debug("{}", msg.getClass());

                    if (msg instanceof HttpRequest) { // request line, request headers

                    } else if (msg instanceof HttpContent) { // request body

                    }
                }
            });*/
        }
    });
    ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
    channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
    log.error("server error", e);
} finally {
    boss.shutdownGracefully();
    worker.shutdownGracefully();
}
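The server above sets a content-length header because that is how the client learns where the body ends. The sketch below shows this framing on raw bytes, without Netty. `RawHttpResponse` is a hypothetical class, and the response it builds is a bare minimum made up for illustration, not the exact output of HttpServerCodec.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of how content-length frames an HTTP body.
final class RawHttpResponse {

    // Build a minimal HTTP/1.1 response: status line, one header, blank line, body.
    static byte[] build(String html) {
        byte[] body = html.getBytes(StandardCharsets.UTF_8);
        String head = "HTTP/1.1 200 OK\r\n"
                + "content-length: " + body.length + "\r\n"
                + "\r\n";
        byte[] headBytes = head.getBytes(StandardCharsets.US_ASCII);
        byte[] wire = new byte[headBytes.length + body.length];
        System.arraycopy(headBytes, 0, wire, 0, headBytes.length);
        System.arraycopy(body, 0, wire, headBytes.length, body.length);
        return wire;
    }

    // Recover the body by reading content-length rather than looking for a terminator.
    static String readBody(byte[] wire) {
        // ISO-8859-1 maps one byte to one char, so string indexes equal byte offsets.
        String text = new String(wire, StandardCharsets.ISO_8859_1);
        int headEnd = text.indexOf("\r\n\r\n");
        if (headEnd < 0) {
            throw new IllegalArgumentException("incomplete header");
        }
        int length = -1;
        for (String line : text.substring(0, headEnd).split("\r\n")) {
            int colon = line.indexOf(':');
            if (colon > 0 && line.substring(0, colon).trim().equalsIgnoreCase("content-length")) {
                length = Integer.parseInt(line.substring(colon + 1).trim());
            }
        }
        if (length < 0) {
            throw new IllegalArgumentException("no content-length header");
        }
        return new String(wire, headEnd + 4, length, StandardCharsets.UTF_8);
    }
}
```

HTTP therefore mixes both approaches discussed earlier: the header section is delimited by line breaks, while the body is delimited by a declared length.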

4 Custom protocol elements

  • Magic number: lets the receiver reject an invalid packet immediately
  • Version number: allows the protocol to be upgraded
  • Serialization algorithm: which method serializes and deserializes the message body; this is extensible, for example JSON, protobuf, Hessian, or JDK serialization
  • Command type: login, registration, one-to-one chat, group chat, and so on; this is business-specific
  • Request sequence number: provides asynchronous capability for duplex communication
  • Body length
  • Message body
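The elements listed above can be laid out as a fixed-size header followed by the body. The sketch below uses java.nio.ByteBuffer so that it runs without Netty. The class name `FrameHeader`, the field widths, the magic value, and the padding byte are all assumptions chosen for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical frame layout (16-byte header, big-endian):
// magic(4) version(1) serializer(1) messageType(1) sequenceId(4) padding(1) bodyLength(4) body(n)
final class FrameHeader {

    static final int MAGIC = 0x01020304;   // assumed magic value
    static final int HEADER_LENGTH = 16;
    static final int LENGTH_OFFSET = 12;   // where the body length field starts

    static byte[] frame(byte version, byte serializer, byte messageType, int sequenceId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length); // big-endian by default
        buf.putInt(MAGIC);
        buf.put(version);
        buf.put(serializer);
        buf.put(messageType);
        buf.putInt(sequenceId);
        buf.put((byte) 0xff);              // padding, carries no meaning
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    // A receiver can drop anything that does not start with the magic number.
    static boolean hasValidMagic(byte[] frame) {
        return frame.length >= 4 && ByteBuffer.wrap(frame).getInt() == MAGIC;
    }
}
```

Keeping the header at a fixed size means the length field always sits at the same offset, which is what a length-field-based frame decoder needs to know.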

4.1 Codec

Based on the elements above, design a login request message and a login response message, and use Netty to send and receive them:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4-byte magic number
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1-byte version
        out.writeByte(1);
        // 3. 1-byte serialization method: 0 = JDK, 1 = JSON
        out.writeByte(0);
        // 4. 1-byte command type
        out.writeByte(msg.getMessageType());
        // 5. 4-byte sequence id
        out.writeInt(msg.getSequenceId());
        // Padding byte with no meaning; it brings the fixed header to 16 bytes
        out.writeByte(0xff);
        // 6. Serialize the message into a byte array
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 4-byte body length
        out.writeInt(bytes.length);
        // 8. Body
        out.writeBytes(bytes);
    }

    // Assumes a complete frame, so a frame decoder must sit in front of this codec
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte(); // skip the padding byte
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}

Test

EmbeddedChannel channel = new EmbeddedChannel(
    new LoggingHandler(),
    // max frame 1024 bytes; the 4-byte length field starts at offset 12
    new LengthFieldBasedFrameDecoder(
        1024, 12, 4, 0, 0),
    new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "Zhangsan");
// channel.writeOutbound(message);
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);

// Split the encoded message in two to simulate a half packet
ByteBuf s1 = buf.slice(0, 100);
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
s1.retain(); // slices share buf's reference count; raise it to 2
channel.writeInbound(s1); // the pipeline releases s1 once it is consumed, back to 1
channel.writeInbound(s2);

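Interpretation

The test encodes one login message and then delivers it in two pieces (the first 100 bytes, then the rest) to simulate a half packet. LengthFieldBasedFrameDecoder reads the 4-byte length field at offset 12 and passes a frame on to MessageCodec only once the whole body has arrived, so decode runs on a complete message. Because slices share the reference count of buf and the pipeline releases inbound buffers it has consumed, s1 is retained first so that the memory behind s2 is still valid when s2 is written.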
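To make the half-packet handling in the test concrete, here is a plain-Java sketch of what a length-field frame decoder has to do. It is a simplified stand-in and not Netty's implementation. The class name `FrameAssembler` is hypothetical, and the constants assume the 16-byte header used by the codec above, with the length field at offset 12.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of a length-field frame decoder.
final class FrameAssembler {

    private static final int LENGTH_OFFSET = 12; // position of the 4-byte body length
    private static final int HEADER_LENGTH = 16; // bytes before the body starts

    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    // Accept the next chunk from the stream and return every frame it completes.
    List<byte[]> feed(byte[] chunk) {
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int start = 0;
        while (joined.length - start >= HEADER_LENGTH) {
            int bodyLength = ByteBuffer.wrap(joined, start + LENGTH_OFFSET, 4).getInt();
            if (bodyLength < 0) {
                throw new IllegalStateException("negative body length");
            }
            int frameLength = HEADER_LENGTH + bodyLength;
            if (joined.length - start < frameLength) {
                break; // half packet: wait for more bytes
            }
            frames.add(Arrays.copyOfRange(joined, start, start + frameLength));
            start += frameLength; // keep looping in case several frames arrived together
        }
        pending = Arrays.copyOfRange(joined, start, joined.length);
        return frames;
    }
}
```

Feeding the first 100 bytes of a longer frame returns an empty list, and feeding the remainder returns the whole frame. Note that the `pending` field makes this class stateful, so each connection needs its own instance.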

4.2 When can @Sharable be added

  • A handler that keeps no state can safely be shared across multiple channels and threads
  • Note, however, that a codec class cannot be made sharable if it extends ByteToMessageCodec or CombinedChannelDuplexHandler, because the constructors of those parent classes reject subclasses annotated with @Sharable
  • If you can guarantee that the codec keeps no state, extend MessageToMessageCodec instead

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

/**
 * Must be used together with LengthFieldBasedFrameDecoder to ensure that the received ByteBuf message is complete
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4-byte magic number
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1-byte version
        out.writeByte(1);
        // 3. 1-byte serialization method: 0 = JDK, 1 = JSON
        out.writeByte(0);
        // 4. 1-byte command type
        out.writeByte(msg.getMessageType());
        // 5. 4-byte sequence id
        out.writeInt(msg.getSequenceId());
        // Padding byte with no meaning; it brings the fixed header to 16 bytes
        out.writeByte(0xff);
        // 6. Serialize the message into a byte array
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 4-byte body length
        out.writeInt(bytes.length);
        // 8. Body
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte(); // skip the padding byte
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}
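Why state decides whether a handler can be shared is easiest to see without Netty at all. In the sketch below, `SharingDemo` and its two nested classes are hypothetical stand-ins for handlers. They only model the question of what happens when one instance serves two connections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins for handlers; they are not Netty classes.
final class SharingDemo {

    // Stateless: the result depends only on the argument,
    // so one instance can serve every connection.
    static final class UpperCaseHandler {
        String handle(String msg) {
            return msg.toUpperCase(Locale.ROOT);
        }
    }

    // Stateful: remembers an unfinished line between calls,
    // so each connection needs its own instance.
    static final class LineAssembler {
        private final StringBuilder pending = new StringBuilder();

        List<String> handle(String chunk) {
            pending.append(chunk);
            List<String> lines = new ArrayList<>();
            int newline;
            while ((newline = pending.indexOf("\n")) >= 0) {
                lines.add(pending.substring(0, newline));
                pending.delete(0, newline + 1);
            }
            return lines;
        }
    }
}
```

If a single `LineAssembler` receives "hel" from one connection and then "wor\n" from another, it emits the mixed line "helwor". With one instance per connection the data stays separate. A codec that decodes only complete frames, like the one above, keeps no such leftover data, which is what makes it a candidate for @Sharable.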