Simple implementation of RPC framework from shallow to deep Netty

Table of Contents

    • 1 Preparations
    • 2 server handlers
    • 3 The first version of the client code
    • 4 client handler version 1
    • 5 Client Code Second Edition
    • 6 Client Handler Version 2

1 Preparations

These codes can be considered ready-made, no need to write exercises from scratch

For simplicity, add Rpc request and response messages on the basis of the original chat project

@Data
public abstract class Message implements Serializable {<!-- -->

    // omit old code

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    static {<!-- -->
        //...
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

}

request message

@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {<!-- -->

    /**
     * The fully qualified name of the called interface, the server finds the implementation based on it
     */
    private String interfaceName;
    /**
     * Call the method name in the interface
     */
    private String methodName;
    /**
     * method return type
     */
    private Class<?> returnType;
    /**
     * Method parameter type array
     */
    private Class[] parameterTypes;
    /**
     * Array of method parameter values
     */
    private Object[] parameterValue;

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {<!-- -->
        super.setSequenceId(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this. returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {<!-- -->
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}

response message

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {<!-- -->
    /**
     * return value
     */
    private Object returnValue;
    /**
     * Outliers
     */
    private Exception exceptionValue;

    @Override
    public int getMessageType() {<!-- -->
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}

server shelf

@Slf4j
public class RpcServer {<!-- -->
    public static void main(String[] args) {<!-- -->
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel. DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        
        // rpc request message processor, to be implemented
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
        try {<!-- -->
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap. channel(NioServerSocketChannel. class);
            serverBootstrap.group(boss, worker);
            serverBootstrap. childHandler(new ChannelInitializer<SocketChannel>() {<!-- -->
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {<!-- -->
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel. closeFuture(). sync();
        } catch (InterruptedException e) {<!-- -->
            log.error("server error", e);
        } finally {<!-- -->
            boss. shutdown Gracefully();
            worker. shutdown Gracefully();
        }
    }
}

client shelf

public class RpcClient {<!-- -->
    public static void main(String[] args) {<!-- -->
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel. DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        
        // rpc response message processor, to be implemented
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {<!-- -->
            Bootstrap bootstrap = new Bootstrap();
            bootstrap. channel(NioSocketChannel. class);
            bootstrap. group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {<!-- -->
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {<!-- -->
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel. closeFuture(). sync();
        } catch (Exception e) {<!-- -->
            log. error("client error", e);
        } finally {<!-- -->
            group. shutdown Gracefully();
        }
    }
}

Server-side service acquisition

public class ServicesFactory {<!-- -->

    static Properties properties;
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {<!-- -->
        try (InputStream in = Config. class. getResourceAsStream("/application. properties")) {<!-- -->
            properties = new Properties();
            properties. load(in);
            Set<String> names = properties. stringPropertyNames();
            for (String name : names) {<!-- -->
                if (name. endsWith("Service")) {<!-- -->
                    Class<?> interfaceClass = Class. forName(name);
                    Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    map.put(interfaceClass, instanceClass.newInstance());
                }
            }
        } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {<!-- -->
            throw new ExceptionInInitializerError(e);
        }
    }

    public static <T> T getService(Class<T> interfaceClass) {<!-- -->
        return (T) map. get(interfaceClass);
    }
}

Related configuration application.properties

serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

2 server handler

@Slf4j
@ChannelHandler. Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {<!-- -->

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {<!-- -->
        RpcResponseMessage response = new RpcResponseMessage();
        response.setSequenceId(message.getSequenceId());
        try {<!-- -->
            // Get the real implementation object
            HelloService service = (HelloService)
                    ServicesFactory.getService(Class.forName(message.getInterfaceName()));
            
            // Get the method to call
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            
            // call method
            Object invoke = method.invoke(service, message.getParameterValue());
            // call succeeded
            response.setReturnValue(invoke);
        } catch (Exception e) {<!-- -->
            e.printStackTrace();
            // call exception
            response. setExceptionValue(e);
        }
        // return result
        ctx.writeAndFlush(response);
    }
}

3 Client code version 1

message only

@Slf4j
public class RpcClient {<!-- -->
    public static void main(String[] args) {<!-- -->
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel. DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {<!-- -->
            Bootstrap bootstrap = new Bootstrap();
            bootstrap. channel(NioSocketChannel. class);
            bootstrap. group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {<!-- -->
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {<!-- -->
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();

            ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
                    1,
                    "cn.itcast.server.service.HelloService",
                    "sayHello",
                    String. class,
                    new Class[]{<!-- -->String. class},
                    new Object[]{<!-- -->"Zhang San"}
            )).addListener(promise -> {<!-- -->
                if (!promise.isSuccess()) {<!-- -->
                    Throwable cause = promise. cause();
                    log. error("error", cause);
                }
            });

            channel. closeFuture(). sync();
        } catch (Exception e) {<!-- -->
            log. error("client error", e);
        } finally {<!-- -->
            group. shutdown Gracefully();
        }
    }
}

4 client handler first version

@Slf4j
@ChannelHandler. Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {<!-- -->
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {<!-- -->
        log.debug("{}", msg);
    }
}

5 Client Code Second Edition

Including channel management, proxy, receiving results

@Slf4j
public class RpcClientManager {<!-- -->


    public static void main(String[] args) {<!-- -->
        HelloService service = getProxyService(HelloService. class);
        System.out.println(service.sayHello("zhangsan"));
// System.out.println(service.sayHello("lisi"));
// System.out.println(service.sayHello("wangwu"));
    }

    // Create proxy class
    public static <T> T getProxyService(Class<T> serviceClass) {<!-- -->
        ClassLoader loader = serviceClass. getClassLoader();
        Class<?>[] interfaces = new Class[]{<!-- -->serviceClass};
        // sayHello "Zhang San"
        Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {<!-- -->
            // 1. Convert the method call to a message object
            int sequenceId = SequenceIdGenerator. nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceId,
                    serviceClass. getName(),
                    method. getName(),
                    method. getReturnType(),
                    method. getParameterTypes(),
                    args
            );
            // 2. Send the message object
            getChannel().writeAndFlush(msg);

            // 3. Prepare an empty Promise object to receive the result Specify the promise object to receive the result thread asynchronously
            DefaultPromise<Object> promise = new DefaultPromise<>(getChannel(). eventLoop());
            RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

// promise. addListener(future -> {<!-- -->
// // thread
// });

            // 4. Wait for promise result
            promise. await();
            if(promise. isSuccess()) {<!-- -->
                // call is normal
                return promise. getNow();
            } else {<!-- -->
                // call failed
                throw new RuntimeException(promise. cause());
            }
        });
        return (T) o;
    }

    private static Channel channel = null;
    private static final Object LOCK = new Object();

    // Get the unique channel object
    public static Channel getChannel() {<!-- -->
        if (channel != null) {<!-- -->
            return channel;
        }
        synchronized (LOCK) {<!-- --> // t2
            if (channel != null) {<!-- --> // t1
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    // initialize channel method
    private static void initChannel() {<!-- -->
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel. DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap. channel(NioSocketChannel. class);
        bootstrap. group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {<!-- -->
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {<!-- -->
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {<!-- -->
            channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().addListener(future -> {<!-- -->
                group. shutdown Gracefully();
            });
        } catch (Exception e) {<!-- -->
            log. error("client error", e);
        }
    }
}

6 client handler second edition

@Slf4j
@ChannelHandler. Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {<!-- -->

    // The sequence number is used to receive the promise object of the result
    public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {<!-- -->
        log.debug("{}", msg);
        // get an empty promise
        Promise<Object> promise = PROMISES. remove(msg. getSequenceId());
        if (promise != null) {<!-- -->
            Object returnValue = msg. getReturnValue();
            Exception exceptionValue = msg. getExceptionValue();
            if(exceptionValue != null) {<!-- -->
                promise. setFailure(exceptionValue);
            } else {<!-- -->
                promise. setSuccess(returnValue);
            }
        }
    }
}