RocketMQ梳理 - consumer
RocketMQ - Consumer
RocketMQ Consumer底层原理分析
消费者模型
在 RocketMQ 的官方文档中,定义了在 Rocket MQ 的领域模型中,消费者的位置和流程。
- 消息由生产者初始化并发送到 RocketMQ 服务端。
- 消息按照到达 RocketMQ 服务端的顺序存储到主题的指定队列中。
- 消费者按照指定的订阅关系从 RocketMQ 服务端中获取消息并消费。

内部属性
消费者分组名称
当前消费者关联的消费者分组名称,消费者必须关联到指定的消费者分组,通过消费者分组获取消费行为。
客户端ID
消费者客户端的标识,用于区分不同的消费者。集群内全局唯一。
预绑定订阅关系列表
指定消费者的订阅关系列表。RocketMQ 服务端可在消费者初始化阶段,根据预绑定的订阅关系列表对目标主题进行权限及合法性校验,无需等到应用启动后才能校验。
消费监听器
RocketMQ 服务端将消息推送给消费者后,消费者调用消息消费逻辑的监听器。
消费者启动流程
要具体地了解 RocketMQ的消费者逻辑,我们先从消费流程开始看起,这里我直接引用了 rocketmq 源码中的消费消息测试接口,这是 Push 模式的,RocketMQ 还有 Pull 模式的消费方式,Pull 模式其实就是”手动“,可以自己选择需要从哪个 MessageQueue 拉取消息,从什么 Offset 开始消费等等,两种方式主线逻辑都是差不多的,因为 RocketMQ 在底层,不论是 Pull 还是 Push,都是采用的 Pull 的方式从 Broker 拉取数据。这里就以 Push 模式来看。
| 1 | 
 | 
直接看启动方法,内部是调用的 defaultMQPushConsumerImpl.start,其大致流程可以分为:
- 配置检查:检查配置信息合法性,主要是消费组名称、消费模式等
- 订阅信息拷贝:将消费者的订阅信息[{topic, subExpression}, {}, {}…]拷贝到 rebalanceImpl 的Map中
- pullAPIWrapper 初始化:内部封装了从 Broker 拉取消息的 API
- offsetStore 初始化:消费模式不同,其消费进度的持久化方式也不同
- 初始化监听器:初始化特定的监听器,如 MessageListenerOrderly 顺序消费、MessageListenerConcurrently 并发消费
- MQClientInstance 的初始化与启动


消费者模式
1、广播模式
生产者发送的消息,会被每个消费者都消费一次;比如生产者发送了m条消息,有n个消费者,那么n个消费者都会消费这m条消息,一共消费了mn次。
在消费者初始化时,定义了广播模式时使用的 offset 持久化方式,使用 LocalFileOffsetStore 进行持久化,其会将把当前消费者的消费进度持久化到本地,而不由集群对其进行管理。
2、集群模式
生产者发送一条消息,会被这个集群下的消费者轮流消费,比如生产者发送了m条消息,有n个消费者,那么n个消费者一共消费了m条消息。
在消费者初始化时,定义了广播模式时使用的 offset 持久化方式,使用 RemoteBrokerOffsetStore 进行持久化,其会将把当前消费者的消费进度持久化到 Broker 中,这个跟 RocketMQ 的负载均衡机制有关,RocketMQ的负载均衡只针对于Consumer集群消费的模式。如下图所示,因为 Broker 集群中的消息是所有消费者一起消费,所以进度是在 Broker 端进行维护的。

消费流程
这里我们先看 RocketMQ 的核心流程,围绕这个我们再去看代码。

