RocketMQ


RocketMQ 官方中文文档:https://rocketmq.apache.org/zh/docs/

https://gitcode.com/apache/rocketmq/blob/master/docs/cn/README.md

一、基本概念

1、主题(Topic)

(1)定义

Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。

  • 定义数据的分类隔离: 在 Apache RocketMQ 的方案设计中,建议将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储的隔离性和订阅隔离性。
  • 定义数据的身份和权限: Apache RocketMQ 的消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。

(2)模型关系

在整个 Apache RocketMQ 的领域模型中,主题所处的流程和位置如下:

image-20240514114715688

主题是 Apache RocketMQ 的顶层存储,所有消息资源的定义都在主题内部完成,但主题是一个逻辑概念,并不是实际的消息容器。

主题内部由多个队列组成,消息的存储和水平扩展能力最终是由队列实现的;并且针对主题的所有约束和属性设置,最终也是通过主题内部的队列来实现。

(3)内部属性

主题名称

  • 定义:主题的名称,用于标识主题,主题名称集群内全局唯一。
  • 取值:由用户创建主题时定义。
  • 约束:请参见参数限制

队列列表

  • 定义:队列作为主题的组成单元,是消息存储的实际容器,一个主题内包含一个或多个队列,消息实际存储在主题的各队列内。更多信息,请参见队列(MessageQueue)
  • 取值:系统根据队列数量给主题分配队列,队列数量创建主题时定义。
  • 约束:一个主题内至少包含一个队列。

消息类型

  • 定义:主题所支持的消息类型。
  • 取值:创建主题时选择消息类型。Apache RocketMQ 支持的主题类型如下:
    • Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。
    • FIFO:顺序消息,Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。
    • Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。
    • Transaction:事务消息,Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。
  • 约束:Apache RocketMQ 从5.0版本开始,支持强制校验消息类型,即每个主题只允许发送一种消息类型的消息,这样可以更好的运维和管理生产系统,避免混乱。为保证向下兼容4.x版本行为,强制校验功能默认开启。

(4)行为约束

消息类型强制校验

Apache RocketMQ 5.x版本支持将消息类型拆分到主题中进行独立运维和处理,因此系统会对发送的消息类型和主题定的消息类型进行强制校验,若校验不通过,则消息发送请求会被拒绝,并返回类型不匹配异常。校验原则如下:

  • 消息类型必须一致:发送的消息的类型,必须和目标主题定义的消息类型一致。
  • 主题类型必须单一:每个主题只支持一种消息类型,不允许将多种类型的消息发送到同一个主题中。

常见错误使用场景

  • 发送的消息类型不匹配例如,创建主题时消息类型定义为顺序消息,发送消息时发送事务消息到该主题中,此时消息发送请求会被拒绝,并返回类型不匹配异常。
  • 单一消息主题混用例如,创建主题时消息类型定义为普通消息,发送消息时同时发送普通消息和顺序消息到该主题中,则顺序消息的发送请求会被拒绝,并返回类型不匹配异常。

注意:

为保证向下兼容4.x版本行为,上述强制校验功能默认开启。

消息类型的强制校验,仅 Apache RocketMQ 服务端5.x版本支持,且默认开启,推荐部署时打开配置。 Apache RocketMQ 服务端4.x和3.x历史版本的SDK不支持强制校验,您需要自己保证消息类型一致。 如果您使用的服务端版本为历史版本,建议您升级到 Apache RocketMQ 服务端5.x版本。

(5)使用建议

按照业务分类合理拆分主题

Apache RocketMQ 的主题拆分设计应遵循大类统一原则,即将相同业务域内同一功能属性的消息划分为同一主题。拆分主题时,您可以从以下角度考虑拆分粒度:

  • 消息类型是否一致:不同类型的消息,如顺序消息和普通消息需要使用不同的主题。
  • 消息业务是否关联:如果业务没有直接关联,比如,淘宝交易消息和盒马物流消息没有业务交集,需要使用不同的消息主题;同样是淘宝交易消息,女装类订单和男装类订单可以使用同一个订单。当然,如果业务量较大或其他子模块应用处理业务时需要进一步拆分订单类型,您也可以将男装订单和女装订单的消息拆分到两个主题中。
  • 消息量级是否一样:数量级不同或时效性不同的业务消息建议使用不同的主题,例如某些业务消息量很小但是时效性要求很强,如果跟某些万亿级消息量的业务使用同一个主题,会增加消息的等待时长。

