RocketMQ梳理 - producer
RocketMQ - Producer
前言
在之前发过 rocketmq 的 namesrv 和 broker 两个模块的一些理解,但在后面总结下来,更多是自顾自地翻代码,感觉自己抓住一个点就一直往下看,对逻辑主线没有一个很好地梳理,不过人总是慢慢成长的,这篇文章用作记录自己简单学习生产者模块的记录。
消费者模型
在 RocketMQ 的官方文档中,定义了在 Rocket MQ 的领域模型中,消费者的位置和流程。
- 消息由生产者初始化并发送到 RocketMQ 服务端。
- 消息按照到达 RocketMQ 服务端的顺序存储到主题的指定队列中。
- 消费者按照指定的订阅关系从 RocketMQ 服务端中获取消息并消费。
内部属性
消费者分组名称
当前消费者关联的消费者分组名称,消费者必须关联到指定的消费者分组,通过消费者分组获取消费行为。
客户端ID
消费者客户端的标识,用于区分不同的消费者。集群内全局唯一。
预绑定订阅关系列表
指定消费者的订阅关系列表。RocketMQ 服务端可在消费者初始化阶段,根据预绑定的订阅关系列表对目标主题进行权限及合法性校验,无需等到应用启动后才能校验。
消费监听器
RocketMQ 服务端将消息推送给消费者后,消费者调用消息消费逻辑的监听器。
RocketMQ Producer底层原理分析
消息发送流程
要具体地了解 RocketMQ
的生产者逻辑,我们先从发送流程开始看起,这里我直接引用了 rocketmq 源码中的生产消息测试接口:
1 |
|
我把一些跟主流程无关的逻辑给去掉了,从这里我们就可以看到其逻辑大致可以分为:
- 初始化生产者(DefaultMQProducer)
- 设置 namesrv 地址
- 启动生产者
- 发送消息
了解了其逻辑的大概雏形,我们这里主要关注生产者启动,以及后面的消息发送,紧抓住这个脉络,一步步往下看。
MQClientInstance
先是生产者启动的逻辑,走的是 defaultMQProducerImpl.start
这个方法,先是配置校验,然后初始化 instanceName
,这里的 MQClientInstance
(源码中的对象名为mQClientFactory),是底层消息发送的主要模块,主要负责消费者、生产者
和 NameServer、Broker
之间的通信,但真正进行 RPC 通信的也不是它。其内部封装了 mQClientAPIImpl
,然后 mQClientAPIImpl 的内部最终通过 remotingClient
进行 RPC 通信。
可以看到获取 MQClientInstance 的逻辑,就是从 Map 中获取,没有的话再创建,我们主要看 MQClientInstance是怎么进行消息通信的。
其内部定义了很多方法,从这里我们就可以知道 MQClientInstance 是怎么负责消费者、生产者和NameServer、Broker之间通信的了。顾名思义,sendHeartbeatToAllBroker
、updateTopicRouteInfoFromNameServer
…
我们进到 sendHeartbeatToAllBroker 方法中,就可以看到其通信的方法,改方法在 mQClientAPIImpl 中定义,再往下走就可以看到 Netty 的逻辑了…
1 | this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout()) |
我们在看这个 MQClientAPIImpl
,还是顾名思义,sendMessage
、sendMessageSync
…
我们回过头来再看看 MQClientInstance
,RocketMQ 中,生产者和消费者都是 “客户端”,每一个客户端就是一个 MQClientInstance,每一个 ClientConfig
对应一个实例。
1 | public class MQClientInstance { |
现在我们在回到初始阶段,初始化完 MQClientInstance,就是启动 MQClientInstance 了。
这里看一下定时任务,定时拉取服务信息、心跳维护、持久化消费进度的逻辑都在这里。这里的服务信息就是路由信息,发送消息的时候要根据 topic 获取路由。
Producer消息发送流程
我们随便从一个发送消息的案例中一直走,就可以找到负责消息发送的实现类,DefaultMQProducerImpl
。
我们直接看默认的实现方法 sendDefaultImpl
,sendDefaultImpl 发送消息的逻辑大致分为:消息校验、路由查找、队列选择、消息发送。
- 消息校验:主要负责校验消息合法性,如 body 长度、topic 名称规范
- 路由查找:从
topicPublishInfoTable
中获取(定时任务刷新),如果不存在则看是否开启了自动创建 topic - 队列选择:获取 broker 中的
MessageQueue
(定时任务刷新),queue 存在TopicPublishInfo
中的messageQueueList
中,默认策略是轮训选择
。还有另一种是故障延迟策略
。如果选择的队列所属的 broker 不可用,则选择下一个,这个策略主要是维护了每次向 broker 发送消息时的成功与否,记录其可用时长和不可用时长 - 消息发送:最终走的逻辑是
MQClientAPIImpl
中的sendMessage
,底层是 Netty
总结下来,RocketMQ Producer消息发送的基础逻辑大概如下,但这是省略了很多细节的情况下的流程图,通过从 NameSrv
获取路由信息,发送 Message 到 Broker
的 MessageQueue
中,而 Broker 接收到 Producer 的 Netty 请求后,就会触发其消息存储逻辑
,将消息写入到 CommitLog
文件中。