Netty-RPC framework implementation solution

Article directory

  • Netty-RPC framework implementation solution
    • flow chart
    • key components
    • implementation details
    • summary

Netty-RPC framework implementation solution

Flowchart

Key components

In general, I divide the whole framework into four parts: Provider, Consumer, API, server, Register

  • Provider : Producer – the provider of the service, used to provide the implementation of the service
  • Consumer : Consumer – the consumer of the service, used to consume the producer
  • API : Open interface to the outside world – an object that producers and consumers depend on, and is mainly used to define public interfaces for easy calling
  • Register : Register the producer, the author uses the Redis registration center here, mainly for convenience
  • Server : The core of the server, here mainly writes the main framework of Netty for communication

Implementation details

  • API : Open interface to the outside world – an object that producers and consumers depend on, and is mainly used to define public interfaces for easy calling
    Interface defined:
public interface HelloService {<!-- -->

    String hi();

    String say(String str);

    String sayHi(Hi hi);

}

Defined entity object:

public class Hi {<!-- -->

    private String userName;
    private String sayMsg;

    public String getUserName() {<!-- -->
        return userName;
    }

    public void setUserName(String userName) {<!-- -->
        this. userName = userName;
    }

    public String getSayMsg() {<!-- -->
        return sayMsg;
    }

    public void setSayMsg(String sayMsg) {<!-- -->
        this.sayMsg = sayMsg;
    }
}

  • Provider : Producer – the provider of the service, the implementation used to provide the service depends on the Server
    Object implementation for API
@Service("helloService")
public class HelloServiceImpl implements HelloService {<!-- -->
    @Override
    public String hi() {<!-- -->
        return "KSK link completed!";
    }

    @Override
    public String say(String str) {<!-- -->
        return "Did I succeed?";
    }

    @Override
    public String sayHi(Hi hi) {<!-- -->
        return "Work hard and slowly, you will succeed!";
    }
}

Profile Configuration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rpc="http://rpc.ksk.cn/schema/rpc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
http://rpc.ksk.cn/schema/rpc
http://rpc.ksk.cn/schema/rpc/rpc.xsd"
>

<!-- Registration Center -->
<rpc:server id="provider_rpc" host="127.0.0.1" port="6379"/>
//When injected into Redis management, it will be stored with the combination of nozzle_alias as Key
<rpc:provider id="helloServiceRpc" nozzle="cn.ksk.rpc.api.HelloService"
              ref="helloService" alias="Rpc" />

</beans>
  • Register : Register the producer. The author uses the Redis registration center here, mainly for convenience. The core logic here is to uninstall the server
  • Core code
/**
 * Simulate RPC registration ZTE
 */
public class RedisRegisterCenter {<!-- -->

    private static Jedis jedis; //non-slice client connection

    //Initialize Redis
    public static void init(String host , int port){<!-- -->
        //Basic pool configuration
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxIdle(5);
        config.setTestOnBorrow(false);
        //During initialization, directly pass in the host and port configured in the configuration file for connection
        JedisPool jedisPool = new JedisPool(config, host, port);
        jedis = jedisPool. getResource();
    }
    /**
     * Registered producer
     *
     * @param nozzle interface
     * @param alias alias
     * @param info information
     * @return registration result
     */
    public static Long registerProvider(String nozzle , String alias , String info){<!-- -->
        return jedis. sadd(nozzle + "_" + alias , info);
    }
   /**
     * Get producer
     * @param nozzle
     * @param alias
     * @return
     */
    public static String obtainProvider(String nozzle , String alias){<!-- -->
        return jedis.srandmember(nozzle + "_" + alias);
    }
  • Server : Register the producer. The author uses the Redis registration center here, mainly for convenience. The core logic here is to uninstall the Server
  • Some basic knowledge of ClientSocket and ServerSocket in the service center will not be discussed here. For details, please refer to the author’s previous article: Netty-Requirements for Realizing Rpc Core Framework Functions. Here we mainly talk about Provider and Consumer start the initialization logic
  • PROVIDER : relatively simple, it is to inject yourself into the registration center and keep your own information in JSON form
public class ProviderBean extends ProviderConfig implements ApplicationContextAware {<!-- -->

