RocketMQ - Consumer

RocketMQ Consumer底层原理分析

消费者模型

在 RocketMQ 的官方文档中,定义了在 Rocket MQ 的领域模型中,消费者的位置和流程。

  1. 消息由生产者初始化并发送到 RocketMQ 服务端
  2. 消息按照到达 RocketMQ 服务端的顺序存储到主题的指定队列中。
  3. 消费者按照指定的订阅关系从 RocketMQ 服务端中获取消息并消费。

image-20240103204849483

内部属性

消费者分组名称

当前消费者关联的消费者分组名称,消费者必须关联到指定的消费者分组,通过消费者分组获取消费行为。

客户端ID

消费者客户端的标识,用于区分不同的消费者。集群内全局唯一。

预绑定订阅关系列表

指定消费者的订阅关系列表。RocketMQ 服务端可在消费者初始化阶段,根据预绑定的订阅关系列表对目标主题进行权限及合法性校验,无需等到应用启动后才能校验。

消费监听器

RocketMQ 服务端将消息推送给消费者后,消费者调用消息消费逻辑的监听器。

消费者启动流程

要具体地了解 RocketMQ的消费者逻辑,我们先从消费流程开始看起,这里我直接引用了 rocketmq 源码中的消费消息测试接口,这是 Push 模式的,RocketMQ 还有 Pull 模式的消费方式,Pull 模式其实就是”手动“,可以自己选择需要从哪个 MessageQueue 拉取消息,从什么 Offset 开始消费等等,两种方式主线逻辑都是差不多的,因为 RocketMQ 在底层,不论是 Pull 还是 Push,都是采用的 Pull 的方式从 Broker 拉取数据。这里就以 Push 模式来看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@RunWith(MockitoJUnitRunner.Silent.class)
public class DefaultMQPushConsumerTest {
private static DefaultMQPushConsumer pushConsumer;

@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
//消费组名
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
//namesrv addr
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
pushConsumer.setClientRebalance(false);
//回调函数
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String tags = msg.getTags();
String body = Arrays.toString(msg.getBody());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//消费模式 -- 广播/集群
pushConsumer.setMessageModel(MessageModel.BROADCASTING);
/*
订阅topic,订阅信息{topic,subExpression}
会存在 RebalanceImpl 中的Map成员属性中

*/
pushConsumer.subscribe(topic, "*");
//启动消费者
pushConsumer.start();
}
}

直接看启动方法,内部是调用的 defaultMQPushConsumerImpl.start,其大致流程可以分为:

  1. 配置检查:检查配置信息合法性,主要是消费组名称、消费模式等
  2. 订阅信息拷贝:将消费者的订阅信息[{topic, subExpression}, {}, {}…]拷贝到 rebalanceImpl 的Map中
  3. pullAPIWrapper 初始化:内部封装了从 Broker 拉取消息的 API
  4. offsetStore 初始化:消费模式不同,其消费进度的持久化方式也不同
  5. 初始化监听器:初始化特定的监听器,如 MessageListenerOrderly 顺序消费、MessageListenerConcurrently 并发消费
  6. MQClientInstance 的初始化与启动

image-20240103204849483

image-20240103210027413

消费者模式

1、广播模式

生产者发送的消息,会被每个消费者都消费一次;比如生产者发送了m条消息,有n个消费者,那么n个消费者都会消费这m条消息,一共消费了mn次。

在消费者初始化时,定义了广播模式时使用的 offset 持久化方式,使用 LocalFileOffsetStore 进行持久化,其会将把当前消费者的消费进度持久化到本地,而不由集群对其进行管理。

2、集群模式

生产者发送一条消息,会被这个集群下的消费者轮流消费,比如生产者发送了m条消息,有n个消费者,那么n个消费者一共消费了m条消息。

在消费者初始化时,定义了广播模式时使用的 offset 持久化方式,使用 RemoteBrokerOffsetStore 进行持久化,其会将把当前消费者的消费进度持久化到 Broker 中,这个跟 RocketMQ 的负载均衡机制有关,RocketMQ的负载均衡只针对于Consumer集群消费的模式。如下图所示,因为 Broker 集群中的消息是所有消费者一起消费,所以进度是在 Broker 端进行维护的。

image-20240103213736641

消费流程

这里我们先看 RocketMQ 的核心流程,围绕这个我们再去看代码。

image-20240103215844004

并发消费

