简易RPC框架 - 网络传输模块

1、前言

我们在前面的文章中说到了客户端通过代理对象进行远程调用,其中通过Channel与服务端进行网络通信,其实就是通过网络请求来传递类信息、方法信息以及方法参数等数据到服务端。其中网络传输的具体实现在本项目中我们使用的是基于NIO的网络编程框架Netty。

2、网络传输

1)网络传输实体类

在此我们先定义了一些在网络传输中的数据格式:

RpcRequest请求类,当你要调用远程方法时,需要将你要调用的方法的详细信息传输到服务器端,然后服务端就能根据这些信息去获取方法对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
@ToString
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 6672133783386466359L;

private String requestId;
private String interfaceName;
private String methodName;
private Object[] parameters;
private Class<?>[] paramTypes;
private String version;
private String group;


public String getRpcServiceName() {
return getInterfaceName() + getGroup() + getVersion();
}
}

RpcResponse响应类,服务端执行完请求后,就可以将响应信息封装成响应类传输给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Builder
@ToString
public class RpcResponse<T> implements Serializable {
private static final long serialVersionUID = 6672133783386466359L;

private String requestId;
//response code
private Integer code;
//response message
private String message;
//response body
private T data;

public static <T> RpcResponse<T> success(T data, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(RpcResponseCode.SUCCESS.getCode());
response.setMessage(RpcResponseCode.SUCCESS.getMessage());
response.setRequestId(requestId);
if(data != null) {
response.setData(data);
}
return response;
}

public static <T> RpcResponse<T> fail(RpcResponseCodeEnum rpcResponseCodeEnum) {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(rpcResponseCodeEnum.getCode());
response.setMessage(rpcResponseCodeEnum.getMessage());
return response;
}
}

但真正在网络中传输不是仅仅是传输这个对象就行了,还有可能出现很多问题,比如在TCP传输过程中的黏包半包问题。

粘包和半包问题是数据传输中比较常见的问题,所谓的粘包问题是指数据在传输时,在一条消息中读取到了另一条消息的部分数据,这种现象就叫做粘包。比如发送了两条消息,分别为“ABC”和“DEF”,那么正常情况下接收端也应该收到两条消息“ABC”和“DEF”,但接收端却收到的是“ABCD”,像这种情况就叫做粘包,如下图所示:

uuu

半包问题是指接收端只收到了部分数据,而非完整的数据的情况就叫做半包。比如发送了一条消息是“ABC”,而接收端却收到的是“AB”和“C”两条信息,这种情况就叫做半包,如下图所示:

yyy

为什么会出现黏包半包问题?粘包问题发生在 TCP/IP 协议中,因为 TCP 是面向连接的传输协议,它是以“流”的形式传输数据的,而“流”数据是没有明确的开始和结尾边界的,所以就会出现粘包问题

2)自定义传输协议

那要什么解决这个问题呢?我们采用自定义传输协议,并对数据进行相应的编码解码以解决这个问题。简单来说,我们通过设计传输协议,定义需要传输的数据以及其需要占多少字节的数据,当我们在收到传输数据后,就可以根据我们设计的传输协议去解析出正确的数据。

首先我们定义真正在网络中进行传输的对象,RpcMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
@ToString
public class RpcMessage {
//消息类型
private byte messageType;
//序列化类型
private byte codec;
//压缩类型
private byte compress;
//请求id
private int requestId;
//数据
private Object data;
}

RpcMessage中的data就是上面的RpcRequest和RpcResponse,接下来我们详细讲解一下自定义的协议

aaa

  1. magic code 魔数:判断是否是遵循同一协议的数据,用来校验数据包有效性,占4byte
  2. version 版本信息:后续可以用于协议的版本迭代,占1byte
  3. full length 消息长度:请求头+请求体的总长度,占4byte
  4. messageType 消息类型:消息可分为请求和相应两类,占1byte
  5. compress 压缩类型:数据的压缩类型,占1byte
  6. codec 序列化类型:RpcRequest和RpcResponse的序列化类型,占1byte
  7. requestId 请求Id:请求Id,用于后续的消息跟踪,占4byte
  8. body 请求体:RpcRequest或RpcResponse序列化后的byte数据

3)编解码器

