简易RPC框架 - 服务订阅、发现模块

1、结构设计

首先先看Rpc客户端的设计思路:

ttt

2、实现

这里也是直接上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Slf4j
public class RpcClientAutoConfiguration implements BeanPostProcessor, ApplicationListener<ApplicationReadyEvent> {
private static NettyRpcClient client;
private volatile boolean needInitClient = false;
private volatile boolean hasInitClientConfig = false;

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
final Class<?> targetClass = bean.getClass();
final Field[] declaredFields = targetClass.getDeclaredFields();
for (Field field : declaredFields) {
if(field.isAnnotationPresent(RpcReference.class)){
if(!hasInitClientConfig) {
//每个Reference对应一个RpcClient
client = new NettyRpcClient();
client.initClientApplication();
hasInitClientConfig = true;
}
needInitClient = true;
final RpcReference annotation = field.getAnnotation(RpcReference.class);
final RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group(annotation.group())
.version(annotation.version())
.build();
field.setAccessible(true);
RpcClientProxy rpcClientProxy = new RpcClientProxy(client, rpcServiceConfig);
final Object proxy = rpcClientProxy.getProxy(field.getType());
try {
//代理类替换
field.set(bean, proxy);
//服务订阅
client.doSubscribeService(field.getType());
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
}
return bean;

}

/**
* 服务启动后,自动连接服务端
* @param applicationReadyEvent 服务启动事件
*/
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
if(needInitClient && client != null){
log.info(" ================== [{}] started success ================== ", client.getClass().getName());
ConnectionHandler.setBootstrap(client.getBootstrap());
//将Client与每个服务提供者连接
client.doConnectServer();
}
}
}

值得注意的是,我们这里用到了ApplicationContext的事件机制,因为需要需要实现通道连接时保证所有的@Reference都已进行服务订阅,查了很多资料,学到了这个方法。首先该类实现了ApplicationListener,那么每当ApplicationContext发布ApplicationEvent时,ApplicationListener Bean将自动被触发。我们这里监听的是ApplicationReadyEvent事件,当上下文已经准备完毕的时候触发onApplicationEvent()方法。

通过ApplicationContext的事件机制,我们就可以实现这个效果:Spring扫描整个类的@Reference属性,然后进行服务订阅,将@Reference对应的信息封装成URL对象。当上下文准备完毕,意味着所有的@Reference属性都已进行订阅,此时会触发onApplicationEvent()方法,将Client与每个@Reference属性对应的RpcServer建立一个Channel,之后当需要远程调用时,就可以在代理类中获取对应的Channel进行通信。

在这里,将URL和代理类贴出来,方便大家理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@AllArgsConstructor
@NoArgsConstructor
@Data
public class URL {
private String applicationName;
private String serviceName;
/**
* 这里面可以自定义不限进行扩展
* 分组
* 权重
* 服务提供者的地址
* 服务提供者的端口
*/
private Map<String, String> parameters = new HashMap<>();

public void addParameter(String key, String value) {
this.parameters.putIfAbsent(key, value);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Slf4j
public class RpcClientProxy implements InvocationHandler {
private static final String INTERFACE_NAME = "interfaceName";

private final RpcRequestTransport rpcRequestTransport;
private final RpcServiceConfig rpcServiceConfig;


public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) {
this.rpcRequestTransport = rpcRequestTransport;
this.rpcServiceConfig = rpcServiceConfig;
}

/**
* 获取代理对象
* @param clazz 代理类
* @param <T> 代理类类型
* @return 代理对象
*/
public <T> T getProxy(Class<T> clazz){
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(method.getName().equals("toString")){
return null;
}
log.info("开始执行方法: [{}]", method.getName());
RpcRequest rpcRequest = RpcRequest.builder()
.requestId(UUID.randomUUID().toString())
.parameters(args)
.methodName(method.getName())
.interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.group(rpcServiceConfig.getGroup())
.version(rpcServiceConfig.getVersion())
.build();
RpcResponse<Object> rpcResponse = null;
//通过当前服务对应的唯一channel向服务器发送请求
if (rpcRequestTransport instanceof NettyRpcClient) {
CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();
}
if(rpcRequestTransport instanceof SocketRpcClient) {
rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
}
check(rpcRequest, rpcResponse);
return rpcResponse.getData();

}

private void check(RpcRequest rpcRequest, RpcResponse rpcResponse){
if(rpcResponse == null){
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if(!rpcResponse.getRequestId().equals(rpcRequest.getRequestId())){
throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if(rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCode.SUCCESS.getCode())){
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
}
}

至于服务订阅、通道连接的代码大家可以根据自己的需要进行设计,我这里就不再描述了。

至于负载均衡,可以在通道连接的时候设计,也可以按大家的想法来。