正确拆分示例: 线上商品购买场景下,订单交易如订单创建、支付、取消等流程消息使用一个主题,物流相关消息使用一个主题,积分管理相关消息使用一个主题。

错误拆分示例:

  • 拆分粒度过粗:会导致业务隔离性差,不利于独立运维和故障处理。例如,所有交易消息和物流消息都共用一个主题。
  • 拆分粒度过细:会消耗大量主题资源,造成系统负载过重。例如,按照用户ID区分,每个用户ID使用一个主题。

单一主题只收发一种类型消息,避免混用

Apache RocketMQ 主题的设计原则为通过主题隔离业务,不同业务逻辑的消息建议使用不同的主题。同一业务逻辑消息的类型都相同,因此,对于指定主题,应该只收发同一种类型的消息。

主题管理尽量避免自动化机制

在 Apache RocketMQ 架构中,主题属于顶层资源和容器,拥有独立的权限管理、可观测性指标采集和监控等能力,创建和管理主题会占用一定的系统资源。因此,生产环境需要严格管理主题资源,请勿随意进行增、删、改、查操作。

Apache RocketMQ 虽然提供了自动创建主题的功能,但是建议仅在测试环境使用,生产环境请勿打开,避免产生大量垃圾主题,无法管理和回收并浪费系统资源。

(6)创建主题命令

Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n  -t  -c  -a +message.type=

其中message_type根据消息类型设置成Normal/FIFO/Delay/Transaction。如果不设置,默认为Normal类型。

2、消息相关

(1)消息类型(MessageType)

Apache RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。 Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。

(2)消息队列(MessageQueue)

队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。更多信息,请参见队列(MessageQueue)

(3)消息(Message)

消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。更多信息,请参见消息(Message)

(4)消息视图(MessageView)

消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。

(5)消息标签(MessageTag)

消息标签是Apache RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。更多信息,请参见消息过滤

(6)消息位点(MessageQueueOffset)

消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。更多信息,请参见消费进度管理

(7)消费位点(ConsumerOffset)

一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。更多信息,请参见消费进度管理

(8)消息索引(MessageKey)

消息索引是Apache RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。

3、生产消费相关

(1)事务检查器(TransactionChecker)

Apache RocketMQ 中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。更多信息,请参见事务消息

(2)事务状态(TransactionResolution)

Apache RocketMQ 中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。更多信息,请参见事务消息

(3)消费者分组(ConsumerGroup)

消费者分组是Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。更多信息,请参见消费者分组(ConsumerGroup)

(4)订阅关系(Subscription)

订阅关系是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。更多信息,请参见订阅关系(Subscription)

Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。

Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。

(5)消息过滤

消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在Apache RocketMQ 的服务端完成。更多信息,请参见消息过滤

(6)重置消费位点

以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到Apache RocketMQ 服务端的消息。更多信息,请参见重置消费位点

(7)消息轨迹

在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由Apache RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。

(8)消息堆积

生产者已经将消息发送到Apache RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。

(9)事务消息

事务消息是Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

(10)定时/延时消息

定时/延时消息是Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。

(11)顺序消息

顺序消息是Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。

二、参数约束和建议

Apache RocketMQ 系统中存在很多自定义参数和资源命名,您在使用 Apache RocketMQ 时建议参考如下说明规范系统设置,避对某些具体参数设置不合理导致应用出现异常。

