Table of Contents
-
- 1 Preparations
- 2 server handlers
- 3 The first version of the client code
- 4 client handler version 1
- 5 Client Code Second Edition
- 6 Client Handler Version 2
1 Preparations
The code in this section can be treated as ready-made; there is no need to write it from scratch as an exercise.
For simplicity, the RPC request and response messages are added on top of the message classes of the original chat project.
@Data public abstract class Message implements Serializable {<!-- --> // omit old code public static final int RPC_MESSAGE_TYPE_REQUEST = 101; public static final int RPC_MESSAGE_TYPE_RESPONSE = 102; static {<!-- --> //... messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class); messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class); } }
request message
@Getter @ToString(callSuper = true) public class RpcRequestMessage extends Message {<!-- --> /** * The fully qualified name of the called interface, the server finds the implementation based on it */ private String interfaceName; /** * Call the method name in the interface */ private String methodName; /** * method return type */ private Class<?> returnType; /** * Method parameter type array */ private Class[] parameterTypes; /** * Array of method parameter values */ private Object[] parameterValue; public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {<!-- --> super.setSequenceId(sequenceId); this.interfaceName = interfaceName; this.methodName = methodName; this. returnType = returnType; this.parameterTypes = parameterTypes; this.parameterValue = parameterValue; } @Override public int getMessageType() {<!-- --> return RPC_MESSAGE_TYPE_REQUEST; } }
response message
@Data @ToString(callSuper = true) public class RpcResponseMessage extends Message {<!-- --> /** * return value */ private Object returnValue; /** * Outliers */ private Exception exceptionValue; @Override public int getMessageType() {<!-- --> return RPC_MESSAGE_TYPE_RESPONSE; } }
Server skeleton
@Slf4j public class RpcServer {<!-- --> public static void main(String[] args) {<!-- --> NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel. DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); // rpc request message processor, to be implemented RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler(); try {<!-- --> ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap. channel(NioServerSocketChannel. class); serverBootstrap.group(boss, worker); serverBootstrap. childHandler(new ChannelInitializer<SocketChannel>() {<!-- --> @Override protected void initChannel(SocketChannel ch) throws Exception {<!-- --> ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = serverBootstrap.bind(8080).sync().channel(); channel. closeFuture(). sync(); } catch (InterruptedException e) {<!-- --> log.error("server error", e); } finally {<!-- --> boss. shutdown Gracefully(); worker. shutdown Gracefully(); } } }
Client skeleton
public class RpcClient {<!-- --> public static void main(String[] args) {<!-- --> NioEventLoopGroup group = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel. DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); // rpc response message processor, to be implemented RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler(); try {<!-- --> Bootstrap bootstrap = new Bootstrap(); bootstrap. channel(NioSocketChannel. class); bootstrap. group(group); bootstrap.handler(new ChannelInitializer<SocketChannel>() {<!-- --> @Override protected void initChannel(SocketChannel ch) throws Exception {<!-- --> ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = bootstrap.connect("localhost", 8080).sync().channel(); channel. closeFuture(). sync(); } catch (Exception e) {<!-- --> log. error("client error", e); } finally {<!-- --> group. shutdown Gracefully(); } } }
Server-side service acquisition
public class ServicesFactory {<!-- --> static Properties properties; static Map<Class<?>, Object> map = new ConcurrentHashMap<>(); static {<!-- --> try (InputStream in = Config. class. getResourceAsStream("/application. properties")) {<!-- --> properties = new Properties(); properties. load(in); Set<String> names = properties. stringPropertyNames(); for (String name : names) {<!-- --> if (name. endsWith("Service")) {<!-- --> Class<?> interfaceClass = Class. forName(name); Class<?> instanceClass = Class.forName(properties.getProperty(name)); map.put(interfaceClass, instanceClass.newInstance()); } } } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {<!-- --> throw new ExceptionInInitializerError(e); } } public static <T> T getService(Class<T> interfaceClass) {<!-- --> return (T) map. get(interfaceClass); } }
Related configuration application.properties
serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
2 server handler
@Slf4j @ChannelHandler. Sharable public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {<!-- --> @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {<!-- --> RpcResponseMessage response = new RpcResponseMessage(); response.setSequenceId(message.getSequenceId()); try {<!-- --> // Get the real implementation object HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName())); // Get the method to call Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes()); // call method Object invoke = method.invoke(service, message.getParameterValue()); // call succeeded response.setReturnValue(invoke); } catch (Exception e) {<!-- --> e.printStackTrace(); // call exception response. setExceptionValue(e); } // return result ctx.writeAndFlush(response); } }
3 Client code version 1
This version only sends a request message; the result is not returned to the caller.
@Slf4j public class RpcClient {<!-- --> public static void main(String[] args) {<!-- --> NioEventLoopGroup group = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel. DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler(); try {<!-- --> Bootstrap bootstrap = new Bootstrap(); bootstrap. channel(NioSocketChannel. class); bootstrap. group(group); bootstrap.handler(new ChannelInitializer<SocketChannel>() {<!-- --> @Override protected void initChannel(SocketChannel ch) throws Exception {<!-- --> ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = bootstrap.connect("localhost", 8080).sync().channel(); ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage( 1, "cn.itcast.server.service.HelloService", "sayHello", String. class, new Class[]{<!-- -->String. class}, new Object[]{<!-- -->"Zhang San"} )).addListener(promise -> {<!-- --> if (!promise.isSuccess()) {<!-- --> Throwable cause = promise. cause(); log. error("error", cause); } }); channel. closeFuture(). sync(); } catch (Exception e) {<!-- --> log. error("client error", e); } finally {<!-- --> group. shutdown Gracefully(); } } }
4 client handler first version
@Slf4j @ChannelHandler. Sharable public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {<!-- --> @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {<!-- --> log.debug("{}", msg); } }
5 Client Code Second Edition
This version adds channel management, a dynamic proxy for the service interface, and receiving the call result.
@Slf4j public class RpcClientManager {<!-- --> public static void main(String[] args) {<!-- --> HelloService service = getProxyService(HelloService. class); System.out.println(service.sayHello("zhangsan")); // System.out.println(service.sayHello("lisi")); // System.out.println(service.sayHello("wangwu")); } // Create proxy class public static <T> T getProxyService(Class<T> serviceClass) {<!-- --> ClassLoader loader = serviceClass. getClassLoader(); Class<?>[] interfaces = new Class[]{<!-- -->serviceClass}; // sayHello "Zhang San" Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {<!-- --> // 1. Convert the method call to a message object int sequenceId = SequenceIdGenerator. nextId(); RpcRequestMessage msg = new RpcRequestMessage( sequenceId, serviceClass. getName(), method. getName(), method. getReturnType(), method. getParameterTypes(), args ); // 2. Send the message object getChannel().writeAndFlush(msg); // 3. Prepare an empty Promise object to receive the result Specify the promise object to receive the result thread asynchronously DefaultPromise<Object> promise = new DefaultPromise<>(getChannel(). eventLoop()); RpcResponseMessageHandler.PROMISES.put(sequenceId, promise); // promise. addListener(future -> {<!-- --> // // thread // }); // 4. Wait for promise result promise. await(); if(promise. isSuccess()) {<!-- --> // call is normal return promise. getNow(); } else {<!-- --> // call failed throw new RuntimeException(promise. 
cause()); } }); return (T) o; } private static Channel channel = null; private static final Object LOCK = new Object(); // Get the unique channel object public static Channel getChannel() {<!-- --> if (channel != null) {<!-- --> return channel; } synchronized (LOCK) {<!-- --> // t2 if (channel != null) {<!-- --> // t1 return channel; } initChannel(); return channel; } } // initialize channel method private static void initChannel() {<!-- --> NioEventLoopGroup group = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel. DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler(); Bootstrap bootstrap = new Bootstrap(); bootstrap. channel(NioSocketChannel. class); bootstrap. group(group); bootstrap.handler(new ChannelInitializer<SocketChannel>() {<!-- --> @Override protected void initChannel(SocketChannel ch) throws Exception {<!-- --> ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); try {<!-- --> channel = bootstrap.connect("localhost", 8080).sync().channel(); channel.closeFuture().addListener(future -> {<!-- --> group. shutdown Gracefully(); }); } catch (Exception e) {<!-- --> log. error("client error", e); } } }
6 client handler second edition
@Slf4j @ChannelHandler. Sharable public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {<!-- --> // The sequence number is used to receive the promise object of the result public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>(); @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {<!-- --> log.debug("{}", msg); // get an empty promise Promise<Object> promise = PROMISES. remove(msg. getSequenceId()); if (promise != null) {<!-- --> Object returnValue = msg. getReturnValue(); Exception exceptionValue = msg. getExceptionValue(); if(exceptionValue != null) {<!-- --> promise. setFailure(exceptionValue); } else {<!-- --> promise. setSuccess(returnValue); } } } }