万字长文讲透 RocketMQ 的消费逻辑

RocketMQ 是笔者非常喜欢的消息队列,4.9.X 版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。,这篇文章,笔者梳理了 RocketMQ 的消费逻辑,希望对大家有所启发。,图片,在展开集群消费逻辑细节前,我们先对 RocketMQ 4.9.X 架构做一个概览。,图片,整体架构中包含四种角色 :,名字服务是是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。它是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper ,支持 Broker 的动态注册与发现。,Broker 主要负责消息的存储、投递和查询以及服务高可用保证 。,消息发布的角色,Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。,消息消费的角色,支持以 push 推,pull 拉两种模式对消息进行消费。,RocketMQ 集群工作流程:,1、启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer 、Consumer 连上来,相当于一个路由控制中心。,2、Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker信息( IP+端口等 )以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。,3、收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。,4、Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。,5、Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。,RocketMQ 的传输模型是:发布订阅模型 。,发布订阅模型具有如下特点:,RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。,集群消费:同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。,图片,广播消费:当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。,图片,为了实现这种发布订阅模型 , RocketMQ 精心设计了它的存储模型。先进入 Broker 的文件存储目录。,图片,RocketMQ 采用的是混合型的存储结构。,1、Broker 单个实例下所有的队列共用一个数据文件(commitlog)来存储,生产者发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 commitlog 文件中。只要消息被刷盘持久化至磁盘文件 commitlog 中,那么生产者发送的消息就不会丢失。,单个文件大小默认 1G , 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0 ,文件大小为1 G = 1073741824 。,图片,commitlog 目录,这种设计有两个优点:,2、Broker 端的后台服务线程会不停地分发请求并异步构建 consumequeue(消费文件)和 indexfile(索引文件),进入索引文件存储目录 :,图片,1、消费文件按照主题存储,每个主题下有不同的队列,图中主题 my-mac-topic 有 16 个队列 (0 到 15) ;,2、每个队列目录下 ,存储 consumequeue 文件,每个 consumequeue 文件也是顺序写入,数据格式见下图。,图片,每个 consumequeue 文件包含 30 万个条目,每个条目大小是 20 个字节,每个文件的大小是 30 万 * 20 = 60万字节,每个文件大小约 5.72M 。,和 commitlog 文件类似,consumequeue 文件的名称也是以偏移量来命名的,可以通过消息的逻辑偏移量定位消息位于哪一个文件里。,消费文件按照主题-队列来保存 ,这种方式特别适配发布订阅模型。,消费者从 Broker 获取订阅消息数据时,不用遍历整个 commitlog 文件,只需要根据逻辑偏移量从 consumequeue 文件查询消息偏移量 ,  最后通过定位到 commitlog 文件, 获取真正的消息数据。,要实现发布订阅模型,还需要一个重要文件:消费进度文件。原因有两点:,因此消费进度文件需要保存消费组所订阅主题的消费进度。,我们浏览下集群消费场景下的 Broker 端的消费进度文件 consumerOffset.json 。,图片,图片,在进度文件 consumerOffset.json 里,数据以 key-value 的结构存储,key 表示:主题@消费者组 , value 是 consumequeue 中每个队列对应的逻辑偏移量 。,写到这里,我们粗糙模拟下 RocketMQ 存储模型如何满足发布订阅模型(集群模式) 。,图片,1、发送消息:生产者发送消息到 Broker ;,2、保存消息:Broker 将消息存储到 commitlog 文件 ,异步线程会构建消费文件 consumequeue ;,3、消费流程:消费者启动后,会通过负载均衡分配对应的队列,然后向 Broker 发送拉取消息请求。Broker 收到消费者拉取请求之后,根据订阅组,消费者编号,主题,队列名,逻辑偏移量等参数 ,从该主题下的 consumequeue 文件查询消息消费条目,然后从 commitlog 文件中获取消息实体。消费者在收到消息数据之后,执行消费监听器,消费完消息;,4、保存进度:消费者将消费进度提交到 Broker ,Broker 会将该消费组的消费进度存储在进度文件里。,我们重点讲解下集群消费的消费流程 ,因为集群消费是使用最普遍的消费模式,理解了集群消费,广播消费也就能顺理成章的掌握了。,图片,集群消费示例代码里,启动消费者,我们需要配置三个核心属性:消费组名、订阅主题、消息监听器,最后调用 start 方法启动。,消费者启动后,我们可以将整个流程简化成:,图片,消费端的负载均衡是指将 Broker 端中多个队列按照某种算法分配给同一个消费组中的不同消费者,负载均衡是客户端开始消费的起点。,RocketMQ 负载均衡的核心设计理念是,负载均衡是每个客户端独立进行计算,那么何时触发呢 ?,负载均衡流程如下:,消费者启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(消息消费分组名称、订阅关系集合、消息通信模式和客户端实例编号等信息)。,Broker 端在收到消费者的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量 consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量 channelInfoTable 中,为之后做消费端的负载均衡提供可以依据的元数据信息。,负载均衡服务会根据消费模式为”广播模式”还是“集群模式”做不同的逻辑处理,这里主要来看下集群模式下的主要处理流程:,(1) 获取该主题下的消息消费队列集合;,(2) 查询 Broker 端获取该消费组下消费者 Id 列表;,(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列;,这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range ,最后遍历整个 range 而计算出当前消费端应该分配到的记录。,(4) 分配到的消息队列集合与 processQueueTable 做一个过滤比对操作。,消费者实例内 ,processQueueTable 对象存储着当前负载均衡的队列 ,以及该队列的处理队列 processQueue (消费快照)。,最后创建拉取消息请求列表,并将请求分发到消息拉取服务,进入拉取消息环节。,在负载均衡这一小节,我们已经知道负载均衡触发了拉取消息的流程。,消费者启动的时候,会创建一个拉取消息服务 PullMessageService ,它是一个单线程的服务。,核心流程如下:,1、负载均衡服务将消息拉取请求放入到拉取请求队列 pullRequestQueue , 拉取消息服务从队列中获取拉取消息请求 ;,2、拉取消息服务向 Brorker 服务发送拉取请求 ,拉取请求的通讯模式是异步回调模式 ;,消费者的拉取消息服务本身就是一个单线程,使用异步回调模式,发送拉取消息请求到 Broker 后,拉取消息线程并不会阻塞 ,可以继续处理队列 pullRequestQueue 中的其他拉取任务。,3、Broker 收到消费者拉取消息请求后,从存储中查询出消息数据,然后返回给消费者;,4、消费者的网络通讯层会执行拉取回调函数相关逻辑,首先会将消息数据存储在队列消费快照 processQueue 里;,消费快照使用红黑树 msgTreeMap 存储拉取服务拉取到的消息 。,5、回调函数将消费请求提交到消息消费服务 ,而消息消费服务会异步的消费这些消息;,6、回调函数会将处理中队列的拉取请放入到定时任务中;,7、定时任务再次将消息拉取请求放入到队列 pullRequestQueue 中,形成了闭环:负载均衡后的队列总会有任务执行拉取消息请求,不会中断。,细心的同学肯定有疑问:既然消费端是拉取消息,为什么是长轮询呢 ?,虽然拉模式的主动权在消费者这一侧,但是缺点很明显。,因为消费者并不知晓 Broker 端什么时候有新的消息 ,所以会不停地去 Broker 端拉取消息,但拉取频率过高, Broker 端压力就会很大,频率过低则会导致消息延迟。,所以要想消费消息的延迟低,服务端的推送必不可少。,下图展示了 RocketMQ 如何通过长轮询减小拉取消息的延迟。,核心流程如下:,1、Broker 端接收到消费者的拉取消息请求后,拉取消息处理器开始处理请求,根据拉取请求查询消息存储 ;,2、从消息存储中获取消息数据 ,若存在新消息 ,则将消息数据通过网络返回给消费者。若无新消息,则将拉取请求放入到拉取请求表 pullRequestTable 。,3、长轮询请求管理服务 pullRequestHoldService 每隔 5 秒从拉取请求表中判断拉取消息请求的队列是否有新的消息。,判定标准是:拉取消息请求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了。,若存在新的消息 ,  长轮询请求管理服务会触发拉取消息处理器重新处理该拉取消息请求。,4、当 commitlog 中新增了新的消息,消息分发服务会构建消费文件和索引文件,并且会通知长轮询请求管理服务,触发拉取消息处理器重新处理该拉取消息请求。,在拉取消息的流程里, Broker 端返回消息数据,消费者的通讯框架层会执行回调函数。,回调线程会将数据存储在队列消费快照 processQueue(内部使用红黑树 msgTreeMap)里,然后将消息提交到消费消息服务,消费消息服务会异步消费这些消息。,消息消费服务有两种类型:并发消费服务和顺序消费服务 。,并发消费是指消费者将并发消费消息,消费的时候可能是无序的。,消费消息并发服务启动后,会初始化三个组件:消费线程池、清理过期消息定时任务、处理失败消息定时任务。,核心流程如下:,0、通讯框架回调线程会将数据存储在消费快照里,然后将消息列表 msgList 提交到消费消息服务,1、 消息列表 msgList 组装成消费对象,2、将消费对象提交到消费线程池,我们看到10 条消息被组装成三个消费请求对象,不同的消费线程会执行不同的消费请求对象。,3、消费线程执行消息监听器,执行完消费监听器,会返回消费结果。,4、处理异常消息,当消费异常时,异常消息将重新发回 Broker 端的重试队列( RocketMQ 会为每个 topic 创建一个重试队列,以 %RETRY% 开头),达到重试时间后将消息投递到重试队列中进行消费重试。,假如异常的消息发送到 Broker 端失败,则重新将这些失败消息通过处理失败消息定时任务重新提交到消息消费服务。,5、更新本地消费进度,消费者消费一批消息完成之后,需要保存消费进度到进度管理器的本地内存。,首先我们会从队列消费快照 processQueue 中移除消息,返回消费快照 msgTreeMap 第一个偏移量 ,然后调用消费消息进度管理器 offsetStore 更新消费进度。,待更新的偏移量是如何计算的呢?,在场景3,RocketMQ 为了保证消息肯定被消费成功,消费进度只能维持在1001(消息1),直到1001也被消费完,本地的消费进度才会一下子更新到1011。,假设1001(消息1)还没有消费完成,消费者实例突然退出(机器断电,或者被 kill ),就存在重复消费的风险。,因为队列的消费进度还是维持在1001,当队列重新被分配给新的消费者实例的时候,新的实例从 Broker 上拿到的消费进度还是维持在1001,这时候就会又从1001开始消费,1001-1010这批消息实际上已经被消费过还是会投递一次。,所以业务必须要保证消息消费的幂等性。,写到这里,我们会有一个疑问:假设1001(消息1)因为加锁或者消费监听器逻辑非常耗时,导致极长时间没有消费完成,那么消费进度就会一直卡住 ,怎么解决呢 ?,RocketMQ 提供两种方式一起配合解决:,顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。,顺序消息分为分区顺序消息和全局顺序消息。,对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。,对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。,消息的顺序需要由两个阶段保证:,顺序消费服务的类是 ConsumeMessageOrderlyService ,在负载均衡阶段,并发消费和顺序消费并没有什么大的差别。,最大的差别在于:顺序消费会向 Borker 申请锁 。消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔20秒会重新尝试。,顺序消费核心流程如下:,1、 组装成消费对象,2、 将请求对象提交到消费线程池,和并发消费不同的是,这里的消费请求包含消费快照 processQueue ,消息队列 messageQueue 两个对象,并不对消息列表做任何处理。,3、 消费线程内,对消费队列加锁,顺序消费也是通过线程池消费的,synchronized 锁用来保证同一时刻对于同一个队列只有一个线程去消费它,4、 从消费快照中取得待消费的消息列表,消费快照 processQueue 对象里,创建了一个红黑树对象 consumingMsgOrderlyTreeMap 用于临时存储的待消费的消息。,5、 执行消息监听器,消费快照的消费锁 consumeLock 的作用是:防止负载均衡线程把当前消费的 MessageQueue 对象移除掉。,6、 处理消费结果,消费成功时,首先计算需要提交的偏移量,然后更新本地消费进度。,消费失败时,分两种场景:,我们做一个关于顺序消费的总结 :,RocketMQ 消费者消费完一批数据后, 会将队列的进度保存在本地内存,但还需要将队列的消费进度持久化。,图片,集群模式下,分两种场景:,Broker 的这两个处理器都调用消费者进度管理器 consumerOffsetManager 的 commitOffset 方法,定时任务异步将消费进度持久化到消费进度文件 consumerOffset.json 中。,图片,广播模式消费进度存储在消费者本地,定时任务每隔 5 秒通过 LocalFileOffsetStore 持久化到本地文件offsets.json ,数据格式为 MessageQueue:Offset 。,图片,广播模式下,消费进度和消费组没有关系,本地文件 offsets.json 存储在配置的目录,文件中包含订阅主题中所有的队列以及队列的消费进度。,集群消费下,重试机制的本质是 RocketMQ 的延迟消息功能。,消费消息失败后,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker 端。,Broker 端会为每个 topic 创建一个重试队列 ,队列名称是:%RETRY% + 消费者组名 ,达到重试时间后将消息投递到重试队列中进行消费重试(消费者组会自动订阅重试 Topic)。最多重试消费 16 次,重试的时间间隔逐渐变长,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。,图片,开源 RocketMQ 4.X 支持延迟消息,默认支持18 个 level 的延迟消息,这是通过 broker 端的 messageDelayLevel 配置项确定的,如下:,图片,Broker 在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟 level 的个数,创建对应数量的队列,也就是说18个 level 对应了18个队列。,我们先梳理下延迟消息的实现机制。,1、生产者发送延迟消息,2、Broker端存储延迟消息,延迟消息在 RocketMQ Broker 端的流转如下图所示:,图片,第一步:修改消息 Topic 名称和队列信息,Broker 端接收到生产者的写入消息请求后,首先都会将消息写到 commitlog 中。假如是正常非延迟消息,MessageStore 会根据消息中的 Topic 信息和队列信息,将其转发到目标 Topic 的指定队列 consumequeue 中。,但由于消息一旦存储到 consumequeue 中,消费者就能消费到,而延迟消息不能被立即消费,所以 RocketMQ 将 Topic 的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。,同时,还会将消息原来要发送到的目标 Topic 和队列信息存储到消息的属性中。,图片,第二步:构建 consumequeue 文件时,计算并存储投递时间,图片,图片,上图是 consumequeue 文件一条消息的格式,最后 8 个字节存储 Tag 的哈希值,此时存储消息的投递时间。,第三步:定时调度服务启动,ScheduleMessageService 类是一个定时调度服务,读取 SCHEDULE_TOPIC_XXXX 队列的消息,并将消息投递到目标 Topic 中。,定时调度服务启动时,创建一个定时调度线程池 ,并根据延迟级别的个数,启动对应数量的 HandlePutResultTask ,每个 HandlePutResultTask 负责一个延迟级别的消费与投递。,图片,第四步:投递时间到了,将消息数据重新写入到 commitlog,消息到期后,需要投递到目标 Topic 。第一步已经记录了原来的 Topic 和队列信息,这里需要重新设置,再存储到 commitlog 中。,第五步:将消息投递到目标 Topic 中,Broker 端的后台服务线程会不停地分发请求并异步构建 consumequeue(消费文件)和 indexfile(索引文件)。因此消息会直接投递到目标 Topic 的 consumequeue 中,之后消费者就可以消费到这条消息。,回顾了延迟消息的机制,消费消息失败后,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker 端。,Broker 端 SendMessageProcessor 处理器会调用 asyncConsumerSendMsgBack 方法。,图片,首先判断消息的当前重试次数是否大于等于最大重试次数,如果达到最大重试次数,或者配置的重试级别小于0,则重新创建 Topic ,规则是 %DLQ% + consumerGroup,后续处理消息发送到死信队列。,正常的消息会进入 else 分支,对于首次重试的消息,默认的 delayLevel 是 0 ,RocketMQ 会将 delayLevel + 3,也就是加到 3 ,这就是说,如果没有显示的配置延时级别,消息消费重试首次,是延迟了第三个级别发起的重试,也就是距离首次发送 10s 后重试,其主题的默认规则是 %RETRY% + consumerGroup。,当延时级别设置完成,刷新消息的重试次数为当前次数加 1 ,Broker 端将该消息刷盘,逻辑如下:,图片,延迟消息写入到 commitlog 里 ,这里其实和延迟消息机制的第一步类似,后面按照延迟消息机制的流程执行即可(第二步到第六步)。,下图展示了集群模式下消费者并发消费流程 :,图片,核心流程如下:,RocketMQ 4.X 的消费逻辑有两个非常明显的特点:,1、RocketMQ 4.9.4 Github 文档,2、RocketMQ 技术内幕,3、消息队列核心知识点,4、消息ACK机制及消费进度管理

文章版权声明

 1 原创文章作者:cmcc,如若转载,请注明出处: https://www.52hwl.com/26433.html

 2 温馨提示:软件侵权请联系469472785#qq.com(三天内删除相关链接)资源失效请留言反馈

 3 下载提示:如遇蓝奏云无法访问,请修改lanzous(把s修改成x)

 免责声明:本站为个人博客,所有软件信息均来自网络 修改版软件,加群广告提示为修改者自留,非本站信息,注意鉴别

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023年6月23日
下一篇 2023年7月15日