RocketMQ - Producer

前言

在之前发过 rocketmq 的 namesrv 和 broker 两个模块的一些理解,但在后面总结下来,更多是自顾自地翻代码,感觉自己抓住一个点就一直往下看,对逻辑主线没有一个很好地梳理,不过人总是慢慢成长的,这篇文章用作记录自己简单学习生产者模块的记录。

消费者模型

在 RocketMQ 的官方文档中,定义了在 Rocket MQ 的领域模型中,消费者的位置和流程。

  1. 消息由生产者初始化并发送到 RocketMQ 服务端。
  2. 消息按照到达 RocketMQ 服务端的顺序存储到主题的指定队列中。
  3. 消费者按照指定的订阅关系从 RocketMQ 服务端中获取消息并消费。

image-20240103204849483

内部属性

消费者分组名称

当前消费者关联的消费者分组名称,消费者必须关联到指定的消费者分组,通过消费者分组获取消费行为。

客户端ID

消费者客户端的标识,用于区分不同的消费者。集群内全局唯一。

预绑定订阅关系列表

指定消费者的订阅关系列表。RocketMQ 服务端可在消费者初始化阶段,根据预绑定的订阅关系列表对目标主题进行权限及合法性校验,无需等到应用启动后才能校验。

消费监听器

RocketMQ 服务端将消息推送给消费者后,消费者调用消息消费逻辑的监听器。

RocketMQ Producer底层原理分析

消息发送流程

要具体地了解 RocketMQ的生产者逻辑,我们先从发送流程开始看起,这里我直接引用了 rocketmq 源码中的生产消息测试接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQProducerTest {
private DefaultMQProducer producer;
private Message message;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";

@Before
public void init() throws Exception {
//生产者组前缀
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
//DefaultMQProducer
producer = new DefaultMQProducer(producerGroupTemp);
//namesrv addr
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setCompressMsgBodyOverHowmuch(16);
message = new Message(topic, new byte[] {'a'});
//启动生产者
producer.start();
}

@Test
public void testSendMessageSync_Success() {
SendResult sendResult = producer.send(message);
}
}

我把一些跟主流程无关的逻辑给去掉了,从这里我们就可以看到其逻辑大致可以分为:

  1. 初始化生产者(DefaultMQProducer)
  2. 设置 namesrv 地址
  3. 启动生产者
  4. 发送消息

了解了其逻辑的大概雏形,我们这里主要关注生产者启动,以及后面的消息发送,紧抓住这个脉络,一步步往下看。

MQClientInstance

先是生产者启动的逻辑,走的是 defaultMQProducerImpl.start 这个方法,先是配置校验,然后初始化 instanceName,这里的 MQClientInstance(源码中的对象名为mQClientFactory),是底层消息发送的主要模块,主要负责消费者、生产者NameServer、Broker之间的通信,但真正进行 RPC 通信的也不是它。其内部封装了 mQClientAPIImpl,然后 mQClientAPIImpl 的内部最终通过 remotingClient 进行 RPC 通信。

image-20240103163606973

可以看到获取 MQClientInstance 的逻辑,就是从 Map 中获取,没有的话再创建,我们主要看 MQClientInstance是怎么进行消息通信的。

image-20240103155058989

其内部定义了很多方法,从这里我们就可以知道 MQClientInstance 是怎么负责消费者、生产者和NameServer、Broker之间通信的了。顾名思义,sendHeartbeatToAllBrokerupdateTopicRouteInfoFromNameServer

image-20240103161136330

我们进到 sendHeartbeatToAllBroker 方法中,就可以看到其通信的方法,改方法在 mQClientAPIImpl 中定义,再往下走就可以看到 Netty 的逻辑了…

1
this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout())

我们在看这个 MQClientAPIImpl,还是顾名思义,sendMessagesendMessageSync

image-20240103161814353

我们回过头来再看看 MQClientInstance,RocketMQ 中,生产者和消费者都是 “客户端”,每一个客户端就是一个 MQClientInstance,每一个 ClientConfig 对应一个实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MQClientInstance {
private final ClientConfig clientConfig;
private final String clientId;
private final ConcurrentMap<String, MQProducerInner> producerTable = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<>();
private final NettyClientConfig nettyClientConfig;
private final MQClientAPIImpl mQClientAPIImpl;
private final MQAdminImpl mQAdminImpl;
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<>();
private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<>();
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();
private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<>();
private final DefaultMQProducer defaultMQProducer;
}

现在我们在回到初始阶段,初始化完 MQClientInstance,就是启动 MQClientInstance 了。

image-20240103163606973

image-20240103164046931

这里看一下定时任务,定时拉取服务信息心跳维护持久化消费进度的逻辑都在这里。这里的服务信息就是路由信息,发送消息的时候要根据 topic 获取路由。

image-20240103164620349

Producer消息发送流程

我们随便从一个发送消息的案例中一直走,就可以找到负责消息发送的实现类,DefaultMQProducerImpl

image-20240103170147363

我们直接看默认的实现方法 sendDefaultImpl,sendDefaultImpl 发送消息的逻辑大致分为:消息校验、路由查找、队列选择、消息发送

  1. 消息校验:主要负责校验消息合法性,如 body 长度、topic 名称规范
  2. 路由查找:从 topicPublishInfoTable 中获取(定时任务刷新),如果不存在则看是否开启了自动创建 topic
  3. 队列选择:获取 broker 中的 MessageQueue(定时任务刷新),queue 存在 TopicPublishInfo 中的 messageQueueList 中,默认策略是轮训选择。还有另一种是故障延迟策略。如果选择的队列所属的 broker 不可用,则选择下一个,这个策略主要是维护了每次向 broker 发送消息时的成功与否,记录其可用时长和不可用时长
  4. 消息发送:最终走的逻辑是 MQClientAPIImpl 中的 sendMessage,底层是 Netty

image-20240103170711913

image-20240103171330218

总结下来,RocketMQ Producer消息发送的基础逻辑大概如下,但这是省略了很多细节的情况下的流程图,通过从 NameSrv 获取路由信息,发送 Message 到 BrokerMessageQueue 中,而 Broker 接收到 Producer 的 Netty 请求后,就会触发其消息存储逻辑,将消息写入到 CommitLog 文件中。

image-20240103175534585