参数 建议范围 说明
Topic名称 字符建议:字母az或AZ、数字09以及下划线(*)、短划线(-)和百分号(%)。 长度建议:164个字符。 系统保留字符:Topic名称不允许使用以下保留字符或含有特殊前缀的字符命名。 保留字符: TBW102 *BenchmarkTest* SELF_TEST_TOPIC *OFFSET_MOVED_EVENT* SCHEDULE_TOPIC_XXXX *RMQ_SYS_TRANS_HALF_TOPIC* RMQ_SYS_TRACE_TOPIC *RMQ_SYS_TRANS_OP_HALF_TOPIC 特殊前缀:* rmq_sys* %RETRY% %DLQ% rocketmq-broker- Topic命名应该尽量使用简短、常用的字符,避免使用特殊字符。特殊字符会导致系统解析出现异常,字符过长可能会导致消息收发被拒绝。
ConsumerGroup名称 字符建议:支持字母az或AZ、数字09以及下划线(*)、短划线(-)和百分号(%)。 长度建议:164个字符。 系统保留字符:ConsumerGroup不允许使用以下保留字符或含有特殊前缀的字符命名。 保留字符: *DEFAULT_CONSUMER* DEFAULT_PRODUCER *TOOLS_CONSUMER* FILTERSRV_CONSUMER *__MONITOR_CONSUMER* CLIENT_INNER_PRODUCER *SELF_TEST_P_GROUP* SELF_TEST_C_GROUP *CID_ONS-HTTP-PROXY* CID_ONSAPI_PERMISSION *CID_ONSAPI_OWNER* CID_ONSAPI_PULL *CID_RMQ_SYS_TRANS* 特殊字符 * CID_RMQ_SYS* * CID_HOUSEKEEPING 无。
ACL Credentials 字符建议:AK(AccessKey ID)、SK(AccessKey Secret)和Token仅支持字母az或AZ、数字0~9。 长度建议:不超过1024个字符。 无。
请求超时时间 默认值:3000毫秒。 取值范围:该参数为客户端本地行为,取值范围建议不要超过30000毫秒。 请求超时时间是客户端本地同步调用的等待时间,请根据实际应用设置合理的取值,避免线程阻塞时间过长。
消息大小 默认值:不超过4 MB。不涉及消息压缩,仅计算消息体body的大小。 取值范围:建议不超过4 MB。 消息传输应尽量压缩和控制负载大小,避免超大文件传输。若消息大小不满足限制要求,可以尝试分割消息或使用OSS存储,用消息传输URL。
消息自定义属性 字符限制:所有可见字符。 长度建议:属性的Key和Value总长度不超过16 KB。 系统保留属性:不允许使用以下保留属性作为自定义属性的Key。 保留属性Key 无。
MessageGroup 字符限制:所有可见字符。 长度建议:1~64字节。 MessageGroup是顺序消息的分组标识。一般设置为需要保证顺序的一组消息标识,例如订单ID、用户ID等。
消息发送重试次数 默认值:3次。 取值范围:无限制。 消息发送重试是客户端SDK内置的重试策略,对应用不可见,建议取值不要过大,避免阻塞业务线程。 如果消息达到最大重试次数后还未发送成功,建议业务侧做好兜底处理,保证消息可靠性。
消息消费重试次数 默认值:16次。 消费重试次数应根据实际业务需求设置合理的参数值,避免使用重试进行无限触发。重试次数过大容易造成系统压力过量增加。
事务异常检查间隔 默认值:60秒。 事务异常检查间隔指的是,半事务消息因系统重启或异常情况导致没有提交,生产者客户端会按照该间隔时间进行事务状态回查。 间隔时长不建议设置过短,否则频繁的回查调用会影响系统性能。
半事务消息第一次回查时间 默认值:取值等于[事务异常检查间隔] * 最大限制:不超过1小时。 无。
半事务消息最大超时时长 默认值:4小时。 * 取值范围:不支持自定义修改。 半事务消息因系统重启或异常情况导致没有提交,生产者客户端会按照事务异常检查间隔时间进行回查,若超过半事务消息超时时长后没有返回结果,半事务消息将会被强制回滚。 您可以通过监控该指标避免异常事务。
PushConsumer本地缓存 默认值: *最大缓存数量:1024条。 *最大缓存大小:64 M。 取值范围:支持用户自定义设置,无限制。 消费者类型为PushConsumer时,为提高消费者吞吐量和性能,客户端会在SDK本地缓存部分消息。缓存的消息的数量和大小应设置在系统内存允许的范围内。
PushConsumer重试间隔时长 默认值: *非顺序性投递:间隔时间阶梯变化,具体取值,请参见PushConsumer消费重试策略。 *顺序性投递:3000毫秒。 无。
PushConsumer消费并发度 默认值:20个线程。 无。
获取消息最大批次 默认值:32条。 消费者从服务端获取消息时,一次获取到最大消息条数。建议按照实际业务设置合理的参数值,一次获取消息数量过大容易在消费失败时造成大批量消息重复。
SimpleConsumer最大不可见时间 默认值:用户必填参数,无默认值。 取值范围建议:最小10秒;最大12小时。 消费不可见时间指的是消息处理+失败后重试间隔的总时长,建议设置时取值比实际需要耗费的时间稍微长一些。