RpcMessageEncoder,自定义编码器,负责处理“出站”消息,将消息转换为字节数组然后写入到 ByteBuf 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Slf4j
public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> {
// 原子类,保证线程安全
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0);

@Override
protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) {
//将RpcMessage对象转换为字节流,写入到ByteBuf中
try {
//写入魔数
out.writeBytes(RpcConstants.MAGIC_NUMBER);
//写入版本号
out.writeByte(RpcConstants.VERSION);
//写入消息长度,先占位,后面再写入
out.writerIndex(out.writerIndex() + 4);
//写入消息类型
byte messageType = rpcMessage.getMessageType();
out.writeByte(messageType);
//写入序列化类型
out.writeByte(rpcMessage.getCodec());
//写入压缩类型
out.writeByte(CompressTypeEnum.GZIP.getCode());
//写入requestId,相当于请求序号,为了全双工通信,提供异步能力
out.writeInt(ATOMIC_INTEGER.getAndIncrement());
//获取消息长度和消息体
byte[] bodyBytes = null;
//初始消息长度为消息头长度
int fullLength = RpcConstants.HEAD_LENGTH;
//如果消息类型不是心跳包, fullLength = head length + body length
if (messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE
&& messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
//获取序列化类型
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
//TODO 根据序列化类型通过SPI机制获取序列化器
Serializer serializer = new HessianSerializer();
// Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
// .getExtension(codecName);
//序列化消息体
bodyBytes = serializer.serialize(rpcMessage.getData());
//TODO 根据压缩类型通过SPI机制获取压缩器
//获取压缩类型
// String compressName = CompressTypeEnum.getName(rpcMessage.getCompress());
// Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
// .getExtension(compressName);
Compress compress = new GzipCompress();
//压缩消息体
bodyBytes = compress.compress(bodyBytes);
//最终消息长度 = 消息头长度 + 消息体长度
fullLength += bodyBytes.length;
}

if (bodyBytes != null) {
//写入消息体
out.writeBytes(bodyBytes);
}
//获取写入索引
int writeIndex = out.writerIndex();
//回到消息长度的占位符位置:当前索引 - 消息长度占位符长度 + 魔数长度 + version长度
out.writerIndex(writeIndex - fullLength + RpcConstants.MAGIC_NUMBER.length + 1);
//写入消息长度
out.writeInt(fullLength);
//回到写入索引位置
out.writerIndex(writeIndex);
} catch (Exception e) {
log.error("Encode request error!", e);
}

}
}

RpcMessageDecoder,自定义解码器,负责“入站”数据,将 ByteBuf 中的字节数组转换为对应的消息数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@Slf4j
public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {
public RpcMessageDecoder() {
// lengthFieldOffset = 魔数(4B) + 版本(1B) = 5B
// lengthFieldLength = 消息长度 int(4B)
// lengthAdjustment = -9,因为我们的长度域是从魔数开始的,所以我们需要调整长度域的偏移量
// initialBytesToStrip = 0,我们会手动检查魔数和版本,所以不需要跳过任何字节
this(RpcConstants.MAX_FRAME_LENGTH, 5, 4, -9, 0);
}

/**
* @param maxFrameLength 最大帧长度
* @param lengthFieldOffset 长度域(消息长度)的偏移量,简单而言就是偏移几个字节后才是长度域
* @param lengthFieldLength 长度域的所占的字节数
* @param lengthAdjustment 长度适配适配值。该值表示协议中长度字段与消息体字段直接的距离值,Netty在解码时会根据该值计算消息体的开始位置,默认为0
* @param initialBytesToStrip 最后解析结果中需要剥离的字节数
*/
public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}

@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
Object decoded = super.decode(ctx, in);
if (decoded instanceof ByteBuf) {
ByteBuf frame = (ByteBuf) decoded;
if (frame.readableBytes() >= RpcConstants.TOTAL_LENGTH) {
try {
return decodeFrame(frame);
} catch (Exception e) {
log.error("Decode frame error!", e);
throw e;
} finally {
frame.release();
}
}

}
return decoded;
}


