Implement an RPC framework with Netty

1. Basic introduction to RPC

(1) RPC (Remote Procedure Call) is a computer communication protocol. It allows a program running on one computer to call a subroutine on another computer without the programmer having to write extra code for the network interaction

(2) Two or more applications are distributed on different servers, and calls between them are like local method calls (as shown in the figure)

(3) Commonly used RPC frameworks include well-known ones such as Alibaba's Dubbo, Google's gRPC, rpcx for the Go language, Apache Thrift, and Spring Cloud

(4) RPC call process

2. Implementing RPC with Netty

schematic diagram

Overall code directory structure

(1) Create an interface and define abstract methods

(2) Create a provider

ServerBootstrap

NettyServer

public class NettyServer {

    public static void startServer(String hostname,int port){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap. group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel. pipeline();
                            pipeline. addLast(new StringDecoder());
                            pipeline. addLast(new StringEncoder());
                            pipeline. addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture sync = serverBootstrap.bind(hostname, port).sync();
            System.out.println("The service provider starts to provide services~~");
            sync.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            bossGroup. shutdown Gracefully();
            workerGroup. shutdown Gracefully();
        }
    }
}

NettyServerHandler

/**
 * Server-side handler that serves calls to {@code HelloService.hello}.
 *
 * <p>A request is a string of the form {@code HelloService#hello#<argument>}.
 * Messages that do not start with that prefix are printed and otherwise ignored.
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** Protocol prefix that selects {@code HelloService.hello}; the argument follows it. */
    private static final String HELLO_PREFIX = "HelloService#hello#";

    /**
     * Dispatches a decoded request to {@link HelloServiceImpl} and writes the reply back.
     *
     * @param ctx channel context used to send the response
     * @param msg decoded request; converted with {@code toString()}
     * @throws Exception if the service call fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("msg=" + msg);
        String msgStr = msg.toString();
        if (msgStr.startsWith(HELLO_PREFIX)) {
            // Slice by prefix length: searching for the last '#' would truncate
            // any argument that itself contains a '#'.
            String argument = msgStr.substring(HELLO_PREFIX.length());
            String hello = new HelloServiceImpl().hello(argument);
            ctx.writeAndFlush(hello);
        }
    }

    /**
     * Delegates to the superclass implementation, which forwards the exception
     * to the next handler in the pipeline.
     *
     * @param ctx   channel context
     * @param cause the exception raised while processing the channel
     * @throws Exception if forwarding fails
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

(3) Create a consumer

NettyClientHandler

/**
 * Client-side handler that sends one request string and blocks the calling
 * thread until the server's reply arrives.
 *
 * <p>Call order for a single RPC: {@link #channelActive} (on connect),
 * {@link #setPara}, {@link #call} (sends and waits), {@link #channelRead}
 * (stores the reply and wakes the waiter), then {@link #call} returns.
 *
 * <p>The handler carries a single pending request, so calls must not overlap.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {
    private ChannelHandlerContext context;
    /**
     * The result returned by the server; {@code null} while a call is still waiting.
     */
    private String result;
    /**
     * The request string to send, set by the proxy before each call.
     */
    private String para;
    /**
     * The channel error that ended the connection, or {@code null} if none occurred.
     */
    private Throwable failure;

    /**
     * Called when the connection to the server is established (first step).
     * Remembers the context so that {@link #call} can write to the channel.
     *
     * @param ctx channel context of the new connection
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive is called");
        context = ctx;
    }

    /**
     * Called when data arrives from the server (fourth step).
     * Stores the reply and wakes the thread blocked in {@link #call}.
     *
     * @param ctx channel context
     * @param msg decoded reply; converted with {@code toString()}
     * @throws Exception never thrown by this implementation
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead is called");
        result = msg.toString();
        // wake up the waiting thread
        notifyAll();
    }

    /**
     * Records the error, wakes any thread blocked in {@link #call} so it does
     * not wait forever, and closes the channel.
     *
     * @param ctx   channel context
     * @param cause the exception raised on the channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public synchronized void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        failure = cause;
        notifyAll();
        ctx.close();
    }

    /**
     * Called by the proxy object (third step): sends the request to the server
     * -> waits -> is awakened by {@link #channelRead} -> returns the result (fifth step).
     *
     * @return the reply string received from the server
     * @throws IllegalStateException if the channel reported an error before a reply arrived
     * @throws InterruptedException  if the waiting thread is interrupted
     */
    @Override
    public synchronized Object call() throws Exception {
        // send message to server
        System.out.println("call is called");
        // Clear any reply left over from a previous call so the loop below waits for a fresh one.
        result = null;
        context.writeAndFlush(para);
        // Wait in a loop: Object.wait() may return spuriously, so re-check the condition each time.
        while (result == null && failure == null) {
            wait();
        }
        System.out.println("call2 is called");
        if (failure != null) {
            throw new IllegalStateException("RPC channel failed before a response arrived", failure);
        }
        // The result returned by the server
        return result;
    }

    /**
     * Sets the request string for the next call (second step).
     *
     * @param para the full request string to send
     */
    void setPara(String para) {
        System.out.println("setPara was called");
        this.para = para;
    }
}