三、特性

1 订阅与发布

消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。

2 消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

3 消息过滤

RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。

4 消息可靠性

RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:

  1. Broker非正常关闭
  2. Broker异常Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况
  5. 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。

5 至少一次

至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。

6 回溯消费

回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。

7 事务消息

RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

8 定时消息

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

9 消息重试

Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
  • 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。

10 消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

11 流量控制

生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控:

  • commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。
  • 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
  • broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
  • broker通过拒绝send 请求方式实现流量控制。

注意,生产者流控,不会尝试消息重投。

消费者流控:

  • 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
  • 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
  • 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。

消费者流控的结果是降低拉取频率。

12 死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

四、架构

1、技术架构

image-20240516171116981

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。
  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
    1. Remoting Module:整个Broker的实体,负责处理来自Client端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
    3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
    5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

image-20240516171320128

综上可知,部署流程为:

  • 1、启动 NameServer,即注册中心
  • 2、启动 Broker,消息代理
  • 3、创建 Topic
  • 4、启动生产者
  • 5、启动消费者

2、网络架构

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

3、集群工作流程

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

五、部署

Apache RocketMQ 5.0 版本完成基本消息收发,包括 NameServer、Broker、Proxy 组件。 在 5.0 版本中 Proxy 和 Broker 根据实际诉求可以分为 Local 模式和 Cluster 模式,一般情况下如果没有特殊需求,或者遵循从早期版本平滑升级的思路,可以选用Local模式。

  • 在 Local 模式下,Broker 和 Proxy 是同进程部署,只是在原有 Broker 的配置基础上新增 Proxy 的简易配置就可以运行。
  • 在 Cluster 模式下,Broker 和 Proxy 分别部署,即在原有的集群基础上,额外再部署 Proxy 即可。

系统要求

  1. 64位操作系统,推荐 Linux/Unix/macOS
  2. 64位 JDK 1.8+

1、下载安装RocketMQ

ROCKETMQ下载

RocketMQ 的安装包分为两种,二进制包和源码包。 点击这里 下载 Apache RocketMQ 5.2.0的源码包。你也可以从这里 下载到二进制包。二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。

各版本下载地址:https://rocketmq.apache.org/zh/download/

(1)二进制包安装

下载 zip 包,使用 unzip 命令解压,得到如下文件

image-20240516164302349

(2)源码包安装

这里以在Linux环境下利用社区5.2.0的源码包为例,介绍RocketMQ安装过程。

解压5.2.0的源码包并编译构建二进制可执行文件

$ unzip rocketmq-all-5.2.0-source-release.zip
$ cd rocketmq-all-5.2.0-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
$ cd distribution/target/rocketmq-5.2.0/rocketmq-5.2.0

2、Local模式集群部署

由于 Local 模式下 Proxy 和 Broker 是同进程部署,Proxy本身无状态,因此主要的集群配置仍然以 Broker 为基础进行即可。

注意:启动Broker时,确保内存超过8G,否则需要修改 runbroker.sh 中的 Xmx 和 Xms 配置

