RocketMQ
架构
RocketMQ
技术架构中有四大角色 NameServer
、Broker
、Producer
、Consumer
。
Broker
:主要负责消息的存储、投递和查询以及服务高可用保证。一个Topic
分布在多个Broker
上,一个Broker
可以配置多个Topic
,它们是多对多的关系。NameServer
: 注册中心 ,主要提供两个功能:Broker 管理 和 路由信息管理 。Producer
:消息发布的角色Consumer
:消息消费的角色
两个基础模式
队列模式
消费者竞争队列上的消息,消息被消费了就从队列上移除,消息只能被一个消费者消费。
存在一个消息无法被多个消费者消费的问题。如何解决?可以将一份消息复制多份到多个队列,但是存在消息冗余。更好的解决方式是发布-订阅模式。
发布-订阅模式
生产者发布消息,消费者订阅消息。
满足多消费者的诉求。
如何实现?需要维护每个消费者已经消费到的消息位置,每个消费者消费一条消息,对应的消息位置就 +1 ,然后消费者根据记录的消费位置去消费对应的消息。
同一个消费者组中的消费者如何消费消息?发往一个 Topic 的消息,实际不是在一个队列里的,而是分布在多个队列(分区)中。这样一个消费者组中的消费者们可以专门负责主题里的一个队列(分区)。
为什么一个主题中需要维护多个队列 ?提高并发能力。
核心原理:生产者发布消息,Topic下分到队列,然后维护每个消费组在每个Topic下每个队列的消息位置,以消息位置(offset)来控制消息消费的进度。
消息位置的灵活性不仅能区分不同消费者组或者消费者们的消费进度,还能实现重复消费或者跳过部分消息不消费的功能。
存储
RocketMQ
消息存储架构中的三大角色——CommitLog
、ConsumeQueue
和 IndexFile
。
Broker会将不同Topic的消息都写入到同一个本地的磁盘文件commitlog上
- 每条消息的大小都是不固定的
- 不同 topic 的消息都存放在一个文件里
为什么不把不同topic的消息维护到不同文件里呢?
如果将不同 topic 存入不同的文件,无法保证这些文件在物理位置上是连续的,如果 topic 很多对应的文件就会很多,每次写入不同的文件可能都需要寻不同的道,写不同的扇区,这样速度就很慢。
将所有 topic 都写入一个文件里,并且都是顺序追加写入,那么对应到磁盘上就是顺序写,大大提高了消息队列写消息的性能。这就是RocketMQ为什么性能好的原因之一:顺序写。
消费者如何快速找到commitlog里的消息呢?
消息写入 commitlog 就代表生产者成功发布了这条消息,也就被持久化到硬盘了,这时我们可以启动一个定时任务将新写入commitlog的消息映射到 ConsumeQueue,然后消费者从中获取消息。
定时任务默认 1ms 触发一次。
全量消息存储在commitlog中,ConsumeQueue中保存 CommitLog
中的起始物理偏移量 offset
,消息大小 size
和消息 Tag
的 HashCode
值,从而节省了存储空间。
消费者消费一条消息的完整流程:通过ConsumeOffset找到ConsumeQueue里面的内容,这个内容存储了消息所在 commitlog 中的偏移量和size,然后再通过这两个数据去 commitlog 获取完整消息内容。
如何快速找到一条消息呢?消息索引 indexFile
该索引文件如何设计呢?最直观的想法是设计成一个HashMap,key为索引值,value为消息的offset。然而文件不提供HashMap功能。
indexFile 文件格式如下:
每个槽大小固定,4字节:计算key的哈希值对应到槽上,存储 index item 的下标
index item 也大小固定,20字节:存储消息在commitlog中的偏移量
通过索引查询步骤:
- 计算key的哈希得到槽位
- 从槽位得到 index item的下标
- 从 index item 对应位置得到消息的物理偏移量,获取消息
如何解决hash冲突?
- 更新槽的值为最新的 index item 的下标
- 在最新插入的 index item 里记录前一个 index item 的下标和 keyhash。
keyhash用来判断和当前计算的hash是否一致,一致说明找到了,不一致说明冲突,通过preIndex继续查找前一个。
命名服务 NameSrv
Broker启动之前,NameSrv需要先启动,待Broker启动后,Broker需要将自己的一些信息上报到 NameSrv上,并且每 30s 也会上报自身信息到 NameSrv
NameSrv 会每10s扫描它记录的 Broker 列表,看看这些 Broker 是否存活,具体的判断方式是看 120s 内该 Broker 是否上报自身消息到 NameSrv,如果超过120s没有上报就移除这个 Broker 相关信息,表示下线了。
具体的信息包含:
- topicQueueTable:主题和队列关系
- brokerAddrTable:broker名字和broker相关属性
- clusterName:集群和对应broker关联关系
- brokerLiveTable:Broker地址对应的broker存活的相关信息,每10s扫描的就是这个map
Producer和Consumer 从 NameSrv 获取路由信息,然后和broker建立长连接,后面直接与broker进行通信
集群内的 NameSrv 是相互独立的存在,他们之间不会进行任何的信息交互。
因此 Broker 需要给集群内的每一台 NameSrv 都上报路由信息,这样每台 NameSrv 存储的都是完整的路由信息
对于 Producer和 Consumer来说,只需要随机选择集群内的一台 NameSrv 进行长连接即可获取全量的路由信息,每30s拉取最新的Topic相关数据
NameSrv 就是动态路由中心,会维护存活的Broker信息,记录 Topic 的路由关系,是的 Producer 和 Consumer可以正确找到对应的 Broker。kafka中对应的NameSrv 角色就是Zookeeper
消息
普通消息
发送消息的3种方式:
1. 同步消息
生产者发送一条消息给broker,需要等待返回响应,然后才发送后续消息。没收到响应就会进行重试,默认重试三次。
重试机制有一个弊端:消息的重复发送。broker成功收到消息但是响应丢失,生产者重新发送后broker就会存储两条消息。所以需要做好防重或者幂等。
如何保证消息一定发送成功?broker接收消息后,返回ack响应给生产者,没收到ack则重试。
2. 异步消息
生产者不需要等待响应,能直接发送后面的消息。
发送异步消息需要提供一个方法,方法里面定义了onSuccess、onException两个方法,会有另外的线程来处理broker的响应。异步消息也可以设置重试次数。
3. 单向消息
生产者只管发送消息,对于broker是否收到不关心,不需要等待响应。比如日志收集。性能好,但是可能会丢消息
顺序消息
按照发送消息的顺序消费消息,例如创建订单、支付订单、完结订单。保证消息的顺序性需要在发送、存储、消费阶段都保证顺序性。
对于生产阶段来说,发送顺序消息需要保证两点:单一生产者和串行发送。
对于存储阶段,生产者指定将消息都发送到一个队列即可保证顺序性。又分为全局顺序消息和分区顺序消息。
对于消费阶段,消费者首先要保证单线程消费顺序消息,并且要考虑消费失败场景。可能需要支持相关联的消息都直接失败,然后在另外的地方持久化保存这些消息,待后续处理。
单线程使用 MessageListenerConcurrently 可以顺序消费,多线程环境下使用 MessageListenerOrderly 才能顺序消费。
RocketMQ利用三把锁来尽可能保证消息的消费顺序性:分布式锁、Synchronized、ReentrantLock
消费者会携带消费者组、客户端id和负责的队列信息向 Broker 申请队列锁,Broker 把对应的队列和消费者绑定并将这个关系存储到本地,别的消费者申请不到分布式锁就不能消费该队列了。分布式锁保证了同一个消费者组内,一个队列只会被分配给一个消费者。
Synchronized保证同一个时刻只有一个线程去消费这个队列。因为申请到分布式锁并拉去消息后是交给线程池并发消费的,因此需要Synchronized保证一个队列只会被一个线程消费。
真正开始消费消息之前,需要获取 ProcessQueue 的 consumeLock ,这个锁是 ReentrantLock。获取该锁失败表明有消息正在消费,获取成功则说明当前没有消息真正消费。ReentrantLock 比 Synchronized 粒度更小。
延迟消息
RocketMQ约定了一些延迟时间,生产者无法灵活的自定义延迟时间,而是固定的几个延迟时间(18个)来供生产者选择。这样延迟消息也有统一的归类和约束,便于管理和调配。默认分为 18 个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
RocketMQ 专门有个 Topic 叫 SCHEDULE_TOPIC_XXXX 将所有延迟消息都放在该 Topic下,然后有个定时任务来扫描遍历消息的延迟时间,如果时间到了就把延迟消息发往它本身的 Topic 队列中。
不同延迟 level 的消息会存放在这个 Topic 不同的队列中,这个 Topic 一共有18个队列对应18个 level
Broker起了一个线程池,里面一共有18个核心线程,这个线程池的任务就是定时调度 SCHEDULE_TOPIC_XXXX 下每个队列的消息,一旦有消息到期就分发到原来的 topic 下供消费者消费。
定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。
事务消息
实现分布式事务的方案有很多,比如2PC, 3PC, TCC, 本地消息,事务消息
事务消息适合应用在异步更新,用来保证最终一致性
RocketMQ如何实现事务消息?
-
在事务开始时,我们就发送一条半消息给 Broker,所谓的半消息就是不完整的消息,这种消息不会被消费者消费。
-
执行本地事务
- 根据本地事务执行的结果来决定向 Broker 发送提交消息还是回滚消息。如果发送提交消息,半消息就完整了被标记为可投递就能被消费者消费了;如果发送回滚消息就不会被消费者消费。
- 此外,RocketMQ还设计了反查机制,Broker会向生产者来反查这个事务是否成功
如何使用事务消息?
需要实现 TransationListener 接口并重写两个方法
实现原理
发送半消息的时候发往的不是原先的 Topic,而是发往特定的 Topic:RMQ_SYS_TRANS_HALF_TOPIC 。
然后等待生产者的提交或者回滚事务的请求,如果是提交则从属性中获取原 Topic 将消息发往原 Topic 即往commitlog里面存储这条消息;如果是回滚则不往commitlog存储这条消息。
Broker还会起一个线程定时扫描 RMQ_SYS_TRANS_HALF_TOPIC 下的消息去生产者的反查接口看看事务是否成功。
消息消费
推消息
broker 收到消息,立即将消息推送给消费者
优点:实时性高
缺点:难以削峰填谷
适合消费者不多,消费量不大,及时性要求高的场景
拉消息
消费者主动去 broker 拉去消息
优点:消费者友好
缺点:消费可能不及时或消息忙请求(broker没消息消费者也会持续请求)
RockerMQ采用什么模式?
RocketMQ和kafka采取一种变种的拉模式:长轮询。消费者发送拉信息的请求到broker,此时如果有请求则直接响应返回消息,如果没消息就 hold 住这个请求,比如等15s,在15s内如果有消息过来就立马响应这个请求返回消息;如果没消息就返回无消息。
RocketMQ中有pullConsumer和 pushConsumer,实际都是基于长连接的长轮询去拉消息。
pushConsumer的实现是背后有一个线程会一直从Broker拉取消息,如果当时有过多的消息未消费那就过一会再执行,一旦有消息返回就回调用户自定义的 MessageListener来消费消息。
pullConsumer参数比较复杂,一般场景使用pushConsumer较多。
消费者启动、下线会触发重平衡,每个消费者也会有个定时任务,每20s主动重平衡下,防止消费任务不均衡。重平衡就是将队列均匀的交给消费者负责。
消息堆积时,增加消费者有用吗?不一定
如果消费者组的消费者数小于 topic 的队列数,此时可以缓解消息堆积;如果消费者数量比队列数多,则没用。
如何拉消息?
RocketMQ只用一个线程来执行拉消息的操作,所有拉消息的动作都会被封装成pullRequest,放到pullRequestQueue这个阻塞队列中,然后PullMessageService会不断从 pullRequestQueue 拉取 pullRequest,根据其内容构建请求去对应broker取消息。
根据pullRequest获取的结果会先缓存到ProcessQueue中,并且再构建一个消费任务 consumerRequest 给ConsumerMessageService 这个线程池来消费消息。线程池可以并发消费也能顺序消费,一般使用并发消费
ProcessQueue 起到了暂存消息的作用,这个数据可以用来流控。
消费者分类:
对比项 | PushConsumer | SimpleConsumer | PullConsumer |
---|---|---|---|
接口方式 | 使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。 | 业务方自行实现消息处理,并主动调用接口返回消费结果。 | 业务方自行按队列拉取消息,并可选择性地提交消费结果 |
消费并发度管理 | 由SDK管理消费并发度。 | 由业务方消费逻辑自行管理消费线程。 | 由业务方消费逻辑自行管理消费线程。 |
负载均衡粒度 | 5.0 SDK是消息粒度,更均衡,早期版本是队列维度 | 消息粒度,更均衡 | 队列粒度,吞吐攒批性能更好,但容易不均衡 |
接口灵活度 | 高度封装,不够灵活。 | 原子接口,可灵活自定义。 | 原子接口,可灵活自定义。 |
适用场景 | 适用于无自定义流程的业务消息开发场景。 | 适用于需要高度自定义业务流程的业务开发场景。 | 仅推荐在流处理框架场景下集成使用 |
消息过滤
消费者去拉消息的时候,Broker只给消费者想要的消息,自动把不需要的那部分消息过滤掉了。过滤放在Broker而不是消费端可以减轻消费者的压力,避免大量无效消息投递给消费者。
过滤方式:
- Tag标签过滤:生产者构造消息的时候给消息打标记,每条消息仅允许设置一个 Tag 标签,然后消费者订阅主题时指定 Tag。
- SQL属性过滤:生产者可以给消息设置属性(键值对),消费者订阅时可以设置SQL过滤表达式来过滤多个属性。
对比项 | Tag标签过滤 | SQL属性过滤 |
---|---|---|
过滤目标 | 消息的Tag标签。 | 消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性)。 |
过滤能力 | 精准匹配。 | SQL语法匹配。 |
适用场景 | 简单过滤场景、计算逻辑简单轻量。 | 复杂过滤场景、计算逻辑较复杂。 |
Tag 标签过滤:
生产者构建消息时设置了tag,实际上就是往消息属性设置了一个键值对,key的值是 ”TAGS“。所以SQL可以覆盖Tag过滤,本质上是通过属性判断的。
每条消息映射到ConsumeQueue的内容是 commitlog offset、size、tag hashcode。所以能利用tag hashcode快速判断消息的tag,不用去commitlog拉取消息进行解析,非常高效。
为什么不用tag字符判断?ConsumeQueue设计是定长的,而tag字符的长度不能控制,但是hash的长度是可以控制的。
hash冲突怎么解决?消费者拉取消息后还会自己通过tag字符判断一遍,确保过滤那些哈希冲突的消息。
SQL属性过滤:
本质上和tag一样,都是设置属性。差别在于SQL过滤需要从commitlog获取消息,解析其中的属性,然后做SQL匹配。
不匹配的消息被过滤,校验通过的消息被拉取到本地。不会存在哈希冲突,所以不会进行二次校验
如何保证消息不丢失?
生产阶段:利用请求确认机制保证消息发送成功
broker阶段:单个broker需要保证同步刷盘,消息存储到磁盘后再返回ack;集群还需要保证同步复制,这样主从切换才能保证消息是全的。
消费阶段:只有在对应消息的业务流程处理完毕后,再给broker返回消费确认,提交点位。因为消费者拉取消息后是将消息提交给线程池异步处理然后直接返回消费成功。
如何保证消息不重复?
生产者的确认机制无法保证消息不重复,broker过滤重复消息会加大负载,消费者通过提交点位和broker同步消费位置也无法保证不重复消费。
消息无法保证不重复,但是可以保证它被幂等消费来达到仅消费一次的效果。
如何保证消息幂等消费?
- 业务符合幂等
- 数据库唯一索引:利用消息已有的唯一字段或者添加一个唯一的事务id,数据库中给其添加唯一索引,第一次处理完消息后在数据库中存储这条消息的处理记录。重复消息的时候就会报错。
- redis唯一判断:利用 SETNX,也是利用全局唯一值标记消息。每次业务逻辑执行前先利用 SETNX判断,已经插入就直接返回,否则正常执行业务逻辑。存在问题:redis插入成功后消费者宕机,会使得这条消息被跳过。
如何处理消息堆积问题?
消息堆积的本质:消息生产速度大于消费速度
消息堆积原因 | 处理方式 |
---|---|
瞬时流量 | 降低生产消息的频率、限流降级,增加消费能力 |
消费性能问题 | 优化消费逻辑(批处理、缓存) |
机器不够 | 水平扩容 |
BUG导致 | 监控 |