并发消费
我们进入到 DefaultMQPushConsumerImpl 的 pullMessage 方法中,就可以看到其拉取消息的逻辑,如在初始化时看到的,这里使用了 pullAPIWrapper.pullKernelImpl() 方法,我们先来看比较重要的参数:
- mq:很显然,要消费队列中的消息,得先知道当前消费者消费的信息,或者说已经消费的信息
- subExpression:订阅规则,存在 rebalanceImpl 的 Map<topic, subExpression> 中,那我们得先有 topic
- offset:这个我们猜应该是下次要消费的点位
- commitOffset:这个应该是 Broker 中的 commitLog 文件中的信息
综上所述,我们至少需要五个信息:
- 队列信息 MessageQueue
- Topic 信息
- subExpression 订阅规则
- offset 要消费的点位
- commitOffset 文件中的点位


1、获取 Topic 信息和 MessageQueue 信息
topic 信息在讲 producer 的时候说过一些,就是一个定时任务去刷新路由信息。


在定时刷新 topic 路由信息的时候,还会刷新其队列信息,在内部的 updateTopicRouteInfoFromNameServer 方法中,后面很多地方都是从这里获取的 MessageQueue。

2、获取消费信息和消费点位
回到我们在集群模式时提到的负载均衡,消费者基于 RocketMQ 的负载均衡策略,从 Broker 中拉取消费信息,其中包括了已消费的信息和消费点位信息,而负载均衡的服务就在这里被启动。

这是个线程类,我们直接看它的 run 方法,每 realWaitInterval 时间阻塞一次,调用 this.mqClientFactory.doRebalance(),我们继续跟进。

这里有几个主要的方法:
- updateProcessQueueTableInRebalance:RebalanceImpl有个processQueueTable属性,该属性维护了当前客户端正在处理的所有 Queue,以及 Queue 对应的消费进度,updateProcessQueueTableInRebalance用于更新 processQueueTable。
- messageQueueChanged:Rebalance 后如果处理的 Queue 列表发生了变更,则执行相应的动作。对于Push模式,会更新客户端订阅topic的版本号(以当前时间时间戳)并通知broker。
在这里区分广播模式和集群模式,如果是广播模式就直接调用 updateProcessQueueTableInRebalance,如果是集群模式则从 Broker 获取消费者信息,再调用 updateProcessQueueTableInRebalance 方法。


我们去 updateProcessQueueTableInRebalance 方法中可以看到点位的逻辑,顺着计算点位一直跟下去,OffsetStore 就是我们最初说的,根据广播或集群区分点位存储逻辑,顾名思义,一个是本地文件获取,一个是从远程 Broker 获取。


3、消费消息
我们再回到信息消费上,这个走的是 PullMessageService (Runable) 中的逻辑,这里注意到是从 LinkedBlockingQueue<MessageRequest> messageRequestQueue 中获取了 MessageRequest,我们先不关注里面的数据从哪来的,继续往下看。

在这里我们可以看到当抛异常或者其他问题时,就会走 executePullRequestLater 逻辑,这个就是把当前的 MessageRequest 重新放回 messageRequestQueue 中。

如果一切都没问题,就走 pullAPIWrapper.pullKernelImpl 方法,底层就是 MQClientAPIImpl 调用RemotingClient 走Netty逻辑了。


之后如果拉取成功,就会触发 onSuccess 回调,在这里如果出现比如、NO_MATCHED_MSG 等情况,就会触发 executePullRequestLater/executePullRequestImmediately,将改 MessageRequest 重新放回 messageRequestQueue,等待下次调用;如果没有问题,就会触发 consumeMessageService.submitConsumeRequest 方法。


我们走到 submitConsumeRequest 方法中,可以看到逻辑是向 consumeExecutor(ThreadPoolExecutor) 提交了一个 consumeRequest(Runnable)。

最终在 processConsumeResult 方法中调用 offsetStore.updateOffset 方法,这个方法将信息存入 Map 中,为之后的点位更新提供数据。


3、消费点位更新
在消费完消息后,得更新消费点位,这个也是由定时任务控制的,往下看,也是由 OffsetStore 控制的,大概就是从 Map 中获取信息,然后如果是集群模式就走 Netty,广播模式就存在本地,就不继续展开了。


4、整体消费逻辑