部署步骤:

  • 先启动 NameServer

  • 再启动 Broker+Proxy,有多种部署模式,如下

    • 单组节点单副本模式
  • 多组节点(集群)单副本模式

    • 多节点(集群)多副本模式-异步复制
    • 多节点(集群)多副本模式-同步双写

(1)启动 NameServer

NameServer 类似于注册中心,就是 Nacos 的 NameServer。

NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:

nohup sh mqnamesrv &

验证Name Server 是否启动成功

tail -f ~/logs/rocketmqlogs/namesrv.log

成功输出结果

The Name Server boot success...

默认启动端口 9876

(2)单组节点单副本模式

警告:这种方式风险较大,因为 Broker 只有一个节点,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用, 可以用于本地测试。

  • 启动 Broker+Proxy,localhost:9876 为 NameServer 地址
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy >/dev/null 2>&1 &
  • 验证Broker 是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
tail -f ~/logs/rocketmqlogs/broker_default.log 
  • 输出下面结果表明启动成功
The broker[xxx, 192.169.1.2:10911] boot success...

(3)多组节点(集群)单副本模式

一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

启动Broker+Proxy集群

### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties --enable-proxy &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties --enable-proxy &

...

如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876

(4)多节点(集群)多副本模式-异步复制

每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

启动Broker+Proxy集群(注意配置文件不同)

### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties --enable-proxy &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties --enable-proxy &

### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties --enable-proxy &

### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties --enable-proxy &

(5)多节点(集群)多副本模式-同步双写

每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

启动 Broker+Proxy 集群

### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties --enable-proxy &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties --enable-proxy &

### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties --enable-proxy &

### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties --enable-proxy &

以上 Broker 与 Slave 配对是通过指定相同的 BrokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量。

(6)5.0 HA新模式

提供更具灵活性的HA机制,让用户更好的平衡成本、服务可用性、数据可靠性,同时支持业务消息和流存储的场景。详见

3、Cluster模式集群部署

在 Cluster 模式下,Broker 与 Proxy分别部署,我可以在 NameServer和 Broker都启动完成之后再部署 Proxy。

在 Cluster模式下,一个 Proxy集群和 Broker集群为一一对应的关系,可以在 Proxy的配置文件 rmq-proxy.json 中使用 rocketMQClusterName 进行配置

(1)启动 NameServer

首先启动Name Server

nohup sh mqnamesrv &

验证Name Server 是否启动成功

tail -f ~/logs/rocketmqlogs/namesrv.log

启动成功

The Name Server boot success...

(2)启动 Broker - 单组节点单副本模式

这种方式风险较大,因为 Broker 只有一个节点,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用, 可以用于本地测试。

在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1

nohup sh bin/mqbroker -n 192.168.1.1:9876 &

(3)启动 Broker - 多组节点(集群)单副本模式

一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &

...

备注:如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876

(4)启动 Broker - 多节点(集群)多副本模式-异步复制

每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &

### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &

### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

(5)启动 Broker - 多节点(集群)多副本模式-同步双写

每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &

### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &

### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上 Broker 与 Slave 配对是通过指定相同的 BrokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量。

(6)启动 Broker - 5.0 HA新模式

提供更具灵活性的HA机制,让用户更好的平衡成本、服务可用性、数据可靠性,同时支持业务消息和流存储的场景。详见

(7)启动 Proxy

可以在多台机器启动多个Proxy

### 在机器A,启动第一个Proxy,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqproxy -n 192.168.1.1:9876 &

### 在机器B,启动第二个Proxy,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqproxy -n 192.168.1.1:9876 &

### 在机器C,启动第三个Proxy,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqproxy -n 192.168.1.1:9876 &

需要指定配置文件,可以使用 -pc或者 --proxyConfigPath 进行指定

### 自定义配置文件
nohup sh bin/mqproxy -n 192.168.1.1:9876 -pc /path/to/proxyConfig.json &

4、关闭服务器

  • 关闭 broker
sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker with proxy enable OK(36695)
  • 关闭server
sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

5、配置文件

(1)端口

安装rocketmq后,默认开放的端口一般有4个:9876,10911,10912,10909
  • 9876:NameServer的端口
  • 10911:配置文件字段:listenPort,broker的监听端口号,是remotingServer服务组件使用,作为对Producer和Consumer提供服务的端口号
  • 10909:配置文件字段:fastListenPort,fastRemotingServer服务组件使用,默认为listenPort - 2
  • 10912:配置文件字段:haListenPort,HAService服务组件使用,用于Broker的主从同步,默认为listenPort - 1,可以通过配置文件修改。

remotingServer和fastRemotingServer的区别:

  • Broker端:

remotingServer可以处理客户端所有请求,如:生产者发送消息的请求,消费者拉取消息的请求。
fastRemotingServer功能基本与remotingServer相同,唯一不同的是不可以处理消费者拉取消息的请求。
Broker在向NameServer注册时,只会上报remotingServer监听的listenPort端口。

  • 客户端:

默认情况下,生产者发送消息是请求fastRemotingServer,我们也可以通过配置让其请求remotingServer;消费者拉取消息只能请求remotingServer。

端口可通过修改配置文件自定义:/conf/broker.conf

listenPort=10911
fastListenPort=10909
haListenPort=10912

默认:grpc的端口是8081,remoting监听端口是8080

在 proxyConfig.json 文件中输入如下内容,端口按实际需要修改。

{
  "rocketMQClusterName": "DefaultCluster",
  "grpcServerPort": 8081,
  "remotingListenPort": 8080
}

(2)brokerName、brokerId

一组 broker 的 Master 和 Slave 是通过 brokerName 来配对的,即通一组 broker 中的 Master 和 Slave 的 brokerName 是一样的

  • 一组 broker 中有且仅有一个 Master 节点,并且该节点固定 brokerId = 0
  • 一组 broker 可以有多个 Slave 节点
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

6、启动失败原因

(1)内存不足导致无法启动

启动失败后查看,当前目录下的失败日志,如下说明内存不足

There is insufficient memory for the Java Runtime Environment to continue

可以修改 MQ 启动的 JVM 内存

先找启动脚本,看看脚本里面具体执行了哪个 sh 文件,再到对应的 sh 文件中找到 JAVA_OPT 中的参数 XmxXms,修改这两个参数

六、权限控制

权限控制(ACL)主要为 RocketMQ 提供 Topic 资源级别的高级访问控制功能。用户在使用RocketMQ权限控制时,可以在Client客户端注入用户名和密码参数实现签名,服务端通过权限控制参数实现各个资源的权限管理和校验。

ACL控制在增强集群访问控制安全性的同时也会带来部署流程和运维管理的复杂度。

一般仅建议在网络环境不安全、业务数据敏感、多部门租户混用的场景下使用。如果生产集群本身是私有集群不会被外部部门租户访问,可以不开启。

1、权限控制的定义与属性值

(1)权限定义

对RocketMQ的Topic资源访问权限控制定义主要如下表所示,分为以下四种

权限 含义
DENY 拒绝
ANY PUB 或者 SUB 权限
PUB 发送权限
SUB 订阅权限

(2)权限定义的关键属性

字段 取值 含义
globalWhiteRemoteAddresses ;192.168..*;192.168.0.1 全局IP白名单
accessKey 字符串 Access Key
secretKey 字符串 Secret Key
whiteRemoteAddress ;192.168..*;192.168.0.1 用户IP白名单
admin true;false 是否管理员账户
defaultTopicPerm DENY;PUB;SUB;PUB|SUB 默认的Topic权限
defaultGroupPerm DENY;PUB;SUB;PUB|SUB 默认的ConsumerGroup权限
topicPerms topic=权限 各个Topic的权限
groupPerms group=权限 各个ConsumerGroup的权限

具体可以参考distribution/conf/plain_acl.yml配置文件

globalWhiteRemoteAddresses:
  - 10.10.103.*
  - 192.168.0.*

