RocketMQ梳理 - broker

一、前言

上一篇文章简单的过了一遍namesrv的流程,其中很多的细节都没有进行展开,整篇文章抓不住重点,权当流程速览了。在这篇文章中将会尝试对rocketmq的流程进行简单的分析。

二、源码分析

1、启动流程分析

下图就是broker的启动类,在namesrv中,有着一个NamesrvController,负责管理Name Server节点的状态消息的路由注册查询等功能,在broker中也有类似的BrokerController,负责管理broker的核心逻辑和状态。在brokerController中,包含了对消息存储、消息发送和接收、消息队列管理等方面的处理逻辑。

image-20231201102505627

broker的启动流程跟namesrv差不多,可以大致分为 initializestart,我们先来看initialize初始化。

1)BrokerController initialize

initialize可以分为两部分,初始化broker的元数据和broker中的消息存储的一些配置(比如commitLog、storePath等)。

image-20231201103338259

可以看到,第一步初始化元数据,就是从文件中加载配置信息,此处还分了topicConfigManagertopicQueueMappingManagerconsumerOffsetManager等,主要做了职责细分,比如topicConfigManager.load(),topicConfigManager 会从磁盘上的存储文件(通常是${ROCKETMQ_HOME}/store/config/topics.json)中读取Topic的配置信息。这些配置信息包括Topic的名称、队列数、读写权限等。一旦加载完成,TopicConfigManager会将这些配置信息存储在内存中,以便快速访问和查询。

image-20231201104132856

image-20231201104330642

接下来我们看initializeMessageStore,DefaultMessageStore是Broker的核心组件之一,负责管理消息的存储和读取。initializeMessageStore()方法用于初始化消息存储相关的组件和数据结构,确保消息存储的正常运行。

其内部主要进行这几个工作,初始化消息钩子加载存储插件等,主要就是初始化messageStore这个服务。

image-20231201104942256

这些组件都创建完后就进入到存储组件配置的加载消息加载等步骤。

image-20231205172720014

image-20231205172741212

2)brokerController start

当调用 brokerController.start()时,会执行一系列的初始化操作和启动流程,包括但不限于:

  1. 加载配置文件:读取Broker的配置文件,包括Broker的角色、监听端口、存储路径等配置信息。
  2. 初始化存储服务:根据配置信息初始化消息存储服务,包括创建存储目录、加载存储索引等操作。
  3. 注册Broker:将当前Broker注册到NameServer,以便Client端可以发现和连接到该Broker。
  4. 启动网络服务:启动监听端口,接收来自Producer和Consumer的请求。
  5. 启动定时任务:启动定时任务服务,包括消息延时投递、定时消息等功能。
  6. 启动消息消费服务:启动消息消费线程池,负责处理消息的消费和投递。
  7. 启动消息拉取服务:启动消息拉取线程池,负责从存储中拉取消息给Consumer消费。

image-20231201110942999

此处就先启动一系列的基础服务,如消息存储定时存储文件监听心跳处理路由管理等等。

image-20231201111635266

之后启动一系列定时任务,如心跳维护元数据刷新等。

image-20231201111954763

image-20231201112011736

image-20231201112020639

2、消息存储设计

RocketMQ是一个高性能的消息中间件,其在消息写入时追求极致的效率,采用的策略是顺序写入,所有的消息都写入到commitLog文件中,再在文件内部去维护消息的详细信息,如topic、msgBody等等,这就区别于Kafka,kafka在topic之下还存在分区(Partition),消息的写入会随着partition的增多而变得分散,这里便不再对其详细展开。除了CommitLog,rocketmq的消息存储系统还包括了ConsumeQueueIndexFileConfig,下面我们逐一展开。

image-20231205170543849

1)CommitLog

CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容。消息存放的物理文件,每台broker上的commitLog被本机所有的queue共享,不做任何区分。

image-20231205163633045

image-20231206104637478

2)ConsumeQueue

consumeQueue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commitLog上的位置。其中包含了这个MessageQueue在CommitLog中的起始物理位置偏移量offset,消息实体内容的大小和Message Tag的哈希值。从实际物理存储来说,ConsumeQueue对应每个Topic和QueuId下面的文件。单个文件大小约5.72M,每个文件由30W条数据组成,每个文件默认大小为600万个字节,当一个ConsumeQueue类型的文件写满了,则写入下一个文件。

image-20231205164252133

image-20231205164315021

image-20231205164358006

image-20231206104451012

3)IndexFile

IndexFile:用于为生成的索引文件提供访问服务,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引。

image-20231205165922541

image-20231206104307435

4)Config

config 文件夹中 存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。

  1. topics.json : topic 配置属性。
  2. subscriptionGroup.json :消息消费组配置信息。
  3. delayOffset.json :延时消息队列拉取进度。
  4. consumerOffset.json :集群消费模式消息消进度。
  5. consumerFilter.json :主题消息过滤信息。

image-20231205170306191

5)消息存储设计小结

这些数据前面我们也隐晦的谈到过,比如config里面文件的加载就是发生在broker初始化的时候。

image-20231205171933084

image-20231205170858242

image-20231205170941739

而对commitLog和ConsumeQueue的加载也可以从源码中看到。

image-20231205172101035

image-20231205172129342

值得注意的是,RocketMQ在对commitLog、IndexFile等文件进行数据读写的时候,除了用到了读写锁外 ,还用到了零拷贝技术(MMAP)。具体到代码里面就是利用JDK里面NIO的MapperByteBuffer的map()函数,来先将磁盘文件(CommitLog文件、consumeQueue文件等)映射到内存里来。

假如没有使用mmap技术的时候,使用最传统和基本普通文件进行io操作会产生数据多拷贝问题。比如从磁盘上把数据读取到内核IO缓冲区里面,然后再从内核IO缓冲区中读取到用户进程私有空间里去,然后我们才能拿到这个数据,如下图(以下两张图均来自小林coding)。

这里的DMA,指的是直接内存访问(Direct Memory Access) 技术,简单理解就是,在进行 I/O 设备和内存的数据传输的时候,数据搬运的工作全部交给 DMA 控制器,而 CPU 不再参与任何与数据搬运相关的事情,这样 CPU 就可以去处理别的事务

image-20231206102441861

MMAP内存映射是在硬盘上文件的位置应用程序缓冲区(application buffers)进行映射(建立一种对应关系),当应用进程调用了mmap()后,DMA 会把磁盘的数据拷贝到内核的缓冲区里。接着,应用进程跟操作系统内核共享这个缓冲区。由于mmap()将文件直接映射到用户空间,所以实际文件读取时根据这个映射关系,直接将文件从硬盘拷贝到用户空间,不再有文件内容从硬盘拷贝到内核空间的一个缓冲区。之后的写文件,应用进程再调用 write(),操作系统直接将内核缓冲区的数据拷贝到 socket 缓冲区中,这一切都发生在内核态,从这可以看到,实际上只用到了一次CPU拷贝。

image-20231206102441861

在RocketMQ中,我们可以在DefaultMappedFile类中的init方法中看到其应用:

image-20231206111036576

3、消息写入流程

RocketMQ使用Netty处理网络,broker收到消息写入的请求就会进入SendMessageProcessor类中processRequest方法。

image-20231206102441861

我们进入sendMessage中,就可以看到其逻辑,最终调用putMessage方法进行消息处理逻辑,当然如果开启了异步发送,就调用相应的异步处理逻辑。

image-20231206102749089

我们再跟下去,就可以发现其最终是调用commitLog的写入逻辑,将消息存储到本地文件commitLog中。

image-20231206103055443