1. 首页
  2. rocketmq源码分析

006-六、RocketMQ源码分析消息消费机制—-消费端消息负载均衡机制与重新分布

作者:唯有坚持不懈 | 出处:https://blog.csdn.net/prestigeding/article/details/78888290

1、消息消费需要解决的问题

首先再次重复啰嗦一下 RocketMQ 消息消费的一些基本元素的关系

主题 —》 消息队列(MessageQueue) 1 对多。

主题 —》 消息生产者,一般主题会由多个生产者组成,生产者组。

主题 —》 消息消费者,一般一个主题也会被多个消费者消费。

那消息消费至少需要解决如下问题:

1、一个消费组中多个消费者是如何对消息队列(1个主题多个消息队列)进行负载消费的。

2、一个消费者中多个线程又是如何协作(并发)的消费分配给该消费者的消息队列中的消息呢?

3、消息消费进度如何保存,包括MQ是如何知道消息是否正常被消费了。

4、RocketMQ 推拉模式实现机制。

再提一个业界关于消费者与消息队列的消费规则。

1个消费者可以消费多个消息队列,但一个消息队列同一时间只能被一个消费者消费,这又是如何实现的呢?

本文紧接着上文:消息消费概述

继续探讨消息分发与消费端负载均衡。

我们从上文知道,PullMessageService 线程主要是负责 pullRequestQueue 中的 PullResult,那问题来了,pullRequestQueue 中的数据从哪来,在什么时候由谁来填充呢。

img_0914_01_1.png

那我们就先沿着这条线索分析下去,看一下 PullMessageServicepullReqestQueue 添加元素的方法的调用链条如下:

img_0914_01_2.png

也就是调用链:


RebalanceService. run() MQClientInstance.doRebalance() DefaultMQPulConsumerImpl.doRebalance() RebalanceImpl.doRebalance() RebalanceImpl.rebalanceByTopic RebalanceImpl.updateProcessQueueTableInRebalance RebalanceImpl.dispatchPullRequest DefaultMQPushConsumerImpl.executePullRequestImmediately

从上面可以直观的看出,向 PullMesssageServiceLinkedBlockingQueue<PullRequest> pullRequestQueue 添加 PullRequest 的是 RebalanceService.run 方法,就是向 PullMessageService 中放入 PullRequest,才会驱动 PullMessageSerivce run 方法的运行,如果 pullRequestQueue 中没有元素,PullMessageService 线程将被阻塞。

那么 RebalanceService 是何许人也,让我们一起来揭开其神秘面纱。

2、消息消费负载机制分析

2.1 RebalanceService 线程

img_0914_01_3.png

img_0914_01_4.png

从上面可以看出,MQClientInstance 持有一个 RebalanceService 线程并启动它。RebalanceService 线程的 run 方法比较简单,就是直接调用 mqClientFactory.doRebalance

下面重点分步骤来详细探究 MQClientInstance.doRebalance 方法的执行流程。

2.1.1 MQClientInstance.doRebalance

循环遍历每个消费组获取 MQConsumeInner 对象(其实就是 DefaultMQPushConsumerImplDefaultMQPullConsumerImpl 对象),并执行其 doRebalance 方法。

img_0914_01_5.png

2.1.2 DefaultMQPushConsumerImpl.doRebalance

img_0914_01_6.png

RebalanceImpl doRebalance

img_0914_01_7.png

到这里,经过层层对象委托,终于进入到实现消息负载分发的核心地带了,RebalanceImpl 类,我们应该停下脚步,先重点认识一下RebalanceImpl类。

3、RebalanceImpl 类初探

我们先来看看其核心属性:

img_0914_01_8.png


* ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 消息处理队列。 * ConcurrentMap<String, Set<MessageQueue> topicSubscribeInfoTable topic 的队列信息。 * ConcurrentMap<String, SubscriptionData> subscriptionInner 订阅信息。 * String consumerGroup 消费组名称。 * MessageModel messageModel 消费模式。 * AllocateMessageQueueStrategy allocateMessageQueueStrategy 队列分配算法。 * MQClientInstance mqClientFactory MQ 客户端实例。

下面还是从 doRebalance 方法入手:

img_0914_01_9.png

1、根据 topic 来进行负载。

2、移除 MessageQueue,如果 MesageQueuetopic 不在订阅的主题中,接下来重点关注 rebalanceByTopic 方法。

RebalanceImpl rebalanceByTopic 详解:

part1:根据消息消费模式(集群还是广播)我们先重点看集群模式。

part2: 获取主题的消息消费队列、主题与该消费组的消费者id列表,任意一个为空,则退出方法的执行。

img_0914_01_10.png

part3: 主要是对主题的消息队列排序、消费者 ID 进行排序,然后利用分配算法,计算当前消费者ID(mqClient.clientId) 分配出需要拉取的消息队列。

具体的消息消费队列分配算法参考:AllocateMessageQueueStrategy 的实现类,具体算法实现就不细化研究了。

