RocketMQ梳理 - consumer
RocketMQ - Consumer
RocketMQ Consumer底层原理分析
消费者模型
在 RocketMQ 的官方文档中,定义了在 Rocket MQ 的领域模型中,消费者的位置和流程。
- 消息由生产者初始化并发送到
RocketMQ 服务端
。 - 消息按照到达 RocketMQ 服务端的
顺序存储
到主题的指定队列中。 - 消费者按照指定的
订阅关系
从 RocketMQ 服务端中获取消息并消费。
内部属性
消费者分组名称
当前消费者关联的消费者分组名称,消费者必须关联到指定的消费者分组,通过消费者分组获取消费行为。
客户端ID
消费者客户端的标识,用于区分不同的消费者。集群内全局唯一。
预绑定订阅关系列表
指定消费者的订阅关系列表。RocketMQ 服务端可在消费者初始化阶段,根据预绑定的订阅关系列表对目标主题进行权限及合法性校验,无需等到应用启动后才能校验。
消费监听器
RocketMQ 服务端将消息推送给消费者后,消费者调用消息消费逻辑的监听器。
消费者启动流程
要具体地了解 RocketMQ
的消费者逻辑,我们先从消费流程开始看起,这里我直接引用了 rocketmq 源码中的消费消息测试接口,这是 Push 模式
的,RocketMQ 还有 Pull 模式
的消费方式,Pull 模式其实就是”手动“,可以自己选择需要从哪个 MessageQueue 拉取消息,从什么 Offset 开始消费等等,两种方式主线逻辑都是差不多的,因为 RocketMQ 在底层,不论是 Pull 还是 Push,都是采用的 Pull 的方式从 Broker 拉取数据。这里就以 Push 模式来看。
1 |
|
直接看启动方法,内部是调用的 defaultMQPushConsumerImpl.start,其大致流程可以分为:
- 配置检查:检查配置信息合法性,主要是消费组名称、消费模式等
- 订阅信息拷贝:将消费者的订阅信息[{topic, subExpression}, {}, {}…]拷贝到 rebalanceImpl 的Map中
- pullAPIWrapper 初始化:内部封装了从 Broker 拉取消息的 API
- offsetStore 初始化:消费模式不同,其消费进度的持久化方式也不同
- 初始化监听器:初始化特定的监听器,如 MessageListenerOrderly 顺序消费、MessageListenerConcurrently 并发消费
- MQClientInstance 的初始化与启动
消费者模式
1、广播模式
生产者发送的消息,会被每个消费者都消费一次;比如生产者发送了m条消息,有n个消费者,那么n个消费者都会消费这m条消息,一共消费了mn次。
在消费者初始化时,定义了广播模式时使用的 offset 持久化方式,使用 LocalFileOffsetStore
进行持久化,其会将把当前消费者的消费进度持久化到本地,而不由集群对其进行管理。
2、集群模式
生产者发送一条消息,会被这个集群下的消费者轮流消费,比如生产者发送了m条消息,有n个消费者,那么n个消费者一共消费了m条消息。
在消费者初始化时,定义了广播模式时使用的 offset 持久化方式,使用 RemoteBrokerOffsetStore
进行持久化,其会将把当前消费者的消费进度持久化到 Broker 中,这个跟 RocketMQ 的负载均衡机制有关,RocketMQ的负载均衡只针对于Consumer集群消费的模式。如下图所示,因为 Broker 集群中的消息是所有消费者一起消费,所以进度是在 Broker 端进行维护的。
消费流程
这里我们先看 RocketMQ 的核心流程,围绕这个我们再去看代码。
并发消费
我们进入到 DefaultMQPushConsumerImpl
的 pullMessage
方法中,就可以看到其拉取消息的逻辑,如在初始化时看到的,这里使用了 pullAPIWrapper.pullKernelImpl()
方法,我们先来看比较重要的参数:
- mq:很显然,要消费队列中的消息,得先知道当前消费者消费的信息,或者说已经消费的信息
- subExpression:订阅规则,存在 rebalanceImpl 的 Map<topic, subExpression> 中,那我们得先有 topic
- offset:这个我们猜应该是下次要消费的点位
- commitOffset:这个应该是 Broker 中的 commitLog 文件中的信息
综上所述,我们至少需要五个信息:
- 队列信息 MessageQueue
- Topic 信息
- subExpression 订阅规则
- offset 要消费的点位
- commitOffset 文件中的点位
1、获取 Topic 信息和 MessageQueue 信息
topic 信息在讲 producer 的时候说过一些,就是一个定时任务去刷新路由信息。
在定时刷新 topic 路由信息的时候,还会刷新其队列信息,在内部的 updateTopicRouteInfoFromNameServer
方法中,后面很多地方都是从这里获取的 MessageQueue
。
2、获取消费信息和消费点位
回到我们在集群模式时提到的负载均衡,消费者基于 RocketMQ 的负载均衡策略
,从 Broker 中拉取消费信息,其中包括了已消费的信息和消费点位信息,而负载均衡的服务就在这里被启动。
这是个线程类,我们直接看它的 run 方法,每 realWaitInterval
时间阻塞一次,调用 this.mqClientFactory.doRebalance()
,我们继续跟进。
这里有几个主要的方法:
- updateProcessQueueTableInRebalance:
RebalanceImpl
有个processQueueTable
属性,该属性维护了当前客户端正在处理的所有 Queue,以及 Queue 对应的消费进度,updateProcessQueueTableInRebalance
用于更新 processQueueTable。 - messageQueueChanged:Rebalance 后如果处理的 Queue 列表发生了变更,则执行相应的动作。对于Push模式,会更新客户端订阅topic的版本号(以当前时间时间戳)并通知broker。
在这里区分广播模式
和集群模式
,如果是广播模式就直接调用 updateProcessQueueTableInRebalance
,如果是集群模式则从 Broker 获取消费者信息,再调用 updateProcessQueueTableInRebalance
方法。
我们去 updateProcessQueueTableInRebalance 方法中可以看到点位的逻辑,顺着计算点位一直跟下去,OffsetStore
就是我们最初说的,根据广播或集群区分点位存储逻辑,顾名思义,一个是本地文件获取,一个是从远程 Broker 获取。
3、消费消息
我们再回到信息消费上,这个走的是 PullMessageService (Runable)
中的逻辑,这里注意到是从 LinkedBlockingQueue<MessageRequest> messageRequestQueue
中获取了 MessageRequest
,我们先不关注里面的数据从哪来的,继续往下看。
在这里我们可以看到当抛异常或者其他问题时,就会走 executePullRequestLater
逻辑,这个就是把当前的 MessageRequest
重新放回 messageRequestQueue
中。
如果一切都没问题,就走 pullAPIWrapper.pullKernelImpl
方法,底层就是 MQClientAPIImpl 调用RemotingClient 走Netty逻辑了。
之后如果拉取成功,就会触发 onSuccess
回调,在这里如果出现比如、NO_MATCHED_MSG 等情况,就会触发 executePullRequestLater/executePullRequestImmediately
,将改 MessageRequest 重新放回 messageRequestQueue,等待下次调用;如果没有问题,就会触发 consumeMessageService.submitConsumeRequest
方法。
我们走到 submitConsumeRequest
方法中,可以看到逻辑是向 consumeExecutor(ThreadPoolExecutor)
提交了一个 consumeRequest(Runnable)
。
最终在 processConsumeResult
方法中调用 offsetStore.updateOffset
方法,这个方法将信息存入 Map 中,为之后的点位更新提供数据。
3、消费点位更新
在消费完消息后,得更新消费点位,这个也是由定时任务控制的,往下看,也是由 OffsetStore 控制的,大概就是从 Map 中获取信息,然后如果是集群模式就走 Netty,广播模式就存在本地,就不继续展开了。