private Object decodeFrame(ByteBuf in) {
//按顺序读取魔数、版本、消息长度
//读取魔数并比较
checkMagicNumber(in);
//读取版本并比较
checkVersion(in);
//读取4字节消息长度
int fullLength = in.readInt();
//读取1字节消息类型
byte messageType = in.readByte();
//读取1字节序列化类型
byte codecType = in.readByte();
//读取1字节压缩类型
byte compressType = in.readByte();
//读取4字节请求ID
int requestId = in.readInt();
//构建RpcMessage对象
RpcMessage rpcMessage = RpcMessage.builder()
.codec(codecType)
.requestId(requestId)
.messageType(messageType).build();
//根据消息类型解析消息
//心跳消息
if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
rpcMessage.setData(RpcConstants.PING);
return rpcMessage;
}
if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
rpcMessage.setData(RpcConstants.PONG);
return rpcMessage;
}
//普通消息
//读取消息体长度
int bodyLength = fullLength - RpcConstants.HEAD_LENGTH;
if (bodyLength > 0) {
byte[] bs = new byte[bodyLength];
//读取消息体
in.readBytes(bs);
//解压缩
String compressName = CompressTypeEnum.getName(compressType);
//TODO 根据压缩类型通过SPI机制获取压缩器
Compress compress = new GzipCompress();
// Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
// .getExtension(compressName);
bs = compress.decompress(bs);
//反序列化
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
//TODO 根据序列化类型通过SPI机制获取序列化器
Serializer serializer = new HessianSerializer();
// Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
// .getExtension(codecName);
//根据消息类型反序列化
if (messageType == RpcConstants.REQUEST_TYPE) {
RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class);
rpcMessage.setData(tmpValue);
} else {
RpcResponse tmpValue = serializer.deserialize(bs, RpcResponse.class);
rpcMessage.setData(tmpValue);
}
}
return rpcMessage;

}

private void checkVersion(ByteBuf in) {
//读取第5个字节,比较版本
byte version = in.readByte();
if (version != RpcConstants.VERSION) {
throw new RuntimeException("version isn't compatible" + version);
}
}

private void checkMagicNumber(ByteBuf in) {
//读取前4个字节,比较魔数
int len = RpcConstants.MAGIC_NUMBER.length;
byte[] tmp = new byte[len];
in.readBytes(tmp);
for (int i = 0; i < len; i++) {
if (tmp[i] != RpcConstants.MAGIC_NUMBER[i]) {
throw new IllegalArgumentException("Unknown magic code: " + Arrays.toString(tmp));
}
}
}
}

3、Netty

1)客户端

Netty客户端主要提供了以下方法:

  • initClientApplication() 用于初始化客户端
  • doSubscribeService(Class serviceBean) 服务订阅,将标注了@Reference的属性缓存到本地,之后统一建立Channel
  • doConnectServer() 让客户端与@Reference对应的服务端建立Channel
  • sendRpcRequest() 发送RpcRequest数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@Slf4j
public class NettyRpcClient implements RpcRequestTransport {
private ServiceDiscovery serviceDiscovery;
private UnprocessedRequests unprocessedRequests;
private ChannelProvider channelProvider;
private Bootstrap bootstrap;
private EventLoopGroup eventLoopGroup;
private ClientConfig clientConfig;
private ServiceRegistry serviceRegistry;

public Bootstrap getBootstrap() {
return bootstrap;
}

public void initClientApplication(){
bootstrap = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup();

EventLoopGroup worker = new NioEventLoopGroup();
bootstrap.group(worker);
//添加 ChannelHandler 以处理每个 Channel 的日志消息
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new RpcMessageEncoder());
ch.pipeline().addLast(new RpcMessageDecoder());
//添加自定义的ChannelHandler
ch.pipeline().addLast(new NettyRpcClientHandler());
}
});
//TODO 从配置文件中读取服务发现类
this.serviceDiscovery = new ZkServiceDiscoveryImpl();
// this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension(ServiceDiscoveryEnum.ZK.getName());
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
this.clientConfig = PropertiesBootstrap.loadClientConfigFromLocal();
serviceRegistry = new ZkServiceRegistryImpl();
CLIENT_CONFIG = clientConfig;
}

