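A Simple RPC Framework - Service Subscription and Discovery Module
1. Structural design
First, let's look at the design of the RPC client:

2. Implementation
Again, let's go straight to the code: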
    import java.lang.reflect.Field;
    import java.net.UnknownHostException;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.config.BeanPostProcessor;
    import org.springframework.boot.context.event.ApplicationReadyEvent;
    import org.springframework.context.ApplicationListener;
    // NettyRpcClient, RpcReference, RpcServiceConfig, RpcClientProxy and ConnectionHandler
    // are this project's own classes; import them from wherever they live in your code base.

    @Slf4j
    public class RpcClientAutoConfiguration implements BeanPostProcessor, ApplicationListener<ApplicationReadyEvent> {
        private static NettyRpcClient client;
        private volatile boolean needInitClient = false;
        private volatile boolean hasInitClientConfig = false;

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            final Class<?> targetClass = bean.getClass();
            final Field[] declaredFields = targetClass.getDeclaredFields();
            for (Field field : declaredFields) {
                if (field.isAnnotationPresent(RpcReference.class)) {
                    if (!hasInitClientConfig) {
                        // Create and configure the client once, on the first @RpcReference field found.
                        client = new NettyRpcClient();
                        client.initClientApplication();
                        hasInitClientConfig = true;
                    }
                    needInitClient = true;
                    final RpcReference annotation = field.getAnnotation(RpcReference.class);
                    final RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
                            .group(annotation.group())
                            .version(annotation.version())
                            .build();
                    field.setAccessible(true);
                    RpcClientProxy rpcClientProxy = new RpcClientProxy(client, rpcServiceConfig);
                    final Object proxy = rpcClientProxy.getProxy(field.getType());
                    try {
                        // Inject the proxy into the annotated field.
                        field.set(bean, proxy);
                        // Subscribe to the service behind this field.
                        client.doSubscribeService(field.getType());
                    } catch (IllegalAccessException | UnknownHostException e) {
                        log.error("Failed to inject or subscribe [{}]", field.getType().getName(), e);
                    }
                }
            }
            return bean;
        }

        @Override
        public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
            if (needInitClient && client != null) {
                log.info(" ================== [{}] started success ================== ", client.getClass().getName());
                ConnectionHandler.setBootstrap(client.getBootstrap());
                // Every @RpcReference field has been subscribed by now, so connect to the servers.
                client.doConnectServer();
            }
        }
    }

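Note that we use the ApplicationContext event mechanism here, because by the time the channels are connected, every @RpcReference field must already have been subscribed. I went through a lot of material before finding this approach. The class implements ApplicationListener, so whenever the ApplicationContext publishes a matching ApplicationEvent, the ApplicationListener bean is triggered automatically. Here we listen for ApplicationReadyEvent, so onApplicationEvent() is called once the context is ready.
With the event mechanism we get the following flow: as Spring initializes each bean, we scan the class for @RpcReference fields, subscribe to the corresponding services, and wrap the information behind each @RpcReference in a URL object. Once the context is ready, all @RpcReference fields have been subscribed; onApplicationEvent() then fires and the client opens a Channel to the RpcServer behind each @RpcReference field. Later, when a remote call is needed, the proxy class looks up the matching Channel and uses it to communicate.
Here are the URL class and the proxy class, to make this easier to follow: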
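The subscribe-first, connect-later flow described above can be sketched in plain Java, without Spring or Netty. This is only a minimal sketch under assumptions: `TwoPhaseClient`, `subscribe`, `onReady` and `channelFor` are hypothetical names, and a plain string stands in for a real Channel.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the client: phase 1 records subscriptions,
// phase 2 (the "ready" callback) opens one channel per subscribed service.
class TwoPhaseClient {
    private final List<String> subscribed = new ArrayList<>();
    private final Map<String, String> channels = new LinkedHashMap<>();
    private boolean ready = false;

    // Phase 1: called once per annotated field while beans are being post-processed.
    public void subscribe(String serviceName) {
        if (ready) {
            throw new IllegalStateException("subscribe() called after the ready event");
        }
        if (!subscribed.contains(serviceName)) {
            subscribed.add(serviceName);
        }
    }

    // Phase 2: called once when the context is ready; every subscription is known by now.
    public void onReady() {
        for (String service : subscribed) {
            // A real client would open a network channel here; a string stands in for it.
            channels.put(service, "channel->" + service);
        }
        ready = true;
    }

    // Used at call time (for example by a proxy) to find the channel for a service.
    public String channelFor(String serviceName) {
        String channel = channels.get(serviceName);
        if (channel == null) {
            throw new IllegalStateException("no channel for " + serviceName);
        }
        return channel;
    }

    public static void main(String[] args) {
        TwoPhaseClient client = new TwoPhaseClient();
        client.subscribe("com.example.OrderService");
        client.subscribe("com.example.UserService");
        client.onReady();
        System.out.println(client.channelFor("com.example.UserService"));
    }
}
```

Separating the two phases means the connect step never has to guess whether more subscriptions are still on the way, which is the guarantee ApplicationReadyEvent provides in the real code.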
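    import java.util.HashMap;
    import java.util.Map;

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    public class URL {
        private String applicationName;
        private String serviceName;

        // Extra key/value attributes attached to this URL.
        private Map<String, String> parameters = new HashMap<>();

        // Adds a parameter; if the key is already present, the existing value is kept.
        public void addParameter(String key, String value) {
            this.parameters.putIfAbsent(key, value);
        }
    }
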
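One detail of `addParameter` is easy to miss: because it uses `putIfAbsent`, the first value registered for a key wins and later calls with the same key are silently ignored. A minimal sketch of that behavior follows, using a hand-written stand-in class (`ServiceUrl`, hypothetical, no Lombok) and illustrative keys such as `group` that the original does not specify.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Lombok-free stand-in for the URL class, reduced to what the example needs.
class ServiceUrl {
    private final String serviceName;
    private final Map<String, String> parameters = new HashMap<>();

    public ServiceUrl(String serviceName) {
        this.serviceName = serviceName;
    }

    // Same semantics as URL.addParameter: an existing value is never overwritten.
    public void addParameter(String key, String value) {
        parameters.putIfAbsent(key, value);
    }

    public String getParameter(String key) {
        return parameters.get(key);
    }

    public String getServiceName() {
        return serviceName;
    }

    public static void main(String[] args) {
        ServiceUrl url = new ServiceUrl("com.example.UserService");
        url.addParameter("group", "test");
        url.addParameter("group", "prod"); // ignored: "group" is already set
        System.out.println(url.getParameter("group")); // prints "test"
    }
}
```

If overwriting should be allowed, `put` would be the call to use instead; which one fits depends on whether a parameter is meant to be fixed at subscription time.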
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;

    import lombok.extern.slf4j.Slf4j;
    // RpcRequestTransport, RpcServiceConfig, RpcRequest, RpcResponse, NettyRpcClient, SocketRpcClient,
    // RpcException, RpcErrorMessageEnum and RpcResponseCode are this project's own classes.

    @Slf4j
    public class RpcClientProxy implements InvocationHandler {
        private static final String INTERFACE_NAME = "interfaceName";

        private final RpcRequestTransport rpcRequestTransport;
        private final RpcServiceConfig rpcServiceConfig;

        public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) {
            this.rpcRequestTransport = rpcRequestTransport;
            this.rpcServiceConfig = rpcServiceConfig;
        }

        // Creates a JDK dynamic proxy for the given interface; every call is routed to invoke().
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Class<T> clazz) {
            return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
        }

        @Override
        @SuppressWarnings("unchecked")
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Short-circuit toString() so that printing the proxy does not trigger a remote call.
            if (method.getName().equals("toString")) {
                return null;
            }
            log.info("Invoking method: [{}]", method.getName());
            RpcRequest rpcRequest = RpcRequest.builder()
                    .requestId(UUID.randomUUID().toString())
                    .parameters(args)
                    .methodName(method.getName())
                    .interfaceName(method.getDeclaringClass().getName())
                    .paramTypes(method.getParameterTypes())
                    .group(rpcServiceConfig.getGroup())
                    .version(rpcServiceConfig.getVersion())
                    .build();
            RpcResponse<Object> rpcResponse = null;

            // The Netty transport returns a future; block until the response arrives.
            if (rpcRequestTransport instanceof NettyRpcClient) {
                CompletableFuture<RpcResponse<Object>> completableFuture =
                        (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
                rpcResponse = completableFuture.get();
            }
            // The socket transport returns the response directly.
            if (rpcRequestTransport instanceof SocketRpcClient) {
                rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
            }
            check(rpcRequest, rpcResponse);
            return rpcResponse.getData();
        }

        // Rejects a missing response, a response for a different request, and a non-success code.
        private void check(RpcRequest rpcRequest, RpcResponse<Object> rpcResponse) {
            if (rpcResponse == null) {
                throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
            }
            if (!rpcResponse.getRequestId().equals(rpcRequest.getRequestId())) {
                throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
            }
            if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCode.SUCCESS.getCode())) {
                throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
            }
        }
    }

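To try the dynamic-proxy idea in isolation, here is a small self-contained sketch that replaces the network with an in-memory transport. `GreetingService`, `Transport` and `SketchProxy` are hypothetical names introduced only for illustration. The sketch also shows one possible alternative to returning null from `toString()`: answering the methods declared on `Object` locally.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface, for illustration only.
interface GreetingService {
    String greet(String name);
}

// Hypothetical transport: a real one would serialize the call and send it over a channel.
interface Transport {
    Object send(String interfaceName, String methodName, Object[] args);
}

class SketchProxy implements InvocationHandler {
    private final Transport transport;

    public SketchProxy(Transport transport) {
        this.transport = transport;
    }

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // toString, hashCode and equals arrive with Object as their declaring class;
        // answer them locally instead of sending them over the wire.
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString": return "SketchProxy@" + System.identityHashCode(proxy);
                case "hashCode": return System.identityHashCode(proxy);
                case "equals":   return proxy == args[0];
                default: throw new UnsupportedOperationException(method.getName());
            }
        }
        // Everything else is treated as a remote call.
        return transport.send(method.getDeclaringClass().getName(), method.getName(), args);
    }

    public static void main(String[] args) {
        // Loopback transport: echoes the call description instead of contacting a server.
        Transport loopback = (iface, name, callArgs) -> iface + "#" + name + Arrays.toString(callArgs);
        GreetingService service = new SketchProxy(loopback).getProxy(GreetingService.class);
        System.out.println(service.greet("world"));
    }
}
```

The caller only ever sees `GreetingService`; swapping the loopback lambda for a real transport changes nothing on the calling side, which is the point of hiding the RPC behind a proxy.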
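As for the service subscription and channel connection code, you can design it to fit your own needs, so I won't describe it here.
Load balancing can be built into the channel connection step, or handled in whatever way suits your design.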
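As one possible starting point for load balancing, here is a minimal round-robin sketch. It is not part of the original framework: `RoundRobinBalancer` is a hypothetical class, and the addresses in `main` are made up. Round-robin is just one choice; random or weighted selection would slot into the same place.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: cycles through a fixed list of targets (for example server addresses).
class RoundRobinBalancer<T> {
    private final List<T> targets;
    private final AtomicInteger counter = new AtomicInteger();

    public RoundRobinBalancer(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("no targets");
        }
        this.targets = List.copyOf(targets);
    }

    // Thread-safe: each caller gets the next target in turn.
    public T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), targets.size());
        return targets.get(index);
    }

    public static void main(String[] args) {
        RoundRobinBalancer<String> balancer =
                new RoundRobinBalancer<>(List.of("10.0.0.1:9000", "10.0.0.2:9000"));
        for (int i = 0; i < 3; i++) {
            System.out.println(balancer.next());
        }
    }
}
```

A balancer like this could be consulted when choosing which server to connect to, or at call time when picking among already-open channels.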