简易RPC框架 - 网络传输模块
1、前言
我们在前面的文章中说到了客户端通过代理对象进行远程调用,其中通过Channel与服务端进行网络通信,其实就是通过网络请求来传递类信息、方法信息以及方法参数等数据到服务端。其中网络传输的具体实现在本项目中我们使用的是基于NIO的网络编程框架Netty。
2、网络传输
1)网络传输实体类
在此我们先定义了一些在网络传输中的数据格式:
RpcRequest请求类,当你要调用远程方法时,需要将你要调用的方法的详细信息传输到服务器端,然后服务端就能根据这些信息去获取方法对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @AllArgsConstructor @NoArgsConstructor @Data @Builder @ToString public class RpcRequest implements Serializable { private static final long serialVersionUID = 6672133783386466359L;
private String requestId; private String interfaceName; private String methodName; private Object[] parameters; private Class<?>[] paramTypes; private String version; private String group;
public String getRpcServiceName() { return getInterfaceName() + getGroup() + getVersion(); } }
|
RpcResponse响应类,服务端执行完请求后,就可以将响应信息封装成响应类传输给客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @AllArgsConstructor @NoArgsConstructor @Getter @Setter @Builder @ToString public class RpcResponse<T> implements Serializable { private static final long serialVersionUID = 6672133783386466359L;
private String requestId; private Integer code; private String message; private T data;
public static <T> RpcResponse<T> success(T data, String requestId) { RpcResponse<T> response = new RpcResponse<>(); response.setCode(RpcResponseCode.SUCCESS.getCode()); response.setMessage(RpcResponseCode.SUCCESS.getMessage()); response.setRequestId(requestId); if(data != null) { response.setData(data); } return response; }
public static <T> RpcResponse<T> fail(RpcResponseCodeEnum rpcResponseCodeEnum) { RpcResponse<T> response = new RpcResponse<>(); response.setCode(rpcResponseCodeEnum.getCode()); response.setMessage(rpcResponseCodeEnum.getMessage()); return response; } }
|
但真正在网络中传输不是仅仅是传输这个对象就行了,还有可能出现很多问题,比如在TCP传输过程中的黏包半包问题。
粘包和半包问题是数据传输中比较常见的问题,所谓的粘包问题是指数据在传输时,在一条消息中读取到了另一条消息的部分数据,这种现象就叫做粘包。比如发送了两条消息,分别为“ABC”和“DEF”,那么正常情况下接收端也应该收到两条消息“ABC”和“DEF”,但接收端却收到的是“ABCD”,像这种情况就叫做粘包,如下图所示:
半包问题是指接收端只收到了部分数据,而非完整的数据的情况就叫做半包。比如发送了一条消息是“ABC”,而接收端却收到的是“AB”和“C”两条信息,这种情况就叫做半包,如下图所示:
为什么会出现黏包半包问题?粘包问题发生在 TCP/IP 协议中,因为 TCP 是面向连接的传输协议,它是以“流”的形式传输数据的,而“流”数据是没有明确的开始和结尾边界的,所以就会出现粘包问题。
2)自定义传输协议
那要什么解决这个问题呢?我们采用自定义传输协议,并对数据进行相应的编码解码以解决这个问题。简单来说,我们通过设计传输协议,定义需要传输的数据以及其需要占多少字节的数据,当我们在收到传输数据后,就可以根据我们设计的传输协议去解析出正确的数据。
首先我们定义真正在网络中进行传输的对象,RpcMessage
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @AllArgsConstructor @NoArgsConstructor @Data @Builder @ToString public class RpcMessage { private byte messageType; private byte codec; private byte compress; private int requestId; private Object data; }
|
RpcMessage中的data就是上面的RpcRequest和RpcResponse,接下来我们详细讲解一下自定义的协议
- magic code 魔数:判断是否是遵循同一协议的数据,用来校验数据包有效性,占4byte
- version 版本信息:后续可以用于协议的版本迭代,占1byte
- full length 消息长度:请求头+请求体的总长度,占4byte
- messageType 消息类型:消息可分为请求和相应两类,占1byte
- compress 压缩类型:数据的压缩类型,占1byte
- codec 序列化类型:RpcRequest和RpcResponse的序列化类型,占1byte
- requestId 请求Id:请求Id,用于后续的消息跟踪,占4byte
- body 请求体:RpcRequest或RpcResponse序列化后的byte数据
3)编解码器
RpcMessageEncoder,自定义编码器,负责处理“出站”消息,将消息转换为字节数组然后写入到 ByteBuf 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| @Slf4j public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> { private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0);
@Override protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) { try { out.writeBytes(RpcConstants.MAGIC_NUMBER); out.writeByte(RpcConstants.VERSION); out.writerIndex(out.writerIndex() + 4); byte messageType = rpcMessage.getMessageType(); out.writeByte(messageType); out.writeByte(rpcMessage.getCodec()); out.writeByte(CompressTypeEnum.GZIP.getCode()); out.writeInt(ATOMIC_INTEGER.getAndIncrement()); byte[] bodyBytes = null; int fullLength = RpcConstants.HEAD_LENGTH; if (messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE && messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) { String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec()); Serializer serializer = new HessianSerializer();
bodyBytes = serializer.serialize(rpcMessage.getData());
Compress compress = new GzipCompress(); bodyBytes = compress.compress(bodyBytes); fullLength += bodyBytes.length; }
if (bodyBytes != null) { out.writeBytes(bodyBytes); } int writeIndex = out.writerIndex(); out.writerIndex(writeIndex - fullLength + RpcConstants.MAGIC_NUMBER.length + 1); out.writeInt(fullLength); out.writerIndex(writeIndex); } catch (Exception e) { log.error("Encode request error!", e); }
} }
|
RpcMessageDecoder,自定义解码器,负责“入站”数据,将 ByteBuf 中的字节数组转换为对应的消息数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
| @Slf4j public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder { public RpcMessageDecoder() { this(RpcConstants.MAX_FRAME_LENGTH, 5, 4, -9, 0); }
public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); }
@Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { Object decoded = super.decode(ctx, in); if (decoded instanceof ByteBuf) { ByteBuf frame = (ByteBuf) decoded; if (frame.readableBytes() >= RpcConstants.TOTAL_LENGTH) { try { return decodeFrame(frame); } catch (Exception e) { log.error("Decode frame error!", e); throw e; } finally { frame.release(); } }
} return decoded; }
private Object decodeFrame(ByteBuf in) { checkMagicNumber(in); checkVersion(in); int fullLength = in.readInt(); byte messageType = in.readByte(); byte codecType = in.readByte(); byte compressType = in.readByte(); int requestId = in.readInt(); RpcMessage rpcMessage = RpcMessage.builder() .codec(codecType) .requestId(requestId) .messageType(messageType).build(); if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) { rpcMessage.setData(RpcConstants.PING); return rpcMessage; } if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) { rpcMessage.setData(RpcConstants.PONG); return rpcMessage; } int bodyLength = fullLength - RpcConstants.HEAD_LENGTH; if (bodyLength > 0) { byte[] bs = new byte[bodyLength]; in.readBytes(bs); String compressName = CompressTypeEnum.getName(compressType); Compress compress = new GzipCompress();
bs = compress.decompress(bs); String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec()); Serializer serializer = new HessianSerializer();
if (messageType == RpcConstants.REQUEST_TYPE) { RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class); rpcMessage.setData(tmpValue); } else { RpcResponse tmpValue = serializer.deserialize(bs, RpcResponse.class); rpcMessage.setData(tmpValue); } } return rpcMessage;
}
private void checkVersion(ByteBuf in) { byte version = in.readByte(); if (version != RpcConstants.VERSION) { throw new RuntimeException("version isn't compatible" + version); } }
private void checkMagicNumber(ByteBuf in) { int len = RpcConstants.MAGIC_NUMBER.length; byte[] tmp = new byte[len]; in.readBytes(tmp); for (int i = 0; i < len; i++) { if (tmp[i] != RpcConstants.MAGIC_NUMBER[i]) { throw new IllegalArgumentException("Unknown magic code: " + Arrays.toString(tmp)); } } } }
|
3、Netty
1)客户端
Netty客户端主要提供了以下方法:
- initClientApplication() 用于初始化客户端
- doSubscribeService(Class serviceBean) 服务订阅,将标注了@Reference的属性缓存到本地,之后统一建立Channel
- doConnectServer() 让客户端与@Reference对应的服务端建立Channel
- sendRpcRequest() 发送RpcRequest数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
| @Slf4j public class NettyRpcClient implements RpcRequestTransport { private ServiceDiscovery serviceDiscovery; private UnprocessedRequests unprocessedRequests; private ChannelProvider channelProvider; private Bootstrap bootstrap; private EventLoopGroup eventLoopGroup; private ClientConfig clientConfig; private ServiceRegistry serviceRegistry;
public Bootstrap getBootstrap() { return bootstrap; }
public void initClientApplication(){ bootstrap = new Bootstrap(); eventLoopGroup = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup(); bootstrap.group(worker); bootstrap.handler(new LoggingHandler(LogLevel.INFO)); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new RpcMessageEncoder()); ch.pipeline().addLast(new RpcMessageDecoder()); ch.pipeline().addLast(new NettyRpcClientHandler()); } }); this.serviceDiscovery = new ZkServiceDiscoveryImpl();
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class); this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class); this.clientConfig = PropertiesBootstrap.loadClientConfigFromLocal(); serviceRegistry = new ZkServiceRegistryImpl(); CLIENT_CONFIG = clientConfig; }
public void doSubscribeService(Class serviceBean) throws UnknownHostException { URL url = new URL(); url.setServiceName(serviceBean.getName()); url.setApplicationName(clientConfig.getApplicationName()); url.addParameter("host", InetAddress.getLocalHost().getHostAddress()); serviceRegistry.subscribe(url); }
public void doConnectServer(){ for (URL providerURL : SUBSCRIBE_SERVICE_LIST) { List<String> providerIps = serviceDiscovery.lookupService(providerURL.getServiceName());
for (String providerIp : providerIps) { try { ConnectionHandler.connect(providerURL.getServiceName(), providerIp); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }
@Override public Object sendRpcRequest(RpcRequest rpcRequest) { CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>(); Channel channel = getChannel(rpcRequest.getInterfaceName());
if(channel.isActive()){ unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture); RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest) .codec(SerializationTypeEnum.KYRO.getCode()) .compress(CompressTypeEnum.GZIP.getCode()) .messageType(RpcConstants.REQUEST_TYPE).build(); channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { if (future.isSuccess()){ log.info("客户端发送消息: [{}]", rpcMessage); }else { future.channel().close(); resultFuture.completeExceptionally(future.cause()); log.error("发送消息时发生错误: ", future.cause()); } }); } else { throw new IllegalStateException(); }
return resultFuture; }
public Channel getChannel(String interfaceName){ Channel channel = channelProvider.get(interfaceName); return channel; }
public void close(){ eventLoopGroup.shutdownGracefully(); } }
|
2)服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| @Component @Slf4j @Setter public class NettyRpcServer { private ServerConfig serverConfig;
private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
public void registerService(RpcServiceConfig rpcServiceConfig) { serviceProvider.publishService(rpcServiceConfig); }
public void start(){ CustomShutdownHook.getCustomShutdownHook().clearAll(); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup( Runtime.getRuntime().availableProcessors() * 2, ThreadPoolFactoryUtil.createThreadFactory("service-handler-group", false) ); try { String host = InetAddress.getLocalHost().getHostAddress(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 128) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline();
p.addLast(new RpcMessageEncoder()); p.addLast(new RpcMessageDecoder()); p.addLast(serviceHandlerGroup, new NettyRpcServerHandler()); } }); bootstrap.bind(host, serverConfig.getServerPort()).sync(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (InterruptedException e) { log.info("启动NettyRpcServer服务时发生错误: ", e); } finally { log.info("断开NettyRpcServer服务");
} }
public void exposeService(RpcServiceConfig rpcServiceConfig) { registerService(rpcServiceConfig); }
public void initServerConfig() { ServerConfig serverConfig = PropertiesBootstrap.loadServerConfigFromLocal(); this.setServerConfig(serverConfig); SERVER_CONFIG = serverConfig; } }
|