accounts:
  - accessKey: RocketMQ
    secretKey: 12345678
    whiteRemoteAddress:
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: SUB
    topicPerms:
      - topicA=DENY
      - topicB=PUB|SUB
      - topicC=SUB
    groupPerms:
      # the group should convert to retry topic
      - groupA=DENY
      - groupB=PUB|SUB
      - groupC=SUB

  - accessKey: rocketmq2
    secretKey: 12345678
    whiteRemoteAddress: 192.168.1.*
    # if it is admin, it could access all resources
    admin: true

2、支持权限控制的集群部署

distribution/conf/plain_acl.yml配置文件中按照上述说明定义好权限属性后,打开aclEnable开关变量即可开启RocketMQ集群的ACL特性。这里贴出Broker端开启ACL特性的properties配置文件内容:

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if acl is open,the flag will be true
aclEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876

3、权限控制主要流程

ACL主要流程分为两部分,主要包括权限解析和权限校验。

(1)权限解析

Broker端对客户端的RequestCommand请求进行解析,拿到需要鉴权的属性字段。 主要包括:

(1)AccessKey:类似于用户名,代指用户主体,权限数据与之对应;

(2)Signature:客户根据 SecretKey 签名得到的串,服务端再用SecretKey进行签名验证;

(2)权限校验

Broker端对权限的校验逻辑主要分为以下几步:

(1)检查是否命中全局 IP 白名单;如果是,则认为校验通过;否则走 2;

(2)检查是否命中用户 IP 白名单;如果是,则认为校验通过;否则走 3;

(3)校验签名,校验不通过,抛出异常;校验通过,则走 4;

(4)对用户请求所需的权限 和 用户所拥有的权限进行校验;不通过,抛出异常;

用户所需权限的校验需要注意已下内容:

(1)特殊的请求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 账户进行操作;

(2)对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限;

5、热加载修改后权限控制定义

RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可以动态修改权限控制定义的属性,而不需重新启动Broker服务节点。

6、权限控制的使用限制

(1)如果ACL与高可用部署(Master/Slave架构)同时启用,那么需要在Broker Master节点的distribution/conf/plain_acl.yml配置文件中 设置全局白名单信息,即为将Slave节点的ip地址设置至Master节点plain_acl.yml配置文件的全局白名单中。

(2)如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组 内所有Broker节点的plain_acl.yml配置文件的白名单设置所有Broker节点的ip地址。

七、mqadmin管理工具

注意:

  1. 执行命令方法:./mqadmin {command} {args}
  2. 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port
  3. 几乎所有命令都可以通过-h获取帮助
  4. 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行命令,如果不配置Broker地址,则对集群中所有主机执行命令,只支持一个Broker地址。-b格式为ip:port,port默认是10911
  5. 在tools下可以看到很多命令,但并不是所有命令都能使用,只有在MQAdminStartup中初始化的命令才能使用,你也可以修改这个类,增加或自定义命令
  6. 由于版本更新问题,少部分命令可能未及时更新,遇到错误请直接阅读相关命令源码

具体更多命令查看

https://rocketmq.apache.org/zh/docs/deploymentOperations/02admintool

https://gitcode.com/apache/rocketmq/blob/master/docs/cn/operation.md

1、Topic相关

(1)创建更新Topic配置

./mqadmin updateTopic {args}
[root@VM-20-13-centos bin]# ./mqadmin updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t testTopic -p 6
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=testTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
选项 说明
-b Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
-c cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询)
-h- 打印帮助
-n NameServer服务地址,格式 ip:port
-p 指定新topic的读写权限( W=2|R=4|WR=6 )
-r 可读队列数(默认为 8)
-w 可写队列数(默认为 8)
-t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )

(2)删除Topic

./mqadmin deleteTopic -n 127.0.0.1:9876 -t testTopic
选项 说明
-c cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询)
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )

八、RocketMQ Dashboard

RocketMQ Dashboard 是 RocketMQ 的管控利器,为用户提供客户端和应用程序的各种事件、性能的统计信息,支持以可视化工具代替 Topic 配置、Broker 管理等命令行操作。