/**
* 服务订阅
* @param serviceBean 标注了@RpcReference的属性
*/
public void doSubscribeService(Class serviceBean) throws UnknownHostException {
URL url = new URL();
url.setServiceName(serviceBean.getName());
url.setApplicationName(clientConfig.getApplicationName());
url.addParameter("host", InetAddress.getLocalHost().getHostAddress());
//subscribe:将URL存入SUBSCRIBE_SERVICE_LIST
serviceRegistry.subscribe(url);
}

/**
* 开始与各个provider建立连接,TODO 同时监听各个providerNode节点的变化(child变化和nodeData的变化)
*/
public void doConnectServer(){
//SUBSCRIBE_SERVICE_LIST 为所有标注了@RpcReference的属性的信息集合
for (URL providerURL : SUBSCRIBE_SERVICE_LIST) {
//根据标注了@RpcReference的serviceName去Zookeeper上获取其对应的地址
List<String> providerIps = serviceDiscovery.lookupService(providerURL.getServiceName());
/*
相当于@RpcReference客户端与对应的每一个服务提供者建立连接
@RpcReference
OrderService orderService;
-> OrderApplication:8081
-> OrderApplication:8082
*/
for (String providerIp : providerIps) {
try {
ConnectionHandler.connect(providerURL.getServiceName(), providerIp);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}

@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
//根据serviceName获取Channel
Channel channel = getChannel(rpcRequest.getInterfaceName());

if(channel.isActive()){
//将未处理的请求放入map中
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
//构建RpcMessage
RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
.codec(SerializationTypeEnum.KYRO.getCode())
.compress(CompressTypeEnum.GZIP.getCode())
.messageType(RpcConstants.REQUEST_TYPE).build();
//将请求发送给服务器
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()){
log.info("客户端发送消息: [{}]", rpcMessage);
}else {
future.channel().close();
resultFuture.completeExceptionally(future.cause());
log.error("发送消息时发生错误: ", future.cause());
}
});
} else {
throw new IllegalStateException();
}

return resultFuture;
}

public Channel getChannel(String interfaceName){
Channel channel = channelProvider.get(interfaceName);
return channel;
}

public void close(){
eventLoopGroup.shutdownGracefully();
}
}

2)服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Component
@Slf4j
@Setter
public class NettyRpcServer {
private ServerConfig serverConfig;

private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);


public void registerService(RpcServiceConfig rpcServiceConfig) {
serviceProvider.publishService(rpcServiceConfig);
}

public void start(){
CustomShutdownHook.getCustomShutdownHook().clearAll();
//创建bossGroup和workerGroup
//bossGroup只负责连接请求,workerGroup负责读写请求
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
//创建服务端启动对象
DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
//cpu核心数*2
Runtime.getRuntime().availableProcessors() * 2,
//创建线程工厂
ThreadPoolFactoryUtil.createThreadFactory("service-handler-group", false)
);
try {
//获取本机ip
String host = InetAddress.getLocalHost().getHostAddress();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
// .childOption(ChannelOption.SO_KEEPALIVE, true)
//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
// 当客户端第一次进行请求的时候才会进行初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 30 秒之内没有收到客户端请求的话就关闭连接
ChannelPipeline p = ch.pipeline();
// p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
// p.addLast(new NettyKryoEncoder(new KyroSerializer(), RpcMessage.class));
// p.addLast(new NettyKryoDecoder(new KyroSerializer(), RpcMessage.class));
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
p.addLast(serviceHandlerGroup, new NettyRpcServerHandler());
}
});
//绑定端口,启动服务,sync()同步等待绑定成功,然后获取到ChannelFuture
bootstrap.bind(host, serverConfig.getServerPort()).sync();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (InterruptedException e) {
log.info("启动NettyRpcServer服务时发生错误: ", e);
} finally {
//关闭线程组
log.info("断开NettyRpcServer服务");
// bossGroup.shutdownGracefully();
// workerGroup.shutdownGracefully();
// serviceHandlerGroup.shutdownGracefully();
}
}

/**
* 暴露服务 -- 如 127.0.0.1:8080 下的UserService、ProductService...
* @param rpcServiceConfig
*/
public void exposeService(RpcServiceConfig rpcServiceConfig) {
registerService(rpcServiceConfig);
}

public void initServerConfig() {
ServerConfig serverConfig = PropertiesBootstrap.loadServerConfigFromLocal();
this.setServerConfig(serverConfig);
SERVER_CONFIG = serverConfig;
}
}