我们进入到 DefaultMQPushConsumerImplpullMessage 方法中,就可以看到其拉取消息的逻辑,如在初始化时看到的,这里使用了 pullAPIWrapper.pullKernelImpl() 方法,我们先来看比较重要的参数:

  1. mq:很显然,要消费队列中的消息,得先知道当前消费者消费的信息,或者说已经消费的信息
  2. subExpression:订阅规则,存在 rebalanceImpl 的 Map<topic, subExpression> 中,那我们得先有 topic
  3. offset:这个我们猜应该是下次要消费的点位
  4. commitOffset:这个应该是 Broker 中的 commitLog 文件中的信息

综上所述,我们至少需要五个信息:

  1. 队列信息 MessageQueue
  2. Topic 信息
  3. subExpression 订阅规则
  4. offset 要消费的点位
  5. commitOffset 文件中的点位

image-20240103220350344

image-20240103220910234

1、获取 Topic 信息和 MessageQueue 信息

topic 信息在讲 producer 的时候说过一些,就是一个定时任务去刷新路由信息

image-20240103224459838

image-20240103223259595

在定时刷新 topic 路由信息的时候,还会刷新其队列信息,在内部的 updateTopicRouteInfoFromNameServer 方法中,后面很多地方都是从这里获取的 MessageQueue

image-20240103225828224

2、获取消费信息和消费点位

回到我们在集群模式时提到的负载均衡,消费者基于 RocketMQ 的负载均衡策略,从 Broker 中拉取消费信息,其中包括了已消费的信息和消费点位信息,而负载均衡的服务就在这里被启动。

image-20240103223036764

这是个线程类,我们直接看它的 run 方法,每 realWaitInterval 时间阻塞一次,调用 this.mqClientFactory.doRebalance(),我们继续跟进。

image-20240103223259595

这里有几个主要的方法:

  1. updateProcessQueueTableInRebalanceRebalanceImpl 有个 processQueueTable 属性,该属性维护了当前客户端正在处理的所有 Queue,以及 Queue 对应的消费进度,updateProcessQueueTableInRebalance 用于更新 processQueueTable
  2. messageQueueChanged:Rebalance 后如果处理的 Queue 列表发生了变更,则执行相应的动作。对于Push模式,会更新客户端订阅topic的版本号(以当前时间时间戳)并通知broker

在这里区分广播模式集群模式,如果是广播模式就直接调用 updateProcessQueueTableInRebalance,如果是集群模式则从 Broker 获取消费者信息,再调用 updateProcessQueueTableInRebalance 方法。

image-20240103232103094

image-20240103232747126

我们去 updateProcessQueueTableInRebalance 方法中可以看到点位的逻辑,顺着计算点位一直跟下去,OffsetStore 就是我们最初说的,根据广播或集群区分点位存储逻辑,顾名思义,一个是本地文件获取,一个是从远程 Broker 获取。

image-20240103233748052

image-20240103234028880

3、消费消息

我们再回到信息消费上,这个走的是 PullMessageService (Runable) 中的逻辑,这里注意到是从 LinkedBlockingQueue<MessageRequest> messageRequestQueue 中获取了 MessageRequest,我们先不关注里面的数据从哪来的,继续往下看。

image-20240104110339570

在这里我们可以看到当抛异常或者其他问题时,就会走 executePullRequestLater 逻辑,这个就是把当前的 MessageRequest 重新放回 messageRequestQueue 中。

image-20240104110709672

如果一切都没问题,就走 pullAPIWrapper.pullKernelImpl 方法,底层就是 MQClientAPIImpl 调用RemotingClient 走Netty逻辑了。

image-20240104110913663

image-20240104114213026

之后如果拉取成功,就会触发 onSuccess 回调,在这里如果出现比如NO_MATCHED_MSG 等情况,就会触发 executePullRequestLater/executePullRequestImmediately,将改 MessageRequest 重新放回 messageRequestQueue,等待下次调用;如果没有问题,就会触发 consumeMessageService.submitConsumeRequest 方法。

image-20240104114358694

image-20240104114556472

我们走到 submitConsumeRequest 方法中,可以看到逻辑是向 consumeExecutor(ThreadPoolExecutor) 提交了一个 consumeRequest(Runnable)

image-20240104114956023

最终在 processConsumeResult 方法中调用 offsetStore.updateOffset 方法,这个方法将信息存入 Map 中,为之后的点位更新提供数据。

image-20240104115251904

image-20240104115504358

3、消费点位更新

在消费完消息后,得更新消费点位,这个也是由定时任务控制的,往下看,也是由 OffsetStore 控制的,大概就是从 Map 中获取信息,然后如果是集群模式就走 Netty,广播模式就存在本地,就不继续展开了。

image-20240103223259595

image-20240103234731467

4、整体消费逻辑

img