文档:https://rocketmq.apache.org/zh/docs/deploymentOperations/04Dashboard

面板 功能
运维 修改nameserver 地址; 选用 VIPChannel
驾驶舱 查看 broker, topic 消息量
集群 集群分布,broker 配置、运行信息
主题 搜索、筛选、删除、更新/新增主题,消息路由,发送消息,重置消费位点
消费者 搜索、删除、新增/更新消费者组,终端,消费详情,配置
消息 消息记录,私信消息,消息轨迹等消息详情

1. docker 镜像安装

① 安装docker,拉取 rocketmq-dashboard 镜像

docker pull apacherocketmq/rocketmq-dashboard:latest

② docker 容器中运行 rocketmq-dashboard

docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876" -p 8080:8080 -t apacherocketmq/rocketmq-dashboard:latest

提示:namesrv.addr:port 替换为 rocketmq 中配置的 nameserver 地址:端口号

开放端口号:8080,9876,10911,11011 端口

  • 云服务器:设置安全组访问规则
  • 本地虚拟机:关闭防火墙,或 -add-port

2. 源码安装

源码地址:apache/rocketmq-dashboard

下载并解压,切换至源码目录 rocketmq-dashboard-master/

① 编译 rocketmq-dashboard

mvn clean package -Dmaven.test.skip=true

② 运行 rocketmq-dashboard

java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

提示:Started App in x.xxx seconds (JVM running for x.xxx) 启动成功

浏览器页面访问:namesrv.addr:8080

关闭 rocketmq-dashboard : ctrl + c

再次启动:执行 ②

tips:下载后的源码需要上传到 Linux 系统上编译,本地编译可能会报错。

九、消息类型

1、普通消息

普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。

(1)应用场景

  • 微服务异步解耦
  • 数据集成传输:例如 ELK 日志异步收集

(2)普通消息生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制

设置全局唯一业务索引键,方便问题追踪

Apache RocketMQ支持自定义索引键(消息的Key),在消息查询和轨迹查询时,可以通过索引键高效精确地查询到消息。

因此,发送消息时,建议设置业务上唯一的信息作为索引,方便后续快速定位消息。例如,订单ID,用户ID等。

(3)创建普通消息主题

sh mqadmin updateTopic -n  -t  -c  -a +message.type=NORMAL

2、定时/延时消息

定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

(1)应用场景

  • 分布式定时调度:定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求
  • 任务超时处理:订单超时未支付关单

(2)定时时间设置原则

  • Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。
  • 定时时间的格式为毫秒级的Unix时间戳,您需要将要设置的时刻转换成时间戳形式。具体方式,请参见Unix时间戳转换工具
  • 定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息。
  • 定时时长最大值默认为24小时,不支持自定义修改,更多信息,请参见参数限制
  • 定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息。

示例如下:

  • 定时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望消息在下午19:20:00定时投递,则定时时间为2022-06-09 19:20:00,转换成时间戳格式为1654773600000。
  • 延时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望延时1个小时后投递消息,则您需要根据当前时间和延时时长换算成定时时刻,即消息投递时间为2022-06-09 18:30:00,转换为时间戳格式为1654770600000。

(3)定时消息生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
  • 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制

(4)使用限制

消息类型一致性

定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。

定时精度约束

Apache RocketMQ 定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。

Apache RocketMQ 定时消息的状态支持持久化存储,系统由于故障重启后,仍支持按照原来设置的定时时间触发消息投递。若存储系统异常重启,可能会导致定时消息投递出现一定延迟。

避免大量相同定时时刻的消息

定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。

(5)创建延时主题

/bin/mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY
  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -a 额外属性,本例给主题添加了message.typeDELAY的属性用来支持延迟消息

3、顺序消息

(1)应用场景

在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 Apache RocketMQ 的顺序消息可以有效保证数据传输的顺序性。

  • 撮合交易:以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。
  • 数据实时增量同步:以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

  目录