NettyClient

/**
 * RPC consumer: creates dynamic proxies whose method calls are sent to the
 * provider over a single shared Netty connection.
 *
 * <p>All proxies share one {@link NettyClientHandler}, which carries a single
 * pending request, so calls must not be issued concurrently.
 */
public class NettyClient {
    /** Address of the service provider. */
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 7000;

    /**
     * Thread pool on which the blocking handler call is executed.
     */
    private static final ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    /** Shared handler; {@code null} until the first successful connection. */
    private static NettyClientHandler clientHandler;

    /**
     * Creates a proxy for the given service interface. Each method call on the
     * proxy sends {@code providerName + args[0]} to the server and returns the reply.
     *
     * @param serviceClass the service interface to proxy
     * @param providerName protocol prefix prepended to the first argument
     * @return a proxy instance implementing {@code serviceClass}
     */
    public Object getBean(final Class<?> serviceClass, final String providerName) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, (proxy, method, args) -> {
                    // Every time the client calls hello, it will enter this code.
                    // Connect lazily and only once; the connection is reused afterwards.
                    if (clientHandler == null) {
                        initClient();
                    }
                    clientHandler.setPara(providerName + args[0]);
                    return executorService.submit(clientHandler).get();
                });
    }

    /**
     * Opens the connection to the provider and publishes the handler once the
     * connection has succeeded.
     *
     * @throws IllegalStateException if the connection cannot be established
     */
    private static void initClient() {
        final NettyClientHandler handler = new NettyClientHandler();
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(handler);
                    }
                });
        try {
            bootstrap.connect(HOST, PORT).sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            group.shutdownGracefully();
            throw new IllegalStateException("Interrupted while connecting to " + HOST + ":" + PORT, e);
        } catch (Exception e) {
            // Release the event loop threads; a failed connection must not leave a half-initialised client.
            group.shutdownGracefully();
            throw new IllegalStateException("Unable to connect to " + HOST + ":" + PORT, e);
        }
        // Publish only after a successful connect so a later call can retry after a failure.
        clientHandler = handler;
    }
}

ClientBootstrap

/**
 * Entry point of the RPC consumer demo: obtains a {@code HelloService} proxy
 * from {@link NettyClient}, makes one remote call, and prints the reply.
 */
public class ClientBootstrap {
    /** Protocol prefix identifying the remote method; the call argument is appended to it. */
    public static final String providerName = "HelloService#hello#";

    /**
     * Runs the demo call.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        NettyClient customer = new NettyClient();
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
        String hello = service.hello("Hello dubbo");
        System.out.println("call result res=" + hello);
    }
}

(4) test

server

consumer