Netty optimization – extending the serialization algorithm in custom protocols

1. Optimization and source code

1. Optimization

1.1 Extend the serialization algorithm in custom protocols

Serialization and deserialization are used mainly to convert the message body.

  • When serializing, the Java object is turned into the data to be transmitted (a byte[] directly, or JSON and similar formats that are ultimately encoded as byte[])
  • When deserializing, the incoming bytes are restored to a Java object so they are easy to process

The current code supports only the JDK's built-in serialization and deserialization mechanism. The core code is as follows:

// Deserialization
byte[] body = new byte[bodyLength];
byteBuf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// Serialization
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

To support more serialization algorithms, we abstract a Serializer interface and provide two implementations. The implementations live directly in the nested enum Serializer.Algorithm, so each enum constant is itself a Serializer.

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

/**
 * Used to extend serialization and deserialization algorithms
 */
public interface Serializer {

    // Deserialization method
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    // Serialization method
    <T> byte[] serialize(T object);

    enum Algorithm implements Serializer {

        Java {
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                    // clazz.cast avoids an unchecked cast to T
                    return clazz.cast(ois.readObject());
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("Deserialization failed", e);
                }
            }

            @Override
            public <T> byte[] serialize(T object) {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                    oos.writeObject(object);
                } catch (IOException e) {
                    throw new RuntimeException("Serialization failed", e);
                }
                return bos.toByteArray();
            }
        },

        /**
         * Requires Gson on the classpath:
         * <dependency>
         * <groupId>com.google.code.gson</groupId>
         * <artifactId>gson</artifactId>
         * <version>2.8.5</version>
         * </dependency>
         */
        Json {
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                // Gson cannot handle java.lang.Class fields by default, so ClassCodec is registered for them
                Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();
                String json = new String(bytes, StandardCharsets.UTF_8);
                return gson.fromJson(json, clazz);
            }

            @Override
            public <T> byte[] serialize(T object) {
                Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();
                String json = gson.toJson(object);
                return json.getBytes(StandardCharsets.UTF_8);
            }
        }
    }

    // Converts java.lang.Class values to and from their fully qualified names, e.g. String.class <-> "java.lang.String"
    class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {

        @Override
        public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
            try {
                // json -> class
                String str = json.getAsString();
                return Class.forName(str);
            } catch (ClassNotFoundException e) {
                throw new JsonParseException(e);
            }
        }

        @Override
        public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
            // class -> json
            return new JsonPrimitive(src.getName());
        }
    }
}

Add a configuration class and a configuration file

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public abstract class Config {
    static Properties properties;
    static {
        // Load application.properties from the root of the classpath
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Server port, 8080 when server.port is not configured
    public static int getServerPort() {
        String value = properties.getProperty("server.port");
        if (value == null) {
            return 8080;
        } else {
            return Integer.parseInt(value);
        }
    }

    // Serialization algorithm, Java when serializer.algorithm is not configured
    public static Serializer.Algorithm getSerializerAlgorithm() {
        String value = properties.getProperty("serializer.algorithm");
        if (value == null) {
            return Serializer.Algorithm.Java;
        } else {
            // The value must match an enum constant name exactly (Java or Json)
            return Serializer.Algorithm.valueOf(value);
        }
    }
}

Configuration file (application.properties)

serializer.algorithm=Json
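
The lookup in getSerializerAlgorithm() relies on Enum.valueOf, which is case-sensitive and throws IllegalArgumentException for an unknown name, so a value such as json would throw rather than fall back to Java. The following is a minimal, JDK-only sketch of that behaviour. Algo and ConfigDemo are hypothetical stand-ins for Serializer.Algorithm and Config, and the properties are read from a string instead of the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for Serializer.Algorithm.
enum Algo { Java, Json }

class ConfigDemo {
    // Parse properties text and resolve "serializer.algorithm", defaulting to Java.
    static Algo resolve(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("serializer.algorithm");
        // valueOf is case-sensitive: "Json" matches, "json" throws IllegalArgumentException.
        return value == null ? Algo.Java : Algo.valueOf(value);
    }

    public static void main(String[] args) {
        System.out.println(resolve("serializer.algorithm=Json"));
        System.out.println(resolve("server.port=8080"));
    }
}
```

If lenient matching is wanted, the value could be normalized before the lookup, but that is a design choice the original code does not make.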

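Modify the codec so that it reads the algorithm from the configuration when encoding and from the frame header when decoding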

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;

import java.util.List;

/**
 * Must be used together with LengthFieldBasedFrameDecoder to ensure that each ByteBuf received here is a complete message
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4-byte magic number
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1-byte version
        out.writeByte(1);
        // 3. 1-byte serialization algorithm (enum ordinal): Java 0, Json 1
        out.writeByte(Config.getSerializerAlgorithm().ordinal());
        // 4. 1-byte message type
        out.writeByte(msg.getMessageType());
        // 5. 4-byte sequence id
        out.writeInt(msg.getSequenceId());
        // Padding byte with no meaning; it brings the header to 16 bytes
        out.writeByte(0xff);
        // 6. Serialize the message into a byte array
        byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
        // 7. 4-byte body length
        out.writeInt(bytes.length);
        // 8. Write the body
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerAlgorithm = in.readByte(); // 0 or 1
        byte messageType = in.readByte(); // 0,1,2...
        int sequenceId = in.readInt();
        in.readByte(); // skip the padding byte
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        // Find the deserialization algorithm from the ordinal carried in the header
        Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
        // Determine the specific message type
        Class<? extends Message> messageClass = Message.getMessageClass(messageType);
        Message message = algorithm.deserialize(messageClass, bytes);
        // log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerAlgorithm, messageType, sequenceId, length);
        // log.debug("{}", message);
        out.add(message);
    }
}
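
The header written by encode is 16 bytes (4 + 1 + 1 + 1 + 4 + 1 + 4), with the 4-byte body length at offset 12. Those are the two numbers a preceding LengthFieldBasedFrameDecoder would need as lengthFieldOffset and lengthFieldLength. As a rough, Netty-free sketch of the same layout, the snippet below builds a frame and reads the length back with java.nio.ByteBuffer. FrameDemo and its method names are hypothetical and not part of the project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameDemo {
    static final int HEADER_LENGTH = 16;       // 4 + 1 + 1 + 1 + 4 + 1 + 4
    static final int LENGTH_FIELD_OFFSET = 12; // number of bytes that precede the length field

    // Lay out one frame in the same order as encode(); ByteBuffer is big-endian by default.
    static byte[] buildFrame(int algorithm, int messageType, int sequenceId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.put(new byte[]{1, 2, 3, 4}); // magic number
        buf.put((byte) 1);               // version
        buf.put((byte) algorithm);       // serialization algorithm
        buf.put((byte) messageType);     // message type
        buf.putInt(sequenceId);          // sequence id
        buf.put((byte) 0xff);            // padding
        buf.putInt(body.length);         // body length
        buf.put(body);                   // body
        return buf.array();
    }

    // Read the body length back from its fixed offset in the header.
    static int bodyLength(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(LENGTH_FIELD_OFFSET);
    }

    public static void main(String[] args) {
        byte[] body = "{}".getBytes(StandardCharsets.UTF_8);
        byte[] frame = buildFrame(1, 0, 7, body);
        System.out.println(frame.length + " bytes in total, body length " + bodyLength(frame));
    }
}
```

Note that the algorithm byte is the enum ordinal, so reordering the constants in Serializer.Algorithm would change the wire format.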

To determine the concrete message type during decoding, the message class is looked up from the message type byte:

import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Data
public abstract class Message implements Serializable {

    /**
     * According to the message type byte, obtain the corresponding message class
     * @param messageType message type byte
     * @return message class, or null if the type is not registered
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }

    private int sequenceId;

    private int messageType;

    public abstract int getMessageType();

    // Message type bytes
    public static final int LoginRequestMessage = 0;
    public static final int LoginResponseMessage = 1;
    public static final int ChatRequestMessage = 2;
    // Message type byte -> message class
    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

    static {
        messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
        messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
        messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
    }
}
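
getMessageClass returns null for a type byte that was never registered, and decode then hands that null to the serializer. The sketch below shows one possible way to fail early with a clear error instead. TypeRegistryDemo, Login, and Chat are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class TypeRegistryDemo {
    // Hypothetical message classes standing in for LoginRequestMessage and ChatRequestMessage.
    static class Login {}
    static class Chat {}

    // Message type byte -> message class
    private static final Map<Integer, Class<?>> TYPES = new HashMap<>();

    static {
        TYPES.put(0, Login.class);
        TYPES.put(2, Chat.class);
    }

    // Look up the class for a type byte, rejecting unknown values with a descriptive exception.
    static Class<?> classFor(int messageType) {
        Class<?> clazz = TYPES.get(messageType);
        if (clazz == null) {
            throw new IllegalArgumentException("Unknown message type: " + messageType);
        }
        return clazz;
    }

    public static void main(String[] args) {
        System.out.println(classFor(0).getSimpleName());
        System.out.println(classFor(2).getSimpleName());
    }
}
```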

Test class

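import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.logging.LoggingHandler;

public class TestSerializer {

    public static void main(String[] args) {
        MessageCodecSharable CODEC = new MessageCodecSharable();
        LoggingHandler LOGGING = new LoggingHandler();
        // LOGGING sits on both sides of the codec, so every event is logged twice
        EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);

        // The name is "张三" (Zhang San); its UTF-8 bytes e5 bc a0 e4 b8 89 appear in the dumps below
        LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
        // Serialization test: uncomment to encode the message
        //channel.writeOutbound(message);
        // Deserialization test: build the frame by hand and feed it to the decoder
        ByteBuf buf = messageToByteBuf(message);
        channel.writeInbound(buf);
    }

    // Builds the same frame as MessageCodecSharable.encode
    public static ByteBuf messageToByteBuf(Message msg) {
        int algorithm = Config.getSerializerAlgorithm().ordinal();
        ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
        out.writeBytes(new byte[]{1, 2, 3, 4});
        out.writeByte(1);
        out.writeByte(algorithm);
        out.writeByte(msg.getMessageType());
        out.writeInt(msg.getSequenceId());
        out.writeByte(0xff);
        byte[] bytes = Serializer.Algorithm.values()[algorithm].serialize(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
        return out;
    }
}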

JSON serialization result (output of channel.writeOutbound(message), which is commented out in the test class)

16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123, name=张三)
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 103B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 57 |...............W|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{"username":"zha|
|00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan","password|
|00000030| 22 3a 22 31 32 33 22 2c 22 6e 61 6d 65 22 3a 22 |":"123","name":"|
|00000040| e5 bc a0 e4 b8 89 22 2c 22 73 65 71 75 65 6e 63 |......","sequenc|
|00000050| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId":0,"messageT|
|00000060| 79 70 65 22 3a 30 7d                            |ype":0}         |
+--------+-------------------------------------------------+----------------+
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH

JSON deserialization result (output of channel.writeInbound(buf))

16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 103B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 57 |...............W|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{"username":"zha|
|00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan","password|
|00000030| 22 3a 22 31 32 33 22 2c 22 6e 61 6d 65 22 3a 22 |":"123","name":"|
|00000040| e5 bc a0 e4 b8 89 22 2c 22 73 65 71 75 65 6e 63 |......","sequenc|
|00000050| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId":0,"messageT|
|00000060| 79 70 65 22 3a 30 7d                            |ype":0}         |
+--------+-------------------------------------------------+----------------+
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123, name=张三)
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
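
The first row of the dump can be checked against the protocol by hand: byte 5 is 01 (Json), the length field is 00 00 00 57 (87), and 16 header bytes plus 87 body bytes give the 103 bytes that LoggingHandler reports. The six bytes shown as dots at offset 0x40 are the UTF-8 encoding of the name field. Here is a small sketch that performs the same check on bytes copied from the dump, using only the JDK (DumpCheck is a hypothetical name).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class DumpCheck {
    // First 16 bytes of the dump: magic, version, algorithm, type, sequence id, padding, length.
    static final byte[] HEADER = {
        0x01, 0x02, 0x03, 0x04, 0x01, 0x01, 0x00, 0x00,
        0x00, 0x00, 0x00, (byte) 0xff, 0x00, 0x00, 0x00, 0x57
    };

    // Bytes at offsets 0x40 to 0x45 of the dump, printed as "......" in the ASCII column.
    static final byte[] NAME = {
        (byte) 0xe5, (byte) 0xbc, (byte) 0xa0, (byte) 0xe4, (byte) 0xb8, (byte) 0x89
    };

    // Ordinal of the serialization algorithm (byte 5 of the header).
    static int algorithm() { return HEADER[5]; }

    // Big-endian int stored at offset 12 of the header.
    static int bodyLength() { return ByteBuffer.wrap(HEADER).getInt(12); }

    // Header plus body, which should match the size reported by LoggingHandler.
    static int frameLength() { return HEADER.length + bodyLength(); }

    // The name bytes decoded as UTF-8.
    static String name() { return new String(NAME, StandardCharsets.UTF_8); }

    public static void main(String[] args) {
        System.out.println("algorithm ordinal = " + algorithm());
        System.out.println("body length       = " + bodyLength());
        System.out.println("frame length      = " + frameLength());
        System.out.println("name              = " + name());
    }
}
```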