Table of Contents
- 1 Why do you need a protocol?
- 2 Example of the Redis protocol
- 3 Example of the HTTP protocol
- 4 Custom protocol elements
- 4.1 Codec
- 4.2 When can @Sharable be added?
1 Why do you need a protocol?
TCP/IP delivers data as a byte stream, so the stream itself carries no message boundaries.
The purpose of a protocol is to mark where each message begins and ends, and to lay down the communication rules that both parties must follow.
For example, suppose the following is sent over the network:
下雨天留客天留我不留
This is a famous Chinese sentence written without punctuation. It can be split in several ways, each with a completely different meaning, so it is often used to show why punctuation matters.
One reading:
下雨天留客，天留，我不留 (The rain keeps the guest here; heaven may keep you, but I will not.)
Another reading:
下雨天，留客天，留我不？留 (A rainy day is a day for keeping guests. Will you keep me? Stay.)
How do you design a protocol? In essence, you add "punctuation marks" to the data sent over the network. Splitting on a delimiter is not ideal, though, because the delimiter has to be escaped whenever it appears inside the content itself. The following scheme is therefore more common:
A fixed-length field holding the content length, followed by the actual content
For example, assuming each Chinese character takes 3 bytes (as in UTF-8), the sentence can be sent as follows so that the receiver cannot misread it:
0f 下雨天留客 06 天留 09 我不留
Here 0f, 06 and 09 are the byte lengths of the three fragments in hexadecimal (15, 6 and 9).
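The length-prefix rule above can be sketched in plain Java, without Netty. This is only a minimal illustration under stated assumptions: the class name `LengthPrefixDemo` is hypothetical, the length field is assumed to be a single unsigned byte, and the three fragments of the sentence are written as Unicode escapes so that the source file stays ASCII.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Netty): frames each message as
// [1-byte length][UTF-8 content].
class LengthPrefixDemo {

    // Concatenate all parts, each preceded by its length in bytes.
    static byte[] encode(String... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String part : parts) {
            byte[] content = part.getBytes(StandardCharsets.UTF_8);
            if (content.length > 255) {
                throw new IllegalArgumentException("part too long for a 1-byte length field");
            }
            out.write(content.length);             // length prefix
            out.write(content, 0, content.length); // actual content
        }
        return out.toByteArray();
    }

    // Split the byte stream back into the original parts using the prefixes.
    static List<String> decode(byte[] stream) {
        List<String> parts = new ArrayList<>();
        int pos = 0;
        while (pos < stream.length) {
            int length = stream[pos] & 0xff;       // read the unsigned length byte
            pos++;
            parts.add(new String(stream, pos, length, StandardCharsets.UTF_8));
            pos += length;
        }
        return parts;
    }

    public static void main(String[] args) {
        // The three fragments of the example sentence: 5, 2 and 3 characters.
        byte[] stream = encode("\u4e0b\u96e8\u5929\u7559\u5ba2", "\u5929\u7559", "\u6211\u4e0d\u7559");
        System.out.println(stream.length);         // 3 length bytes + 30 content bytes
        System.out.println(decode(stream).size());
    }
}
```

Each fragment is preceded by its byte length (0x0f, 0x06 and 0x09), so `decode` recovers the fragments without relying on any delimiter inside the content.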
A short story
Long ago, a private tutor came to teach for a family. The two parties signed an agreement written without punctuation, roughly: "no chicken duck also fine no fish meat also fine cabbage tofu cannot lack not get shuxiu fee". From then on, although the tutor taught conscientiously, the host served him only cabbage and tofu, never chicken, duck, fish or meat. The tutor was puzzled at first, but then reasoned that the host must be adding the money saved on chicken, duck, fish and meat to his shuxiu fee, so he let it be. Up to that point the two got along peacefully.
As the year drew to a close and the school year ended, the tutor was about to leave but saw no sign of the host paying his shuxiu fee, so he confronted him. The host argued with equal conviction: "The agreement proves it: 'No chicken or duck is fine; no fish or meat is fine; cabbage and tofu are indispensable; no shuxiu fee is to be paid.' It is written in black and white. What more is there to say?"
The tutor argued back: "The agreement says this: 'Without chicken, duck will do; without fish, meat will do; cabbage and tofu will not do; the shuxiu fee must not be omitted.'"
The two argued back and forth, and a lively quarrel it was!
The "shuxiu fee" here, also simply called "shuxiu", refers to the remuneration paid to a teacher.
2 Example of the Redis protocol
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class TestRedis { // class name is illustrative; the original snippet was a method body
    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        // CRLF: every line of the Redis protocol ends with \r\n
        byte[] LINE = {13, 10};
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new LoggingHandler());
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        // The active event fires once the connection has been established
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) {
                            set(ctx);
                            get(ctx);
                        }

                        // Sends "get aaa": an array of 2 bulk strings
                        private void get(ChannelHandlerContext ctx) {
                            ByteBuf buf = ctx.alloc().buffer();
                            buf.writeBytes("*2".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("get".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("aaa".getBytes());
                            buf.writeBytes(LINE);
                            ctx.writeAndFlush(buf);
                        }

                        // Sends "set aaa bbb": an array of 3 bulk strings
                        private void set(ChannelHandlerContext ctx) {
                            ByteBuf buf = ctx.alloc().buffer();
                            buf.writeBytes("*3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("set".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("aaa".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("bbb".getBytes());
                            buf.writeBytes(LINE);
                            ctx.writeAndFlush(buf);
                        }

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            try {
                                System.out.println(buf.toString(StandardCharsets.UTF_8));
                            } finally {
                                buf.release(); // this handler is the last consumer, so it releases the buffer
                            }
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}
```
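The byte sequences written by hand in `set` and `get` follow one pattern: `*` plus the number of arguments, then for each argument `$` plus its byte length and the argument itself, with every line ending in CRLF. A small helper can generate them. The sketch below uses only the JDK; `RespCommandEncoder` and `encode` are hypothetical names for illustration, not part of Netty or of any Redis client.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encodes a command as an array of bulk strings.
class RespCommandEncoder {
    private static final byte[] CRLF = {13, 10};

    static byte[] encode(String... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // "*<number of arguments>"
        writeLine(out, ("*" + args.length).getBytes(StandardCharsets.US_ASCII));
        for (String arg : args) {
            byte[] value = arg.getBytes(StandardCharsets.UTF_8);
            // "$<byte length of the argument>", then the argument itself
            writeLine(out, ("$" + value.length).getBytes(StandardCharsets.US_ASCII));
            writeLine(out, value);
        }
        return out.toByteArray();
    }

    private static void writeLine(ByteArrayOutputStream out, byte[] bytes) {
        out.write(bytes, 0, bytes.length);
        out.write(CRLF, 0, CRLF.length);
    }

    public static void main(String[] args) {
        String wire = new String(encode("set", "aaa", "bbb"), StandardCharsets.UTF_8);
        // prints *3\r\n$3\r\nset\r\n$3\r\naaa\r\n$3\r\nbbb\r\n
        System.out.println(wire.replace("\r\n", "\\r\\n"));
    }
}
```

Because each argument carries its own length, an argument may itself contain CRLF without confusing the receiver, which is the same length-prefix idea described in section 1.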
3 Example of the HTTP protocol
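```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;

@Slf4j
public class TestHttp { // class name is illustrative; the original snippet was a method body
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    // Decodes HTTP requests and encodes HTTP responses
                    ch.pipeline().addLast(new HttpServerCodec());
                    // Only messages of type HttpRequest reach this handler
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                            // Read the request
                            log.debug(msg.uri());

                            // Build the response
                            DefaultFullHttpResponse response =
                                    new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                            byte[] bytes = "<h1>Hello, world!</h1>".getBytes();
                            // Content-Length tells the client where the response body ends
                            response.headers().setInt(CONTENT_LENGTH, bytes.length);
                            response.content().writeBytes(bytes);

                            // Write the response back
                            ctx.writeAndFlush(response);
                        }
                    });
                    /*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            log.debug("{}", msg.getClass());

                            if (msg instanceof HttpRequest) { // request line, request headers

                            } else if (msg instanceof HttpContent) { // request body

                            }
                        }
                    });*/
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
```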
4 Custom protocol elements
- Magic number: lets the receiver reject invalid packets immediately
- Version number: allows the protocol to be upgraded
- Serialization algorithm: which method is used to serialize and deserialize the message body; extensible, for example JSON, protobuf, Hessian, JDK
- Command type: login, registration, one-to-one chat, group chat, and so on; specific to the business
- Request sequence number: lets responses be matched to requests, which makes asynchronous full-duplex communication possible
- Body length
- Message body
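One possible byte layout for these elements can be sketched with the JDK's `ByteBuffer`. The field widths below (4-byte magic number, 1 byte each for version, serialization method and command type, 4-byte sequence number, 1 padding byte, 4-byte body length) are assumptions chosen so that the header is 16 bytes long; `FrameHeaderDemo` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a 16-byte header followed by the message body.
class FrameHeaderDemo {
    static final int MAGIC = 0x01020304;
    static final int HEADER_LENGTH = 16;
    static final int LENGTH_FIELD_OFFSET = 12;

    static byte[] encode(byte version, byte serializer, byte commandType,
                         int sequenceId, byte[] body) {
        // ByteBuffer is big-endian by default, the usual network byte order
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putInt(MAGIC);         // offset 0: magic number
        buf.put(version);          // offset 4: version
        buf.put(serializer);       // offset 5: serialization method
        buf.put(commandType);      // offset 6: command type
        buf.putInt(sequenceId);    // offset 7: request sequence number
        buf.put((byte) 0xff);      // offset 11: padding, carries no meaning
        buf.putInt(body.length);   // offset 12: body length
        buf.put(body);             // offset 16: message body
        return buf.array();
    }

    // Validate the magic number and return the body of a complete frame.
    static byte[] decodeBody(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.remaining() < HEADER_LENGTH) {
            throw new IllegalArgumentException("frame shorter than header");
        }
        if (buf.getInt() != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        buf.position(LENGTH_FIELD_OFFSET); // skip the fields not needed here
        int length = buf.getInt();
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalArgumentException("bad body length " + length);
        }
        byte[] body = new byte[length];
        buf.get(body);
        return body;
    }

    public static void main(String[] args) {
        byte[] frame = encode((byte) 1, (byte) 0, (byte) 0, 7, new byte[]{104, 105});
        System.out.println(frame.length);             // 16-byte header + 2-byte body
        System.out.println(decodeBody(frame).length);
    }
}
```

Placing the length field at a fixed offset is what allows a generic frame decoder to cut the stream into messages without understanding the body.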
4.1 Codec
Based on the elements above, design a login request message and a login response message, and use Netty to send and receive them.
```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

// Message is the project's own message base class (not shown here);
// it must be Serializable for JDK serialization to work.
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4-byte magic number
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1-byte version
        out.writeByte(1);
        // 3. 1-byte serialization method: jdk 0, json 1
        out.writeByte(0);
        // 4. 1-byte command type
        out.writeByte(msg.getMessageType());
        // 5. 4-byte sequence id
        out.writeInt(msg.getSequenceId());
        // Meaningless padding byte so that the header is 16 bytes long
        out.writeByte(0xff);
        // 6. Serialize the message into a byte array
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 4-byte body length
        out.writeInt(bytes.length);
        // 8. Body
        out.writeBytes(bytes);
    }

    // Assumes a frame decoder earlier in the pipeline delivers one complete frame per call
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte(); // skip the padding byte
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}
```
Test
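```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;

// Must live in the same package as MessageCodec, because encode is protected
public class TestMessageCodec {
    public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel(
                new LoggingHandler(),
                // max frame 1024 bytes, length field at offset 12, 4 bytes long,
                // no length adjustment, no bytes stripped
                new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
                new MessageCodec()
        );

        // encode
        LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "Zhangsan");
        // channel.writeOutbound(message);

        // decode
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null, message, buf);

        // Split the encoded message in two to simulate a half packet
        ByteBuf s1 = buf.slice(0, 100);
        ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
        s1.retain(); // the slices share buf's reference count, now 2
        channel.writeInbound(s1); // the pipeline releases each slice it consumes
        channel.writeInbound(s2);
    }
}
```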
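Explanation
LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0) accepts frames of at most 1024 bytes and reads a 4-byte length field at offset 12, that is, after the 12 bytes taken by the magic number, version, serialization method, command type, sequence id and padding. The last two arguments are 0, so the length is not adjusted and no bytes are stripped: the complete frame, header included, is handed to MessageCodec. The test splits the encoded message into two slices to simulate a half packet. The frame decoder holds the first 100 bytes and passes a frame on only after the second slice arrives, so decode always sees a complete message. Both slices share the reference count of buf, and the pipeline releases each inbound buffer it consumes, so s1.retain() raises the count to 2 to cover both releases.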
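The half-packet handling exercised by the test can be imitated in plain Java. The sketch below is a simplified stand-in for a length-field frame decoder, not Netty's implementation: `FrameAccumulator` is a hypothetical class that assumes the same layout as the test (a 4-byte length field at offset 12, header kept in the frame).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for a length-field based frame decoder.
class FrameAccumulator {
    private static final int LENGTH_FIELD_OFFSET = 12;
    private static final int LENGTH_FIELD_LENGTH = 4;
    private static final int HEADER_LENGTH = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;

    private byte[] pending = new byte[0]; // bytes received but not yet framed

    // Feed one chunk from the stream; return every complete frame now available.
    List<byte[]> feed(byte[] chunk) {
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (joined.length - pos >= HEADER_LENGTH) {
            int bodyLength = ByteBuffer
                    .wrap(joined, pos + LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH)
                    .getInt();
            if (bodyLength < 0) {
                throw new IllegalStateException("negative body length");
            }
            int frameLength = HEADER_LENGTH + bodyLength;
            if (joined.length - pos < frameLength) {
                break; // half packet: wait for more bytes
            }
            frames.add(Arrays.copyOfRange(joined, pos, pos + frameLength));
            pos += frameLength; // a sticky packet may contain further frames
        }
        pending = Arrays.copyOfRange(joined, pos, joined.length);
        return frames;
    }

    public static void main(String[] args) {
        // A 16-byte header announcing a 5-byte body: 21 bytes in total
        byte[] frame = ByteBuffer.allocate(HEADER_LENGTH + 5).putInt(LENGTH_FIELD_OFFSET, 5).array();
        FrameAccumulator acc = new FrameAccumulator();
        System.out.println(acc.feed(Arrays.copyOfRange(frame, 0, 10)).size());            // 0
        System.out.println(acc.feed(Arrays.copyOfRange(frame, 10, frame.length)).size()); // 1
    }
}
```

The first call returns nothing because the header is still incomplete; the second call completes the frame. Note that the accumulator keeps state between calls, so each connection needs its own instance.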
4.2 When can @Sharable be added?
- A handler that keeps no state can be safely shared across channels and threads
- Note, however, that a sharable codec cannot extend ByteToMessageCodec or CombinedChannelDuplexHandler: their constructors reject subclasses annotated with @Sharable
- If you can guarantee that the codec keeps no state, extend MessageToMessageCodec instead
```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

/**
 * Must be used together with LengthFieldBasedFrameDecoder to ensure that
 * each received ByteBuf holds one complete message.
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4-byte magic number
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1-byte version
        out.writeByte(1);
        // 3. 1-byte serialization method: jdk 0, json 1
        out.writeByte(0);
        // 4. 1-byte command type
        out.writeByte(msg.getMessageType());
        // 5. 4-byte sequence id
        out.writeInt(msg.getSequenceId());
        // Meaningless padding byte so that the header is 16 bytes long
        out.writeByte(0xff);
        // 6. Serialize the message into a byte array
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 4-byte body length
        out.writeInt(bytes.length);
        // 8. Body
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte(); // skip the padding byte
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}
```
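Why state rules out sharing can be shown without Netty. The sketch below is a hypothetical example, not Netty code: `LineAccumulator` buffers partial input in a field, much as a byte-to-message decoder buffers half packets, and `SharingDemo` feeds it interleaved chunks from two imaginary connections A and B.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stateful "decoder": buffers characters until a newline arrives.
class LineAccumulator {
    private final StringBuilder pending = new StringBuilder(); // state that belongs to one connection

    List<String> feed(String chunk) {
        List<String> lines = new ArrayList<>();
        pending.append(chunk);
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            lines.add(pending.substring(0, newline));
            pending.delete(0, newline + 1);
        }
        return lines;
    }
}

class SharingDemo {
    // One instance shared by both connections: their partial data gets mixed up.
    static List<String> sharedInstance() {
        LineAccumulator shared = new LineAccumulator();
        List<String> out = new ArrayList<>();
        out.addAll(shared.feed("hel"));   // from connection A
        out.addAll(shared.feed("wor"));   // from connection B
        out.addAll(shared.feed("lo\n"));  // from connection A
        out.addAll(shared.feed("ld\n"));  // from connection B
        return out;
    }

    // One instance per connection: each message is reassembled correctly.
    static List<String> separateInstances() {
        LineAccumulator a = new LineAccumulator();
        LineAccumulator b = new LineAccumulator();
        List<String> out = new ArrayList<>();
        out.addAll(a.feed("hel"));
        out.addAll(b.feed("wor"));
        out.addAll(a.feed("lo\n"));
        out.addAll(b.feed("ld\n"));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sharedInstance());    // [helworlo, ld]
        System.out.println(separateInstances()); // [hello, world]
    }
}
```

MessageCodecSharable avoids this problem because it keeps no fields: the frame decoder in front of it, which does hold per-connection state, is created once per channel.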