Using Netty for protocol development: multi-protocol support and implementation of custom protocols

  • Why do we need a protocol?
  • Netty supports a rich set of protocols, allowing programmers to focus on business logic
  • Custom protocol
    • 1. Preparation
    • 2. Introduction to codec abstract class
    • 3. Codec implementation

Why do we need a protocol?

TCP/IP transmits data as a byte stream: the data is split into a series of packets and sent over the network. This is efficient, but a stream has no message boundaries, so the receiver needs a mechanism to determine where each message starts and ends. The purpose of a protocol is to mark those boundaries and to lay down communication rules that both parties must follow.

A protocol defines the rules agreed between the communicating parties, including message format, transmission method, and error handling. Following the protocol, the sender packages data into a message in the agreed format and includes the necessary metadata, such as the message length. The receiver parses the message according to the same format and rules, and so correctly restores the data the sender sent.

A protocol can also define a verification mechanism to protect data integrity. For example, every TCP segment carries a checksum field that lets the receiver detect data corrupted in transit. A corrupted segment is discarded and not acknowledged, so the sender retransmits it.
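
The same idea can be applied at the application level. The sketch below appends a CRC32 checksum to a payload and verifies it on receipt, using java.util.zip.CRC32 from the JDK. The class name ChecksumFrame and the 4-byte trailer layout are assumptions made for this example; they are not part of TCP or Netty.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical helper: payload followed by a 4-byte CRC32 trailer.
class ChecksumFrame {

    /** Returns the payload with a 4-byte CRC32 of the payload appended. */
    static byte[] wrap(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        ByteBuffer buf = ByteBuffer.allocate(payload.length + 4);
        buf.put(payload);
        buf.putInt((int) crc.getValue());
        return buf.array();
    }

    /** Returns the payload if the trailing CRC32 matches, otherwise throws. */
    static byte[] unwrap(byte[] frame) {
        if (frame.length < 4) {
            throw new IllegalArgumentException("frame too short");
        }
        ByteBuffer buf = ByteBuffer.wrap(frame);
        byte[] payload = new byte[frame.length - 4];
        buf.get(payload);
        int expected = buf.getInt();
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        if ((int) crc.getValue() != expected) {
            throw new IllegalStateException("checksum mismatch: data corrupted in transit");
        }
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(unwrap(frame), StandardCharsets.UTF_8));
        frame[0] ^= 0x01; // simulate a bit flipped in transit
        try {
            unwrap(frame);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A checksum like this detects accidental corruption only. It does not protect against deliberate tampering, which requires a keyed MAC or a signature.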

“下雨天留客天留我不留” is a famous Chinese sentence written without punctuation marks. Depending on where the breaks are placed, it can be read in several ways with completely different meanings, so it is often used to illustrate the importance of punctuation.

  • One reading: “下雨天留客，天留，我不留” (“Rain keeps the guest here; heaven would have him stay, but I will not.”)
  • Another reading: “下雨天，留客天，留我不？留” (“A rainy day is a day for keeping guests. Will you keep me? Yes, stay.”)

How do we design a protocol? In effect, we add “punctuation marks” to the information transmitted over the network. Splitting messages on a delimiter has a drawback, however: if the delimiter itself appears in the data, it must be escaped or otherwise distinguished. The following scheme is therefore more commonly used:

Fixed-length bytes represent content length + actual content

For example, suppose each Chinese character takes two bytes (as in the GBK encoding). Under the scheme above, the sentence is sent as follows, with each segment prefixed by its length in bytes, and the receiver cannot misread it:

06 下雨天 (rainy day) 06 留客天 (guest day) 06 留我不 (keep me?) 02 留 (stay)
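
The length-prefix scheme can be sketched in plain Java as follows. This is a minimal illustration, not Netty code: the class name LengthPrefixFraming, the 4-byte big-endian length field, and the UTF-8 encoding are all assumptions chosen for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: each message is sent as [4-byte length][body].
class LengthPrefixFraming {

    /** Encodes each message as a 4-byte big-endian length followed by its UTF-8 bytes. */
    static byte[] encode(List<String> messages) {
        List<byte[]> bodies = new ArrayList<>();
        int total = 0;
        for (String m : messages) {
            byte[] body = m.getBytes(StandardCharsets.UTF_8);
            bodies.add(body);
            total += 4 + body.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] body : bodies) {
            buf.putInt(body.length);
            buf.put(body);
        }
        return buf.array();
    }

    /**
     * Decodes every complete frame in the buffer. If the last frame is
     * incomplete, the position is reset to the start of that frame so the
     * caller can retry once more bytes have arrived.
     */
    static List<String> decode(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // body not fully received yet
                break;
            }
            byte[] body = new byte[length];
            in.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] stream = encode(Arrays.asList("rainy day", "guest day", "keep me?", "stay"));
        System.out.println(decode(ByteBuffer.wrap(stream)));
    }
}
```

Because the receiver always knows how many bytes to read next, the body may contain any byte value, including whatever would otherwise have served as a delimiter.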

Netty supports a rich set of protocols, allowing programmers to focus on business logic

Netty already encapsulates the implementation details of many protocols, so developers can focus on business logic instead of implementing the protocols themselves. The following sample, based on HTTP, shows how much protocol handling Netty takes over: the server logs the URI of each HTTP request and returns a response.

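import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;

@Slf4j
public class TestHttp {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.group(boss, worker);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    // HTTP server codec: decodes requests and encodes responses
                    ch.pipeline().addLast(new HttpServerCodec());
                    // Handle only HttpRequest messages; other message types are passed on
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                            // Read the request
                            log.debug("Message received: {}", msg.uri());

                            // Build the response
                            DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                            byte[] bytes = "<h1>Hello, world!</h1>".getBytes(StandardCharsets.UTF_8);
                            response.content().writeBytes(bytes);
                            // Without Content-Length the browser cannot tell where the body ends
                            response.headers().setInt(CONTENT_LENGTH, bytes.length);

                            // Write the response back
                            ctx.writeAndFlush(response);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}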

To send a request to the server, open http://localhost:8080 in a browser.


Custom protocol

If you want to customize a protocol, you can easily implement it in the Netty framework. Netty provides powerful APIs and tools to easily build custom protocol communications.

1. Preparation

For message transmission, encoding, and decoding, it helps to define a common message body, which improves extensibility and interoperability. The common message body is a data structure shared by both communicating parties and used to transmit and parse messages on the network.

Abstract message body

package com.gw.core.message;

import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
 * Description: Abstract message body
 *
 * @author YanAn
 * @date 2023/8/7 20:37
 */
@Data
public abstract class Message implements Serializable {

    /**
     * According to the message type byte, obtain the corresponding message class
     * @param messageType message type byte
     * @return message class
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }

    private int sequenceId;

    private int messageType;

    public abstract int getMessageType();
    /**
     * Request type byte value
     */
    public static final int RPC_MESSAGE_TYPE_REQUEST = 0;
    /**
     * Response type byte value
     */
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 1;

    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

    static {
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
}

Rpc request message body

package com.gw.core.message;

import lombok.Getter;
import lombok.ToString;

/**
 * Description: Rpc request body
 *
 * @author LinHuiBa-YanAn
 * @date 2023/8/7 20:39
 */
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

    /**
     * Fully qualified name of the called interface; the server uses it to find the implementation
     */
    private String interfaceName;
    /**
     * Name of the method to call on the interface
     */
    private String methodName;
    /**
     * Method return type
     */
    private Class<?> returnType;
    /**
     * Array of method parameter types
     */
    private Class<?>[] parameterTypes;
    /**
     * Array of method parameter values
     */
    private Object[] parameterValue;

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class<?>[] parameterTypes, Object[] parameterValue) {
        super.setSequenceId(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}

Rpc response message body

package com.gw.core.message;

import lombok.Data;
import lombok.ToString;

/**
 * Description: Rpc response body
 *
 * @author YanAn
 * @date 2023/8/7 20:43
 */
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
    /**
     * Return value of the call
     */
    private Object returnValue;
    /**
     * Exception thrown by the call, if any
     */
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}

2. Introduction to codec abstract class

MessageToMessageCodec is an abstract codec class provided by Netty for converting between message types during encoding and decoding. It can be seen as a combination of MessageToMessageDecoder and MessageToMessageEncoder. MessageToMessageCodec is a subclass of ChannelDuplexHandler, which implements both the ChannelInboundHandler and ChannelOutboundHandler interfaces, so a single instance can decode inbound messages (received from the remote peer) and encode outbound messages (sent to the remote peer).

3. Codec implementation

First, what elements does a custom protocol need?

  • Magic number: lets the receiver reject invalid packets immediately
  • Version number: allows the protocol to be upgraded
  • Serialization algorithm: which serialization and deserialization method the message body uses; this field makes the protocol extensible, for example json, protobuf, hessian, jdk…
  • Instruction type: login, registration, single chat, group chat, etc. (business related)
  • Request sequence number: lets responses be matched to requests, enabling asynchronous two-way communication
  • Body length
  • Message body
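
One possible layout for these elements is a fixed 16-byte header followed by the body. The plain-Java sketch below uses field widths of 4 + 1 + 1 + 1 + 4 + 1 (padding) + 4 bytes, which match the codec shown later in this section. The class name FrameHeader and the validation rules are assumptions added for illustration, and java.nio.ByteBuffer stands in for Netty's ByteBuf.

```java
import java.nio.ByteBuffer;

// Hypothetical value class for the fixed-size part of a frame.
class FrameHeader {
    static final int MAGIC = 0x01020304;  // the bytes 1, 2, 3, 4 read as a big-endian int
    static final int HEADER_LENGTH = 16;  // 4 + 1 + 1 + 1 + 4 + 1 + 4

    final byte version;
    final byte serializer;
    final byte messageType;
    final int sequenceId;
    final int bodyLength;

    FrameHeader(byte version, byte serializer, byte messageType, int sequenceId, int bodyLength) {
        this.version = version;
        this.serializer = serializer;
        this.messageType = messageType;
        this.sequenceId = sequenceId;
        this.bodyLength = bodyLength;
    }

    /** Writes the 16 header bytes in wire order. */
    void writeTo(ByteBuffer out) {
        out.putInt(MAGIC);
        out.put(version);
        out.put(serializer);
        out.put(messageType);
        out.putInt(sequenceId);
        out.put((byte) 0xff); // padding, so the header length is a power of two
        out.putInt(bodyLength);
    }

    /** Reads and validates a header; throws if the magic number or length is wrong. */
    static FrameHeader readFrom(ByteBuffer in) {
        if (in.remaining() < HEADER_LENGTH) {
            throw new IllegalArgumentException("incomplete header");
        }
        int magic = in.getInt();
        if (magic != MAGIC) {
            throw new IllegalArgumentException("bad magic number: 0x" + Integer.toHexString(magic));
        }
        byte version = in.get();
        byte serializer = in.get();
        byte messageType = in.get();
        int sequenceId = in.getInt();
        in.get(); // skip the padding byte
        int bodyLength = in.getInt();
        if (bodyLength < 0) {
            throw new IllegalArgumentException("negative body length: " + bodyLength);
        }
        return new FrameHeader(version, serializer, messageType, sequenceId, bodyLength);
    }
}
```

Checking the magic number first means that traffic from an unrelated client is rejected after four bytes, before any attempt is made to allocate a buffer for the body.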

Based on the above elements, the following codec encodes and decodes the request and response messages defined earlier, with Netty handling the sending and receiving.

package com.gw.core.protocol;

import com.gw.core.config.Config;
import com.gw.core.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * Description: Message codec
 *
 * @author LinHuiBa-YanAn
 * @date 2023/8/7 20:49
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4-byte magic number
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1-byte version
        out.writeByte(1);
        // 3. 1-byte serialization method (jdk 0, json 1), taken from the configured algorithm
        //    so that it always matches the serializer used for the body below
        out.writeByte(Config.getSerializerAlgorithm().ordinal());
        // 4. 1-byte message instruction type
        out.writeByte(msg.getMessageType());
        // 5. 4-byte request sequence number
        out.writeInt(msg.getSequenceId());
        // 6. 1 padding byte: the fields above total 11 bytes, so with the
        //    4-byte length field the header comes to 16 bytes
        out.writeByte(0xff);
        // Serialize the message into the body bytes
        byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
        // 7. 4-byte body length
        out.writeInt(bytes.length);
        // 8. Message body
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // This method expects one complete frame per ByteBuf, so a frame decoder
        // must come before it in the pipeline.
        // Read the magic number (not validated in this example)
        int magicNum = in.readInt();
        // Read the version
        byte version = in.readByte();
        // Read the serialization method
        byte serializerAlgorithm = in.readByte();
        // Read the message instruction type
        byte messageType = in.readByte();
        // Read the request sequence number
        int sequenceId = in.readInt();
        // Skip the padding byte
        in.readByte();
        // Read the body length
        int length = in.readInt();
        // Read the body
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        // Look up the serialization algorithm by its ordinal
        Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
        // Look up the message class by the message instruction type
        Class<? extends Message> messageClass = Message.getMessageClass(messageType);
        // Deserialize the byte array into a message object
        Message message = algorithm.deserialize(messageClass, bytes);
        out.add(message);
    }

}
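
The codec relies on a Serializer type and on Config.getSerializerAlgorithm(), neither of which is shown in this article. As a rough idea of what Serializer might look like, here is a minimal sketch containing only a JDK-serialization algorithm. The method signatures are inferred from how the codec calls them; the constant name JAVA and the fromByte helper are assumptions. A JSON algorithm would need a third-party library and is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical reconstruction of the Serializer used by the codec.
interface Serializer {

    <T> byte[] serialize(T object);

    <T> T deserialize(Class<T> clazz, byte[] bytes);

    /** The ordinal of each constant is the byte written into the frame header. */
    enum Algorithm implements Serializer {
        JAVA {
            @Override
            public <T> byte[] serialize(T object) {
                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                     ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                    oos.writeObject(object);
                    oos.flush();
                    return bos.toByteArray();
                } catch (IOException e) {
                    throw new IllegalStateException("serialization failed", e);
                }
            }

            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                    return clazz.cast(ois.readObject());
                } catch (IOException | ClassNotFoundException e) {
                    throw new IllegalStateException("deserialization failed", e);
                }
            }
        };

        /** Maps a header byte back to an algorithm, rejecting unknown values. */
        static Algorithm fromByte(byte b) {
            Algorithm[] all = values();
            if (b < 0 || b >= all.length) {
                throw new IllegalArgumentException("unknown serializer: " + b);
            }
            return all[b];
        }
    }
}
```

Two caveats apply. Because the enum ordinal doubles as the wire value, reordering the constants would silently break compatibility with older peers. JDK deserialization of untrusted input is also unsafe, so a real service would restrict the accepted classes or use a different format.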

Test encoder

public static void main(String[] args) throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(
            new LoggingHandler(),
            new ProtocolFrameDecoder(),
            new MessageCodecSharable()
    );
    int sequenceId = SequenceIdGenerator.nextId();

    Class<HelloService> serviceClass = HelloService.class;
    Method method = HelloService.class.getMethod("sayHello", String.class);
    String[] parameterValue = new String[]{"yanan"};
    RpcRequestMessage message = new RpcRequestMessage(sequenceId, serviceClass.getName(), method.getName(), method.getReturnType(), method.getParameterTypes(), parameterValue);
    // Outbound: the codec encodes the message and LoggingHandler prints the resulting bytes
    channel.writeOutbound(message);
}

Test decoder

public static void main(String[] args) throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(
            new LoggingHandler(),
            new ProtocolFrameDecoder(),
            new MessageCodecSharable()
    );
    int sequenceId = SequenceIdGenerator.nextId();

    Class<HelloService> serviceClass = HelloService.class;
    Method method = HelloService.class.getMethod("sayHello", String.class);
    String[] parameterValue = new String[]{"yanan"};
    RpcRequestMessage message = new RpcRequestMessage(sequenceId, serviceClass.getName(), method.getName(), method.getReturnType(), method.getParameterTypes(), parameterValue);
    // channel.writeOutbound(message);

    // Build the frame by hand, field by field, in the same order the encoder writes it
    ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
    out.writeBytes(new byte[]{1, 2, 3, 4});
    out.writeByte(1);
    out.writeByte(Config.getSerializerAlgorithm().ordinal());
    out.writeByte(message.getMessageType());
    out.writeInt(message.getSequenceId());
    out.writeByte(0xff);
    byte[] bytes = Config.getSerializerAlgorithm().serialize(message);
    out.writeInt(bytes.length);
    out.writeBytes(bytes);

    // Inbound: the frame passes through the frame decoder and is then decoded into a Message
    channel.writeInbound(out);
}
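
Both tests call SequenceIdGenerator.nextId(), which this article does not show. A minimal sketch, assuming it is a simple process-wide counter, is given below, together with a hypothetical PendingRequests class that illustrates how the sequence number lets a client match each asynchronous response to its request. Both classes are illustrations, not the author's implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Assumed implementation: a thread-safe, monotonically increasing counter.
class SequenceIdGenerator {
    private static final AtomicInteger ID = new AtomicInteger();

    static int nextId() {
        return ID.incrementAndGet();
    }
}

// Hypothetical client-side registry of requests that are still awaiting a response.
class PendingRequests {
    private final Map<Integer, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Called before sending a request; the returned future completes when the response arrives. */
    CompletableFuture<Object> register(int sequenceId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(sequenceId, future);
        return future;
    }

    /**
     * Called when a response is decoded. Returns false if no request with
     * this sequence number is waiting (for example, a duplicate response).
     */
    boolean complete(int sequenceId, Object returnValue, Exception exceptionValue) {
        CompletableFuture<Object> future = pending.remove(sequenceId);
        if (future == null) {
            return false;
        }
        if (exceptionValue != null) {
            future.completeExceptionally(exceptionValue);
        } else {
            future.complete(returnValue);
        }
        return true;
    }
}
```

In a Netty client, an inbound handler for RpcResponseMessage would typically call complete with the response's sequenceId, returnValue, and exceptionValue, while the calling thread waits on the future returned by register.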