在这里举一个最简单的队列分配机制,,比如一个 topic 有8个消息队列(q1,q2,q3,q4,q5,q6,q7,q8) ,比如有三个消费者 c1,c2,c3

一种队列负载算法: q1,q4,q7 分给c1,,q2,q5,q8 c2,,q3,q5c3。下文会专题研究一下负载算法。

img_0914_01_11.png

part4: 更新主题的消息消费处理队列,并返回消息队列负载是否改变。

img_0914_01_12.png

img_0914_01_13.png

img_0914_01_14.png

遍历消息队列-处理队列缓存,只处理 mq 的主题与该主题相关的 ProcessQueue, 如果 mq 不在当期主题的处理范围内(由于消息队列数量变化等原因,消费者的消费队列发生了变化,该消息队列已经分配给别的消费者去消费了),首先设置该消息队列为丢弃 (droppedvoliate 修饰),可以及时的阻止继续向 ProceeQueue 中拉取数据,然后执行removeUnecessaryMessageQueue(mq,pq)来判断是否需要移除。

img_0914_01_15.png

既然我们都是从 Push 进入的,本文以 Push 模式展开(同时我们也可以先思考思考 push,pull 差别),移步到 RebalancePushImpl

img_0914_01_16.png

目前只看非顺序消息,逻辑就比较简单了,丢弃之前,先将 MessageQueue 消息消费进度 持久化,然后丢弃,重新被其他消费者加载。顺序消息将会本系列的后续文章中详细介绍。

接下来处理 MessageQueueProcessQueue,也就是在 ProcessQueueTable 中没有 mq 的处理队列(因为重新负载后,可能会分配一些新的队列)。

img_0914_01_17.png

主要就是在内存中移除 MessageQueueofferset, 然后计算下一个拉取偏移量,然后每一个 MessageQueue 创建一个拉取任务(PullRequest)。

img_0914_01_18.png

RebalancePushImpl

img_0914_01_19.png

PullMessageService

img_0914_01_20.png

PullServiceMessage 中的 pullRequestQueue 中放入 PullRequest,则 PullMessageService 线程 的 run 方法就不会阻塞

img_0914_01_21.png

part5:如果消息负载发生变化,需处理

img_0914_01_22.png

主要是调整主题小各个队列的拉取阔值。

img_0914_01_23.png

这里,主要看出来当消费者挂断后,或主题消息队列动态变化后,消息负载会发生变化的重新分布情况。

总结:


本文主要阐述了消息消费端负载机制,这里消息非顺序消息机制就梳理到这里了,大概再总结一下:

1、首先 RebalanceService 线程启动,为消费者分配消息队列,其实每一个 MessageQueue 会构建一个 PullRequest 对象,然后通过 RebalanceImplPullRequest 放入到 PullMessageService 线程的 LinkedBlockingQueue, 进而唤醒 queue.take()方法,然后执行 DefaultMQPushConsumerImplpullMessage,通过网络从 broker 端拉取消息,一次最多拉取的消息条数可配置,默认为32条,然后然后将拉取的消息,执行过滤等,然后封装成任务(ConsumeRequest),提交到消费者的线程池去执行,每次消费消息后,又将该 PullRequest 放入到 PullMessageService 中(DefaultMQPushConsumerImpl 的机制就是 pullInterval0

下文预告:

CommitLog 写入与 ConsumeQueue 队列的持久化机制

消息消费进度存储机制,再谈 RocketMQ 消息存储

RocketMQ 顺序消息

RocketMQ 主从机制

备注:本文是《 RocketMQ 技术内幕》的原始素材,建议关注笔者的书籍:《 RocketMQ 技术内幕》。

写完了如果写得有什么问题,希望读者能够给小编留言,也可以点击[此处扫下面二维码关注微信公众号](https://www.ycbbs.vip/?p=28 "此处扫下面二维码关注微信公众号")

看完两件小事

如果你觉得这篇文章对你挺有启发,我想请你帮我两个小忙:

  1. 关注我们的 GitHub 博客,让我们成为长期关系
  2. 把这篇文章分享给你的朋友 / 交流群,让更多的人看到,一起进步,一起成长!
  3. 关注公众号 「方志朋」,公众号后台回复「666」 免费领取我精心整理的进阶资源教程
  4. JS中文网,Javascriptc中文网是中国领先的新一代开发者社区和专业的技术媒体,一个帮助开发者成长的社区,是给开发者用的 Hacker News,技术文章由为你筛选出最优质的干货,其中包括:Android、iOS、前端、后端等方面的内容。目前已经覆盖和服务了超过 300 万开发者,你每天都可以在这里找到技术世界的头条内容。

    本文著作权归作者所有,如若转载,请注明出处

    转载请注明:文章转载自「 Java极客技术学习 」https://www.javajike.com

    标题:006-六、RocketMQ源码分析消息消费机制—-消费端消息负载均衡机制与重新分布

    链接:https://www.javajike.com/article/1682.html

« 007-七、RocketMQ源码分析之消息消费重试机制
005-五、RocketMQ源码分析消息消费机制—-消费者拉取消息机制»

相关推荐

QR code