RocketMQ梳理 - broker
RocketMQ梳理 - broker
一、前言
上一篇文章简单的过了一遍namesrv的流程,其中很多的细节都没有进行展开,整篇文章抓不住重点,权当流程速览了。在这篇文章中将会尝试对rocketmq的流程进行简单的分析。
二、源码分析
1、启动流程分析
下图就是broker的启动类,在namesrv中,有着一个NamesrvController
,负责管理Name Server节点的状态、消息的路由注册与查询等功能,在broker中也有类似的BrokerController,负责管理broker的核心逻辑和状态。在brokerController
中,包含了对消息存储、消息发送和接收、消息队列管理等方面的处理逻辑。
broker的启动流程跟namesrv差不多,可以大致分为 initialize、start,我们先来看initialize初始化。
1)BrokerController initialize
initialize可以分为两部分,初始化broker的元数据和broker中的消息存储的一些配置(比如commitLog、storePath等)。
可以看到,第一步初始化元数据,就是从文件中加载配置信息,此处还分了topicConfigManager
、topicQueueMappingManager
、consumerOffsetManager
等,主要做了职责细分,比如topicConfigManager.load(),topicConfigManager 会从磁盘上的存储文件(通常是${ROCKETMQ_HOME}/store/config/topics.json
)中读取Topic的配置信息。这些配置信息包括Topic的名称、队列数、读写权限等。一旦加载完成,TopicConfigManager
会将这些配置信息存储在内存中,以便快速访问和查询。
接下来我们看initializeMessageStore,DefaultMessageStore
是Broker的核心组件之一,负责管理消息的存储和读取。initializeMessageStore()
方法用于初始化消息存储相关的组件和数据结构,确保消息存储的正常运行。
其内部主要进行这几个工作,初始化消息钩子、加载存储插件等,主要就是初始化messageStore
这个服务。
这些组件都创建完后就进入到存储组件配置的加载、消息加载等步骤。
2)brokerController start
当调用 brokerController.start()
时,会执行一系列的初始化操作和启动流程,包括但不限于:
- 加载配置文件:读取Broker的配置文件,包括Broker的角色、监听端口、存储路径等配置信息。
- 初始化存储服务:根据配置信息初始化消息存储服务,包括创建存储目录、加载存储索引等操作。
- 注册Broker:将当前Broker注册到NameServer,以便Client端可以发现和连接到该Broker。
- 启动网络服务:启动监听端口,接收来自Producer和Consumer的请求。
- 启动定时任务:启动定时任务服务,包括消息延时投递、定时消息等功能。
- 启动消息消费服务:启动消息消费线程池,负责处理消息的消费和投递。
- 启动消息拉取服务:启动消息拉取线程池,负责从存储中拉取消息给Consumer消费。
此处就先启动一系列的基础服务,如消息存储,定时存储、文件监听、心跳处理、路由管理等等。
之后启动一系列定时任务,如心跳维护、元数据刷新等。
2、消息存储设计
RocketMQ是一个高性能的消息中间件,其在消息写入时追求极致的效率,采用的策略是顺序写入,所有的消息都写入到commitLog
文件中,再在文件内部去维护消息的详细信息,如topic、msgBody等等,这就区别于Kafka
,kafka在topic之下还存在分区(Partition)
,消息的写入会随着partition的增多而变得分散,这里便不再对其详细展开。除了CommitLog
,rocketmq的消息存储系统还包括了ConsumeQueue
、IndexFile
和Config
,下面我们逐一展开。
1)CommitLog
CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容
。消息存放的物理文件,每台broker上的commitLog被本机所有的queue共享,不做任何区分。
2)ConsumeQueue
consumeQueue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commitLog上的位置。
其中包含了这个MessageQueue在CommitLog中的起始物理位置偏移量offset,消息实体内容的大小和Message Tag的哈希值。从实际物理存储来说,ConsumeQueue对应每个Topic和QueuId下面的文件
。单个文件大小约5.72M,每个文件由30W条数据组成,每个文件默认大小为600万个字节,当一个ConsumeQueue类型的文件写满了,则写入下一个文件。
3)IndexFile
IndexFile:
用于为生成的索引文件提供访问服务,通过消息Key值查询消息真正的实体内容
。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引。
4)Config
config 文件夹中 存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。
topics.json
: topic 配置属性。subscriptionGroup.json
:消息消费组配置信息。delayOffset.json
:延时消息队列拉取进度。consumerOffset.json
:集群消费模式消息消进度。consumerFilter.json
:主题消息过滤信息。
5)消息存储设计小结
这些数据前面我们也隐晦的谈到过,比如config里面文件的加载就是发生在broker初始化的时候。
而对commitLog和ConsumeQueue的加载也可以从源码中看到。
值得注意的是,RocketMQ在对commitLog、IndexFile等文件进行数据读写的时候,除了用到了读写锁
外 ,还用到了零拷贝技术(MMAP)
。具体到代码里面就是利用JDK里面NIO的MapperByteBuffer的map()函数,来先将磁盘文件(CommitLog文件、consumeQueue文件等)映射到内存里来。
假如没有使用mmap技术的时候,使用最传统和基本普通文件进行io操作会产生数据多拷贝问题。比如从磁盘上把数据读取到内核IO缓冲区里面,然后再从内核IO缓冲区中读取到用户进程私有空间里去,然后我们才能拿到这个数据,如下图(以下两张图均来自小林coding)。
这里的DMA,指的是直接内存访问(Direct Memory Access)
技术,简单理解就是,在进行 I/O 设备和内存的数据传输的时候,数据搬运的工作全部交给 DMA 控制器,而 CPU 不再参与任何与数据搬运相关的事情,这样 CPU 就可以去处理别的事务。
MMAP内存映射是在硬盘上文件的位置和应用程序缓冲区(application buffers)
进行映射(建立一种对应关系),当应用进程调用了mmap()
后,DMA 会把磁盘的数据拷贝到内核的缓冲区里。接着,应用进程跟操作系统内核共享这个缓冲区。由于mmap()将文件直接映射到用户空间,所以实际文件读取时根据这个映射关系,直接将文件从硬盘拷贝到用户空间,不再有文件内容从硬盘拷贝到内核空间的一个缓冲区。之后的写文件,应用进程再调用 write()
,操作系统直接将内核缓冲区的数据拷贝到 socket 缓冲区中,这一切都发生在内核态,从这可以看到,实际上只用到了一次CPU拷贝。
在RocketMQ中,我们可以在DefaultMappedFile类中的init方法中看到其应用:
3、消息写入流程
RocketMQ使用Netty处理网络,broker收到消息写入的请求就会进入SendMessageProcessor
类中processRequest方法。
我们进入sendMessage中,就可以看到其逻辑,最终调用putMessage方法进行消息处理逻辑,当然如果开启了异步发送,就调用相应的异步处理逻辑。
我们再跟下去,就可以发现其最终是调用commitLog的写入逻辑,将消息存储到本地文件commitLog中。