    /**
     * log printing
     */
    private Logger logger = LoggerFactory. getLogger(ProviderBean. class);
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {<!-- -->
        RpcProviderConfig rpcProviderConfig = new RpcProviderConfig();
        //Set the interface
        rpcProviderConfig.setNozzle(nozzle);
        //map name
        rpcProviderConfig.setRef(ref);
        //alias
        rpcProviderConfig.setAlias(alias);
        //IP When the project starts here, it is obtained locally when init() is initialized. In order to ensure that the Server must force the Provider
        //Start first, you must configure the Server before the Provider when configuring
        rpcProviderConfig.setHost(LocalServerInfo.LOCAL_HOST);
        //port The port generated when init() is initialized when the project starts here
        rpcProviderConfig.setPort(LocalServerInfo.LOCAL_PORT);

        //producer registration
        Long count = RedisRegisterCenter.registerProvider(nozzle, alias, JSON.toJSONString(rpcProviderConfig));

        logger.info("Register producer: {} {} {}" , nozzle , alias , count);
    }
}
  • Consumer : To implement the FactoryBean interface and implement the getObject() in it, this method will hand over the returned object to the Spring container management (not rewriting the default is to create directly without proxy ), and at the same time, the assignment in the object is given details when registering. Refer to:Netty-Requirements to realize the Rpc core framework function pre-requirements
package cn.ksk.design.rpc.condig.spring.bean;

import cn.ksk.design.rpc.condig.ConsumerConfig;
import cn.ksk.design.rpc.domain.RpcProviderConfig;
import cn.ksk.design.rpc.network.client.ClientSocket;
import cn.ksk.design.rpc.network.mesg.Request;
import cn.ksk.design.rpc.reflect.JDKProxy;
import cn.ksk.design.rpc.register.RedisRegisterCenter;
import cn.ksk.design.rpc.util.ClassLoaderUtils;
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.Assert;

public class ConsumerBean<T> extends ConsumerConfig<T> implements FactoryBean {<!-- -->

    /**
     * log printing
     */
    private Logger logger = LoggerFactory. getLogger(ConsumerBean. class);

    private ChannelFuture channelFuture;
    private RpcProviderConfig rpcProviderConfig;
    /**
     * Get Object Get object will be automatically put into the Spring management container, the name is the alias of the configuration file configuration
     * Created in proxy mode
     * Then every time the method is called, the RPC framework connection will be used to find the object
     * @return
     * @throws Exception
     */
    @Override
    public Object getObject() throws Exception {<!-- -->
        // Get a connection from Redis If there is no connection, get one from the registration center
        if (null == rpcProviderConfig){<!-- -->
            String infoStr = RedisRegisterCenter.obtainProvider(nozzle, alias);
            rpcProviderConfig = JSON.parseObject(infoStr, RpcProviderConfig.class);
        }
        Assert.isTrue(null != rpcProviderConfig);
        //Establish communication after the connection is obtained. If it is empty, it means no connection is established.
        if (null == channelFuture){<!-- -->
            ClientSocket socket = new ClientSocket(rpcProviderConfig. getHost(), rpcProviderConfig. getPort());
            new Thread(socket).start();
            //Cycle assignment to the connection channel
            for (int i = 0; i < 100; i ++ ) {<!-- -->
                if (null != channelFuture) break;
                Thread. sleep(500);
                channelFuture = socket. getFuture();
            }
        }
        //Abort the program if unsuccessful
        Assert. isTrue(null != channelFuture);
        // The channel of todo communication is saved and strengthened with reflection. Every time the method is called, it is called with reflection
        // Save the information and objects that need to be requested to change the channel Create an object based on the interface name

        Request request = new Request();
        request.setRef(rpcProviderConfig.getRef());
        request.setNozzle(nozzle);
        request.setAlias(alias);
        //Set the corresponding connection channel
        request.setChannel(channelFuture.channel());
        //Here, reflection is used to return the method of the proxy. Every time the method of this interface is called, the invoke of the proxy method will be used to make a remote call and return the result
        return (T) JDKProxy.getProxy(ClassLoaderUtils.forName(nozzle) , request);
    }


    /**
     * Get the Class file type of the object
     * @return
     */
    @Override
    public Class<?> getObjectType() {<!-- -->
        try {<!-- -->
            return ClassLoaderUtils.forName(nozzle);
        } catch (ClassNotFoundException e) {<!-- -->
            return null;
        }
    }

    /**
     * Whether it is a single column must be True
     * @return
     */
    @Override
    public boolean isSingleton() {<!-- -->
        return false;
    }
}


Reflect core method: Here is the core method of reflection logic

public class JDKInvocationHandler implements InvocationHandler {<!-- -->

    private Request request;
    public JDKInvocationHandler(Request request) {<!-- -->
        this.request = request;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {<!-- -->
        String methodName = method. getName();
        Class<?>[] parameterTypes = method. getParameterTypes();
        //Judging common methods to exclude
        if ("toString".equals(methodName) & amp; & amp; parameterTypes.length == 0) {<!-- -->
            return request.toString();
        } else if ("hashCode".equals(methodName) & amp; & amp; parameterTypes.length == 0) {<!-- -->
            return request.hashCode();
        } else if ("equals".equals(methodName) & amp; & amp; parameterTypes.length == 1) {<!-- -->
            return request. equals(args[0]);
        }

        //Method to strengthen setting parameters
        request.setMethodName(methodName);
        request.setParamTypes(parameterTypes);
        request.setArgs(args);
        request.setRef(request.getRef());
        //Remote call and get return value logic
        Response response = new SyncWrite().writeAndSync(request.getChannel(), request, 5000);
        return response. getResult();

    }
}

Summary

The above is the core RPC framework implemented by the author based on Netty. If there are any deficiencies, please be merciful.
Specific code implementation address: RPC framework implementation source code