MQ
什么时候会用到消息队列?
消息队列主要起到解耦上下游系统、数据缓存的作用。它不像数据库,会有很多计算、聚合、查询的逻辑,它的主要操作就是生产和消费。
一般情况下,我们会在需要解耦上下游系统、对数据有缓冲缓存需求或者需要用到消息队列的某些功能(比如延时消息、优先级消息)的时候选择使用消息队列,然后再根据实际需求选型。
下单流程是一个典型的 系统解耦、 消息分发 的场景,一份数据需要被多个下游系统处理。
另外一个经典场景就是日志采集流程,一般日志数据都很大,直接发到下游,下游系统可能会扛不住崩溃,所以会把数据先缓存到消息队列中。所以消息队列的基本特性就是高性能、高吞吐、低延时。
基本概念
架构层面的基本概念
-
Broker : Broker 本质上是一个进程,比如 RocketMQ 的 Broker 就是指RocketMQ Server 启动成功后的一个进程。在实际部署过程中,通常一个物理节点只会起一个进程,所以大部分情况下我们认为 Broker 就表示一个节点,但是在一些特殊场景下,一个物理节点中也可以起多个进程,就表示一台节点有多个Broker。
-
Topic(主题) : 在大部分消息队列中,Topic 都是指用来组织分区关系的一个逻辑概念。通常情况下,一个 Topic 会包含多个分区。但是 RabbitMQ 是一个例外,Topic 是指具体某一种主题模式。
-
Partition/Queue/MessageQueue(分区/分片): 在消息队列中,分区、分片、Partiton、Queue、MessageQueue 是一个概念,后面统一用分区来称呼,都是用来表示数据存储的最小单位。一般可以直接将消息写入到一个分区中,也可以将消息写入到Topic,再分发到具体某个分区。一个Topic 通常会包含一个或多个分区。
-
Producer(生产者): 生产者指消息的发送方,即发送消息的客户端,也叫生产端。
-
Consumer(消费者):消费者指消息的接收方,即接收消息的客户端,也叫消费端。
-
ConsumerGroup/Subscription(消费分组/订阅):一般情况下,消息队列中消费分组和订阅是同一个概念,后面统一用消费分组来称呼。它是用来组织消费者和分区关系的逻辑概念,也有保存消费进度的作用。
-
Message(消息):指一条真实的业务数据,消息队列的每条数据一般都叫做一条消息。
-
Offset/ConsumerOffset/Cursor(位点/消费位点/游标): 指消费者消费分区的进度,即每个消费者都会去消费分区,为了避免重复消费进度,都会保存消费者消费分区的进度信息。
-
ACK/OffsetCommit(确认/位点提交):确认和位点提交一般都是指提交消费进度的操作,即数据消费成功后,提交当前的消费位点,确保不重复消费。
-
Leader/Follower(领导者/追随者,主副本/从副本):Leader 和 Follower一般是分区维度副本的概念,即集群中的分区一般会有多个副本。此时就会有主从副本的概念,一般是一个主副本配上一个或多个从副本。
-
Segment(段/数据分段):段是指消息数据在底层具体存储时,分为多个文件存储时的文件,这个文件就叫做分区的数据段。即比如每超过 1G 的文件就新起一个文件来存储,这个文件就是Segment。基本所有的消息队列都有段的概念,比如Kakfa的Segment、Pulsar的Ledger等等。
-
StartOffset/EndOffset(起始位点/结束位点):起始位点和结束位点是分区维度的概念。即数据是顺序写入到分区的,一般从0的位置开始往后写,此时起始位点就是0。因为数据有过期的概念,分区维度较早的数据会被清理。此时起始位点就会往后移,表示当前阶段最早那条有效消息的位点。结束位点是指最新的那条数据的写入位置。因为数据一直在写入分区,所以起始位点和结束位点是一直动态变化的。
-
ACL(访问控制技术):ACL 全称是Access Control List,用来对集群中的资源进行权限控制,比如控制分区或Topic的读和写等。
功能层面的基本概念
-
顺序消息: 是指从生产者和消费者的视角来看,生产者按顺序写入Topic的消息,在消费者这边能不能按生产者写入的顺序消费到消息,如果能就是顺序消息。
-
延时消息/定时消息:都是指生产者发送消息到 Broker 时,可以设置这条消息在多久后能被消费到,当时间到了后,消息就会被消费到。延时的意思就是指以 Broker 收到消息的时间为准,多久后消息能被消费者消费,比如消息发送成功后的30分钟才能被消费。定时是指可以指定消息在设置的时间才能被看到,比如设置明天的20:00才能被消费。从技术上来看,两者是一样的;从客户端的角度,功能上稍微有细微的差别;从内核的角度,一般两种消息是以同一个概念出现的。
-
事务消息:消息队列的事务因为在不同的消息队列中的实现方式不一样,所以定义也不太一样。正常情况下,事务表示多个操作的原子性,即一批操作要么一起成功,要么一起失败。在消息队列中,一般指发送一批消息,要么同时成功,要么同时失败。
-
消息重试:消息重试分为生产者重试和消费者重试。生产者重试是指当消息发送失败后,可以设置重试逻辑,比如重试几次、多久后重试、重试间隔多少。消费者重试是指当消费的消息处理失败后,会自动重试消费消息。
-
消息回溯:是指当允许消息被多次消费,即某条消息消费成功后,这条消息不会被删除,还能再重复到这条消息。
-
广播消费:广播听起来是一个主动的,即 Broker 将一条消息广播发送给多个消费者。但是在消息队列中,广播本质上是指一条消息能不能被很多个消费者消费到。只要能被多个消费者消费到,就能起到广播消费的效果,就可以叫做广播消费。
-
死信队列:死信队列是一个功能,不是一个像分区一样的实体概念。它是指当某条消息无法处理成功时,则把这条消息写入到死信队列,将这条消息保存起来,从而可以处理后续的消息的功能。大部分情况下,死信队列在消费端使用得比较多,即消费到的消息无法处理成功,则将数据先保存到死信队列,然后可以继续处理其他消息。当然,在生产的时候也会有死信队列的概念,即某条消息无法写入Topic,则可以先写入到死信队列。从功能上来看,死信队列的功能业务也可以自己去实现。消息队列中死信队列的意思是,消息队列的SDK已经集成了这部分功能,从而让业务使用起来就很简单。
-
优先级队列:优先级队列是指可以给在一个分区或队列中的消息设置权重,权重大的消息能够被优先消费到。大部分情况下,消息队列的消息处理是FIFO先进先出的规则。此时如果某些消息需要被优先处理,基于这个规则就无法实现。所以就有了优先级队列的概念,优先级是消息维度设置的。
-
消息过滤:是指可以给每条消息打上标签,在消费的时候可以根据标签信息去消费消息。可以理解为一个简单的查询消息的功能,即通过标签去查询过滤消息。消息过滤主要在消费端生效。
-
消息过期/删除(TTL):是指消息队列中的消息会在一定时间或者超过一定大小后会被删除。因为消息队列主要是缓冲作用,所以一般会要求消息在一定的策略后会自动被清理。
-
消息轨迹:是指记录一条消息从生产端发送、服务端保存、消费端消费的全生命周期的流程信息。用来追溯消息什么时候被发送、是否发送成功、什么时候发送成功、服务端是否保存成功、什么时候保存成功、被哪些消费者消费、是否消费成功、什么时候被消费等等信息。
-
消息查询:是指能够根据某些信息查询到消息队列中的信息。比如根据消息ID或根据消费位点来查询消息,可以理解为数据库里面的固定条件的select操作。
-
消息压缩:是指生产端发送消息的时候,是否支持将消息进行压缩,以节省物理资源(比如网卡、硬盘)。压缩可以在SDK完成,也可以在Broker完成,并没有严格限制。通常来看,压缩在客户端完成会比较合理。
-
多租户:是指同一个集群是否有逻辑隔离,比如一个物理集群能否创建两个名称都为test的主题。此时一般会有一个逻辑概念 Namespace(命名空间)和 Tenant(租户)来做隔离,一般有这两个概念的就是支持多租户。
-
消息持久化:是指消息发送到Broker后,会不会持久化存储,比如存储到硬盘。有些消息队列为了保证性能,只会把消息存储在内存,此时节点重启后数据就会丢失。
-
消息流控:是指能否对写入集群的消息进行限制。一般会支持Topic、分区、消费分组、集群等维度的限流。
通信协议:如何设计一个好的通信协议?
消息队列的核心特性是高吞吐、低延时、高可靠,所以在协议上至少需要满足:
- 协议可靠性要高,不能丢数据。
- 协议的性能要高,通信的延时要低。
- 协议的内容要精简,带宽的利用率要高。
- 协议需要具备可扩展能力,方便功能的增减。
目前业界的通信协议可以分为 公有协议和私有协议 两种。公有协议指公开的受到认可的具有规范的协议,比如JMS、HTTP、STOMP等。私有协议是指根据自身的功能和需求设计的协议,一般不具备通用性,比如Kafka、RocketMQ、Puslar的协议都是私有协议。
其实消息队列领域是存在公有的、可直接使用的标准协议的,比如AMQP、MQTT、OpenMessaging,它们设计的初衷就是为了解决因各个消息队列的协议不一样导致的组件互通、用户使用成本高、重复设计、重复开发成本等问题。但是,公有的标准协议讨论制定需要较长时间,往往无法及时赶上需求的变化,灵活性不足。
因此大多数消息队列为了自身的功能支持、迭代速度、灵活性考虑,在核心通信协议的选择上不会选择公有协议,都会选择自定义私有协议。
那私有协议要怎么设计实现呢?
从技术上来看,私有协议设计一般需要包含三个步骤。
- 网络通信协议选型,指计算机七层网络模型中的协议选择。比如传输层的TCP/UDP、应用层的HTTP/WebSocket等。
- 应用通信协议设计,指如何约定客户端和服务端之间的通信规则。比如如何识别请求内容、如何确定请求字段信息等。
- 编解码(序列化/反序列化)实现,用于将二进制的消息内容解析为程序可识别的数据格式。
网络通信协议选型
从功能需求出发,为了保证性能和可靠性,几乎所有主流消息队列在核心生产、消费链路的协议选择上, 都是基于可靠性高、长连接的TCP协议。
四层的UDP虽然也是长连接,性能更高,但是因为其不可靠传输的特性,业界几乎没有消息队列用它通信。
七层的HTTP协议每次通信都需要经历三次握手、四次关闭等步骤,并且协议结构也不够精简。从而在性能(比如耗时)上的表现较差,不适合高吞吐、大流量、低延时的场景。所以主流协议在核心链路上很少使用HTTP。
应用通信协议设计
从应用通信协议构成的角度,协议一般会包含协议头和协议体两部分。
- 协议头 包含一些通用信息和数据源信息,比如协议版本、请求标识、请求的ID、客户端ID等等。
- 协议体 主要包含本次通信的业务数据,比如一串字符串、一段JSON格式的数据或者原始二进制数据等等。
最核心的就是要遵循“极简”原则,在满足业务要求的基础上,尽量压缩协议的大小。
编解码实现
编解码也称为序列化和反序列,就是数据发送的时候编码,收到数据的时候解码。
在序列化和反序列化中,最重要的就是TCP的粘包和拆包。我们知道TCP是一个“流”协议,是一串数据,没有明显的界限,TCP层面不知道这段流数据的意义,只负责传输。所以应用层就要根据某个规则从流数据中拆出完整的包,解析出有意义的数据,这就是粘包和拆包的作用。
粘包/拆包的几个思路就是:
- 消息定长。
- 在包尾增加回车换行符进行分割,例如FTP协议。
- 将消息分为消息头和消息体,消息头中包含消息总长度,然后根据长度从流中解析出数据。
- 更加复杂的应用层协议,比如HTTP、WebSocket等。
网络:如何设计高性能的网络模块?
消息队列是需要满足高吞吐、高可靠、低延时,并支持多语言访问的基础软件,网络模块最需要解决的是 性能、 稳定性、开发成本 三个问题。
性能瓶颈分析
对于 单个请求 来说,请求流程是:客户端(生产者/消费者)构建请求后,向服务端发送请求包 -> 服务端接收包后,将包交给业务线程处理 -> 业务线程处理完成后,将结果返回给客户端。其中可能消耗性能的有三个点。
- 编解码的速度。上节课我们详细讲过。
- 网络延迟。也就是客户端到服务端的网络延迟,这一点在软件层面几乎无法优化,取决于网络链路的性能,跟网络模块无关。
- 服务端/客户端网络模块的处理速度。发送/接收请求包后,包是否能及时被处理,比如当逻辑线程处理完成后,网络模块是否及时回包。
对于 并发请求 来说,在单个请求维度的问题的基础上,还需要处理高并发、高QPS、高流量等场景带来的性能问题。主要包含三个方面。
- 高效的连接管理:当客户端和服务端之间的TCP连接数很多,如何高效处理、管理连接。
- 快速处理高并发请求:当客户端和服务端之间的QPS很高,如何快速处理(接收、返回)请求。
- 大流量场景:当客户端和服务端之间的流量很高,如何快速吞吐(读、写)数据。
大流量场景,某种意义上是高并发处理的一种子场景。因为大流量分为单个请求包大并发小、单个请求包小并发大两种场景。第一种的瓶颈主要在于数据拷贝、垃圾回收、CPU占用等方面,主要依赖语言层面的编码技巧来解决,一般问题不大。第二种场景是我们需要主要解决的。
高性能网络模块的设计实现
基于多路复用技术管理 TCP 连接:
从技术原理来看,高效处理大量TCP连接,在消息队列中主要有单条TCP连接的复用和多路复用两种技术思路。
1. 单条TCP连接的复用:RabbitMQ用的就是这种方案
2. IO多路复用技术:主流的消息队列Kakfa、RocketMQ、Pulsar的网络模块都是基于IO多路复用的思路开发的。
基于Reactor模型处理高并发请求:
Reactor 模型是一种处理并发服务请求的事件设计模式,当主流程收到请求后,通过多路分离处理的方式,把请求分发给相应的请求处理器处理。Reactor 模式包含Reactor、Acceptor、Handler三个角色。
- Reactor:负责监听和分配事件。收到事件后分派给对应的 Handler处理,事件包括连接建立就绪、读就绪、写就绪等。
- Acceptor:负责处理客户端新连接。Reactor 接收到客户端的连接事件后,会转发给 Acceptor,Acceptor接收客户端的连接,然后创建对应的Handler,并向Reactor注册此 Handler。
- Handler:请求处理器,负责业务逻辑的处理,即业务处理线程。
从技术上看,Reactor模型一般有三种实现模式。
- 单 Reactor 单线程模型(单 Reactor 单线程)
- 单 Reactor 多线程模型 (单 Reactor 多线程)
- 主从 Reactor 多线程模型 (多 Reactor 多线程):当前业界消息队列的网络模型,比如Pulsar、Kafka、RocketMQ,为了保证性能,都是基于主从 Reactor 多线程模型开发的。
主流消息队列的网络模型实现
Kafka 网络模型
Kafka的网络层没有用Netty作为底层的通信库,而是直接采用Java NIO实现网络通信。在网络模型中,也是参照Reactor多线程模型,采用多线程、多Selector的设计。
Processor线程和Handler线程之间通过RequestChannel传递数据,RequestChannel中包含一个RequestQueue队列和多个ResponseQueues队列。每个Processor线程对应一个ResponseQueue。
具体流程上:
- 一个Acceptor接收客户端建立连接的请求,创建Socket连接并分配给Processor处理。
- Processor线程把读取到的请求存入RequestQueue中,Handler线程从RequestQueue队列中取出请求进行处理。
- Handler线程处理请求产生的响应,会存放到Processor对应的ResponseQueue中,Processor 线程从其对应的ResponseQueue中取出响应信息,并返回给客户端。
RocketMQ 网络模型
RocketMQ 采用Netty组件作为底层通信库,遵循Reactor多线程模型,同时又在Reactor模型上做了一些扩展和优化。所以它的网络模型是Netty的网络模型,Netty底层采用的是主从Reactor多线程模型,模型的原理逻辑跟前面讲到的主从Reactor多线程模型是一样的。
具体流程上:
- 一个 Reactor 主线程负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到Selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置,监听真正的网络数据。
- 接收到网络数据后,会把数据传递给Reactor线程池处理。
- 真正执行业务逻辑之前,会进行SSL验证、编解码、空闲检查、网络连接管理,这些工作在Worker线程池处理(defaultEventExecutorGroup)。
- 处理业务操作,放在业务Processor线程池中执行。
存储:消息数据和元数据的存储是如何设计的?
消息队列中的数据一般分为 元数据和消息数据。元数据是指Topic、Group、User、ACL、Config等集群维度的资源数据信息,消息数据指客户端写入的用户的业务数据。
元数据信息的存储
元数据信息的特点是数据量比较小,不会经常读写,但是需要保证数据的强一致和高可靠,不允许出现数据的丢失。同时,元数据信息一般需要通知到所有的Broker节点,Broker会根据元数据信息执行具体的逻辑。比如创建Topic并生成元数据后,就需要通知对应的Broker执行创建分区、创建目录等操作。
所以元数据信息的存储,一般有两个思路。
- 基于第三方组件来实现元数据的存储。
- 在集群内部实现元数据的存储。
基于第三方组件来实现元数据的存储是目前业界的主流选择。 比如Kafka ZooKeeper版本、Pulsar、RocketMQ 用的就是这个思路,其中Kakfa和Pulsar的元数据存储在ZooKeeper中,RocketMQ存储在NameServer中。
另一种思路, 集群内部实现元数据的存储是指在集群内部完成元数据的存储和分发。 也就是在集群内部实现类似第三方组件一样的元数据服务,比如基于Raft协议实现内部的元数据存储模块或依赖一些内置的数据库。目前Kafka 去ZooKeeper的版本、RabbitMQ的Mnesia、Kafka的C++版本RedPanda用的就是这个思路。
消息数据的存储
数据存储结构设计
在消息队列中,跟存储有关的主要是Topic和分区两个维度。用户可以将数据写入Topic或直接写入到分区。
不过如果写入Topic,数据也是分发到多个分区去存储的。所以从实际数据存储的角度来看, Topic和Group不承担数据存储功能,承担的是逻辑组织的功能,实际的数据存储是在在分区维度完成的。
从技术架构的角度,数据的落盘存储也有两个思路。
- 每个分区单独一个存储“文件”。
- 每个节点上所有分区的数据都存储在同一个“文件”。
第一个思路,每个分区对应一个文件的形式去存储数据。具体实现时,每个分区上的数据顺序写到同一个磁盘文件中,数据的存储是连续的。因为消息队列在大部分情况下的读写是有序的,所以 这种机制在读写性能上的表现是最高的。
但如果分区太多,会占用太多的系统FD资源,极端情况下有可能把节点的FD资源耗完,并且硬盘层面会出现大量的随机写情况,导致写入的性能下降很多,另外管理起来也相对复杂。Kafka在存储数据的组织上用的就是这个思路。
第二种思路,每个节点上所有分区的数据都存储在同一个文件中,这种方案需要为每个分区维护一个对应的索引文件,索引文件里会记录每条消息在File里面的位置信息,以便快速定位到具体的消息内容。
因为 所有文件都在一份文件上,管理简单,也不会占用过多的系统FD资源,单机上的数据写入都是顺序的,写入的性能会很高。缺点是同一个分区的数据一般会在文件中的不同位置,或者不同的文件段中,无法利用到顺序读的优势,读取的性能会受到影响,但是随着SSD技术的发展,随机读写的性能也越来越高。如果使用SSD或高性能SSD,一定程度上可以缓解随机读写的性能损耗,但SSD的成本比机械硬盘高很多。
目前RocketMQ、RabbitMQ和Pulsar的底层存储BookKeeper用的就是这个方案。
那怎么选择呢? 核心考虑是你对读和写的性能要求。
- 第一种方案,单个文件读和写都是顺序的,性能最高。但是当文件很多且都有读写的场景下,硬盘层面就会退化为随机读写,性能会严重下降。
- 第二种方案,因为只有一个文件,不存在文件过多的情况,写入层面一直都会是顺序的,性能一直很高。但是在消费的时候,因为多个分区数据存储在同一个文件中,同一个分区的数据在底层存储上是不连续的,硬盘层面会出现随机读的情况,导致读取的性能降低。
消息数据的分段实现
从技术上来看,当数据段到达了规定的大小后,就会新创建一个新文件来保存数据。
如果进行了分段,消息数据可能分布在不同的文件中。所以我们在读取数据的时候,就需要先定位消息数据在哪个文件中。为了满足这个需求,技术上一般有 根据偏移量定位或根据索引定位 两种思路。
根据偏移量(Offset)来定位消息在哪个分段文件中,是指通过记录每个数据段文件的起始偏移量、中止偏移量、消息的偏移量信息,来快速定位消息在哪个文件。
如果用索引定位,会直接存储消息对应的文件信息,而不是通过偏移量来定位到具体文件。
消息数据存储格式
消息数据存储格式一般包含消息写入文件的格式和消息内容的格式两个方面。
消息写入文件的格式指消息是以什么格式写入到文件中的,比如JSON字符串或二进制。从性能和空间冗余的角度来看,消息队列中的数据基本都是以二进制的格式写入到文件的。
消息内容的格式是指写入到文件中的数据都包含哪些信息。
消息数据的存储格式一般包含通用信息和业务信息两部分。通用信息主要包括时间戳、CRC、消息头、消息体、偏移量、长度、大小等信息,业务信息主要跟业务相关,包含事务、幂等、系统标记、数据来源、数据目标等信息。
消息数据清理机制
消息队列中的数据最终都会删除,时间周期短的话几小时、甚至几分钟,正常情况一天、三天、七天,长的话可能一个月,基本很少有场景需要在消息队列中存储一年的数据。
消息队列的数据过期机制一般有手动删除和自动删除两种形式,从实现上看主要有三种思路。
- 消费完成执行ACK删除数据
- 根据时间和保留大小删除
- ACK机制和过期机制相结合
消费完成执行ACK删除数据,技术上的实现思路一般是 : 当客户端成功消费数据后,回调服务端的ACK接口,告诉服务端数据已经消费成功,服务端就会标记删除该行数据,以确保消息不会被重复消费。ACK的请求一般会有单条消息ACK和批量消息ACK两种形式。优点是不会出现重复消费,一条消息只会被消费一次。缺点是ACK成功后消息被删除,无法满足需要消息重放的场景。
根据时间和保留大小删除指消息在被消费后不会被删除,只会通过提交消费位点的形式标记消费进度。实现思路一般是服务端提供偏移量提交的接口,当客户端消费成功数据后,客户端会回调偏移量提交接口,告诉服务端这个偏移量的数据已经消费成功了,让服务端把偏移量记录起来。然后服务端会根据消息保留的策略,比如保留时间或保留大小来清理数据。一般通过一个常驻的异步线程来清理数据。优点是消息可以多次重放,适用于需要多次进行重放的场景。缺点是在某些情况下(比如客户端使用不当)会出现大量的重复消费。
RabbitMQ选择的是第一个方案,Kafka和RocketMQ选择的是第二种方案,Pulsar选择的是第三种方案。
消息数据是顺序存储在文件中的,会有很多分段数据,一个文件可能会有很多行数据。那么在ACK或者数据删除的时候,一个文件中可能既存在可删除数据,也存在不可删除数据。如果我们每次都立即删除数据,需要不断执行“读取文件、找到记录、删除记录、写入文件”的过程,即使批量操作,降低频率,还是得不断地重复这个过程,会导致性能明显下降。
当前主流的思路都是 延时删除,以段数据为单位清理,降低频繁修改文件内容和频繁随机读写文件的操作。
存储:如何提升存储模块的性能和可靠性?
提升写入操作的性能
写入性能的提高主要有缓存写、批量写、顺序写三个思路。
1. 缓存写和批量写
写入优化的主要思路之一是: 将数据写入到速度更快的内存中,等积攒了一批数据,再批量刷到硬盘中。
平时我们在一些技术文章看到的“数据先写入PageCache,再批量刷到硬盘”,说的就是这个思路。PageCache指操作系统的页缓存,简单理解就是内存,通过缓存读写数据可以避免直接对硬盘进行操作,从而提高性能。
把缓存数据刷回到硬盘,一般有“按照空间占用比例”、“时间周期扫描”和“手动强制刷新”三种策略。
消息队列一般会同时提供:是否同步刷盘、刷盘的时间周期、刷盘的空间比例三个配置项,让业务根据需要调整自己的刷新策略。从性能的角度看,异步刷新肯定是性能最高的,同步刷新是可靠性最高的。
2. 随机写和顺序写
实现随机写和顺序写的核心就是 数据存储结构的设计。
数据存储结构设计有两个思路:每个Partition/Queue单独一个存储文件,每台节点上所有Partition/Queue的数据都存储在同一个文件。
第一种方案,对单个文件来说读和写都是顺序的,性能最高,但当文件很多且都有读写,在硬盘层面就会退化为随机读写,性能会下降很多。第二种方案,因为只有一个文件,不存在文件过多的情况,写入层面一直都会是顺序的,性能一直很高。所以为了提高写的性能,我们最好使用第二种方案。
提升写入操作的可靠性
为了提高数据可靠性,在消息队列的存储模块中,一般会通过三种处理手段:同步刷盘、WAL预写日志、多副本备份,进一步提升数据的可靠性。
1. 同步刷盘
同步刷盘指每条数据都同步刷盘,等于回到了直接写硬盘的逻辑,一般通过写入数据后调用force()操作来完成数据刷盘。这种方案无法利用内存写入速度的优势,效率会降低很多。
2. WAL
WAL(预写日志)指在写数据之前先写日志,当出现数据丢失时通过日志来恢复数据,避免数据丢失。
在实际落地中,我们可以采取WAL日志盘和实际数据盘分离的策略,提升WAL日志的写入速度。具体就是让WAL数据盘是高性能、低容量的数据盘,数据盘是性能较低、容量较大的数据盘,如果出现数据异常,就通过WAL日志进行数据恢复。这样,给WAL日志选择合适的设备,再加上并行读写等代码优化手段,性能损失就可控了,甚至可以忽略。
3. 多副本的备份
多副本的备份就是将数据拷贝到多台节点,每台节点都写入到内存中,从而完成数据的可靠性存储。因为单机层面也是把数据写入到内存中就记录写入成功,单机层面也可能出现数据丢失,所以核心思路是同时在多台节点中缓存数据,只要不是多台节点同时重启,数据就可以恢复。
提升读取操作的性能
1. 冷读和热读
热读是指消息数据本身还在缓存中,读取数据是从内存中获取,此时性能最高,不需要经过硬盘。冷读是指消息数据刷到硬盘中了,并且数据已经被换页换出缓存了,此时读取数据需要从硬盘读取。
2. 顺序读、随机读、批量读
3. 零拷贝原理和使用方式
为了解决复制次数带来的性能损耗,“零拷贝”这个概念就被提出来了。 主要思路是通过减少数据复制次数、减少上下文(内核态和用户态)切换次数、通过DMA(直接内存)代替 CPU 完成数据读写,来解决复制和资源损耗的问题。
零拷贝主要用于在消费的时候提升性能,具体有两种实现方式: mmap+write 和 sendfile。
生产端:生产者客户端的SDK有哪些设计要点?
消息队列的客户端主要包含生产、消费、集群管控三类功能。
从客户端SDK实现的角度来看,生产模块包含 客户端基础功能和生产相关功能 两部分,基础功能包括请求连接管理、心跳检测、内容构建、序列化、重试、容错处理等等。生产功能包括客户端寻址、分区选择、批量发送,生产错误处理、SSL、压缩、事务、幂等等等。
基础功能
连接管理
在网络模块,我们讲过客户端和服务端之间基本都是通过各自语言的网络库,创建TCP长连接进行通信的。在大部分实现中,为了避免连接数膨胀,每个客户端实例和每台Broker只会维护一条TCP连接。
建立一条TCP连接很简单,更关键的是,什么情况下建立连接?一般有初始化创建连接和使用时创建链接两种方式。
- 初始化创建连接,指在实例初始化时就创建到各个Broker的TCP连接,等待数据发送。好处是提前创建好可以避免发送的时候冷启动。缺点是需要提前创建好所有的连接,可能导致连接空跑,会消耗一定的资源。
- 使用时创建链接,指在实例初始化时不建立连接,当需要发送数据时再建立。好处是发送时才建立,连接的使用率会较高。缺点是可能出现连接冷启动,会增加一点本次请求的耗时。
因为客户端会有空闲连接回收机制,创建连接的耗时一般较短,所以在实际的架构实现中,两种方式都会用,优劣区别并不明显。不过,从资源利用率的角度考虑, 建议使用晚建立连接的方式。
心跳检测
心跳检测是客户端和服务端之间保活的一种机制,检测服务端或者客户端的一方不可用时,另一方可以及时回收资源,避免资源浪费。一般通过 ping-pong 的方式来发起探测。
客户端和服务端之间的心跳检测机制的实现,一般有基于 TCP 的 KeepAlive 保活机制和应用层主动探测两种形式。
-
基于TCP的KeepAlive保活机制 是TCP/IP 协议层内置的功能,需要手动打开TCP的KeepAlive功能。通过这种方案实现心跳探测,优点是简单,缺点是KeepAlive实现是在服务器侧,需要Server主动发出检测包,此时如果客户端异常,可能出现很多不可用的TCP连接。这种连接会占用服务器内存资源,导致服务端的性能下降。
-
应用层主动探测 一般是Client向Server发起的,主要解决灵活性和TCP KeepAlive的缺陷。探测流程一般是客户端定时发送保活心跳,当服务端连续几次没收到请求,就断开连接。这样做的好处是,可以将压力分担到各个客户端,避免服务端的过载。
错误处理
从请求的角度,有些错误是重试可以恢复的,比如连接断开、Leader切换、发送偶尔超时、服务端某些异常等;有些错误是不可恢复的,比如Topic/分区不存在、服务端Broker不存在、集群和Broker长时间无响应等。
所以,在客户端的处理中也会将错误分为可重试错误和不可重试错误两类。
重试机制
重试策略一般会支持重试次数和退避时间的概念。当消息失败,超过设置的退避时间后,会继续重试,当超过重试次数后,就会抛弃消息或者将消息投递到配置好的重试队列中。
退避时间是可以配置的,比如1s、10s、1分钟。当出现错误时,就会根据退避策略退避,再尝试写入。一般情况下,重试是有次数上限的,当然也支持配置无限重试。
生成功能
客户端寻址机制
1. Metadata(元数据)寻址机制
服务端会提供一个获取全量的 Metadata 的接口,客户端在启动时,首先通过接口拿到集群所有的元数据信息,本地缓存这部分数据信息。然后,客户端发送数据的时候,会根据元数据信息的内容,得到服务端的地址是什么,要发送的分区在哪台节点上。最后根据这两部分信息,将数据发送到服务端。
消息队列的元数据是指 Topic、分区、Group、节点、配置等集群维度的信息。比如Topic有几个分区,分区的 Leader 和 Follower 在哪些节点上,节点的 IP 和端口是什么,有哪些Group等等。
客户端一般通过 定期全量更新Metadata信息和请求报错时更新元数据信息 两种方式,来保证客户端的元数据信息是最新的。目前Kafka、RocketMQ、Pulsar用的都是这个方案。
2. 服务端内部转发机制
另外一种服务端内部转发机制,客户端不需要经过寻址的过程,写入的时候是随机把数据写入到服务端任意一台Broker。
具体思路是服务端的每一台Broker会缓存所有节点的元数据信息,生产者将数据发送给Broker后,Broker如果判断分区不在当前节点上,会找到这个分区在哪个节点上,然后把数据转发到目标节点。
这个方案的好处是分区寻址在服务端完成,客户端的实现成本比较低。但是生产流程多了一跳,耗时增加了。另外服务端因为转发多了一跳,会导致服务端的资源损耗多一倍,比如CPU、内存、网卡,在大流量的场景下,这种损耗会导致集群负载变高,从而导致集群性能降低。
所以这种方案不适合大流量、高吞吐的消息队列。目前业界只有 RabbitMQ 使用这个方案。
生产分区分配策略
数据可以直接写入分区或者写入Topic。写入Topic时,最终数据还是要写入到某个分区。这个数据选择写入到哪个分区的过程,就是生产数据的分区分配过程。过程中的分配策略就是生产分区分配策略。
一般情况下,消息队列默认支持轮询、按Key Hash、手动指定、自定义分区分配策略四种分区分配策略。
-
轮询 是所有消息队列的默认选项。消息通过轮询的方式依次写入到各个分区中,这样可以保证每个分区的数据量是一样的,不会出现分区数据倾斜。
-
按Key Hash 是指根据消息的Key 算出一个Hash值,然后跟Topic的分区数取余数,算出一个分区号,将数据写入到这个分区中。好处是可以根据Key来保证数据的分区有序。缺点是分区数量不能变化,变化后Hash值就会变,导致消息乱序。并且因为每个Key的数据量不一样,容易导致数据倾斜。
-
手动指定 很简单,就是在生产数据的时候,手动指定数据写入哪个分区。这种方案的好处就是灵活,用户可以在代码逻辑中根据自己的需要,选择合适的分区,缺点就是业务需要感知分区的数量和变化,代码实现相对复杂。
除了这3种默认策略,消息队列也支持 自定义分区分配策略,让用户灵活使用。内核提供 Interface(接口)机制,用户如果需要指定自定义的分区分配策略,可以实现对应的接口,然后配置分区分配策略。比如Kafka可以通过实现 org.apache.kafka.clients.producer.Partitioner 接口实现自定义分区策略。
批量语义
客户端支持批量写入数据的前提是,需要在协议层支持批量的语义。否则就只能在业务中自定义将多条消息组成一条消息。
批量发送的实现思路一般是在客户端内存中维护一个队列,数据写入的时候,先将其写到这个内存队列,然后通过某个策略从内存队列读取数据,发送到服务端。
批量发送数据的策略和存储模块的刷盘策略很像,都是根据数据条数或时间聚合后,汇总发送到服务端,一般是满足时间或者条数的条件后触发发送操作,也会有立即发送的配置项。
数据发送方式
消息队列一般也会提供同步发送、异步发送、发送即忘三种形式。
同步和异步更多是语言语法的实现,同步发送主要解决数据发送的即时性和顺序性,异步发送主要考虑性能。发送即忘指消息发送后不关心请求返回的结果,立即发送下一条。
消费端:消费者客户端的SDK有哪些设计要点?
消费端SDK和生产端SDK一样,主要包括客户端基础功能和消费相关功能两部分。
消费相关功能包括 消费模型、 分区消费模式、 消费分组(订阅)、 消费确认、 消费失败处理 五个部分。
消费模型的选择
为了满足不同场景的业务需求,从实现机制上来看,主流消息队列一般支持 Pull、Push、Pop 三种消费模型。
Pull 模型
Pull(拉)模型是指客户端通过不断轮询的方式向服务端拉取数据。它是消息队列中使用最广泛和最基本的模型,主流的消息队列都支持这个模型。
优点是客户端根据自身的处理速度去拉取数据,不会对客户端和服务端造成额外的风险和负载压力。缺点是可能会出现大量无效返回的Pull调用,从而浪费通信资源,提高服务端的负载,另外消费及时性不够,无法满足一些需要全链路低耗时的场景。
为了解决这个问题,正常的思路是在客户端根据一定策略进行等待和回避。这样做的话,就会出现如何设置等待时间的问题,客户端等待时间设置不合理就会出现消费不及时的情况。
所以为了解决空请求带来的问题,一般服务端会协助处理,有如下两个思路。
1. 服务端hold住请求
当客户端根据策略拉取数据时,如果没有足够的数据,就先在服务端等一段时间,等有数据后一起返回给客户端。这种方案的好处是,可以尽量提高吞吐能力,不会有太多的空交互请求。缺点是如果长时间不给客户端回包,会导致客户端请求超时,另外当数据不够时,hold住请求的时间太长就会提高消费延时。
2. 服务端有数据的时候通知客户端
当服务端不hold住请求,立刻返回空数据,客户端收到空数据时则不再发起请求,会等待服务端的通知。当服务端有数据的时候,再主动通知客户端来拉取。这种方案的好处是可以及时通知客户端来拉取数据,从而降低消费延时。缺点是因为客户端和服务端一般是半双工的通信,此时服务端是不能主动向客户端发送消息的。
这种策略可以解决频繁不可控的空轮询请求。即使全是空轮询,对单个消费者来说,其TPS也是可以预估的,即总时间/等待时长 = 总轮询次数。而如果需要降低消费延时,可以通过降低最小获取的数据大小和最大等待时长来提高获取的频率,从而尽量降低延时。通过这种方案,我们可以把理想的消费延迟时间降低到两次Pull请求之间的时间间隔。
Push 模型
Push(推)模型是为了解决消费及时性而提出来的。这个模型的本意是指当服务端有数据时会主动推给客户端,让数据的消费更加及时。当服务端有数据以后,会主动推动给各个消费者。
在实际的Push模型的实现上,一般有 Broker 内置 Push 功能、Broker 外独立实现 Push 功能的组件、在客户端实现伪 Push 功能三种思路。
第一种, Broker 内置 Push 功能是指在Broker中内置标准的 Push 的能力,由服务端向客户端主动推送数据。
这种方案的好处是Broker自带Push能力,无需重复开发和部署。Broker 内部可以感知到数据堆积情况,可以保证消息被及时消费。缺点是当消费者很多时,内核需要主动维护很多与第三方的长连接,并且需要处理各种客户端异常,比如客户端卡住、接收慢、处理慢等情况。这些推送数据、异常处理、连接维护等工作需要消耗很多的系统资源,在性能上容易对Broker形成反压,导致Broker本身的性能和稳定性出现问题。
所以这种方案在主流消息队列中用得较少,比如RabbitMQ和某些金融证券领域的消息队列,为了保证消息投递的高效及时(比如全链路的毫秒级耗时),才会采用这种方案。
第二种,Broker外独立实现Push功能的组件是指独立于Broker提供一个专门实现推模型的组件。 通过先 Pull 数据,再将数据 Push 给客户端,从而简化客户端的使用,提高数据消费的及时性。
这种方案的好处是将Push组件独立部署,解决了 Broker 的性能和稳定性问题,也能实现Push的效果。缺点是虽然实现了Push的模型,但其本质还是先Pull再Push,从全链路来看,还是会存在延时较高的问题,并且需要单独开发独立的 Push 组件,开发和运维成本较高。
从实际业务上来讲,这种模型的使用场景较为有限,主要用在回调、事件触发的场景,在实际的流消费场景中用得较少。主要是因为通过第三方组件的Push灵活性不够,性能会比Pull低。
第三种,在客户端实现伪Push功能是指在客户端内部维护内存队列,SDK 底层通过Pull模型从服务端拉取数据存储到客户端的内存队列中。 然后通过回调的方式,触发用户设置的回调函数,将数据推送给应用程序,在使用体验上看就是 Push 的效果。
这种方案的好处在于通过客户端底层的封装,从用户体验看是Push模型的效果,解决用户代码层面的不断轮询问题,降低了用户的使用复杂度。缺点是底层依旧是Pull模型,还是得通过不断轮询的方式去服务端拉取数据,就会遇到 Pull 模型遇到的问题。
在客户端实现伪Push,是目前消息队列在实现Push模型上常用的实现方案,因为它解决了客户体验上的主动回调触发消费问题。虽然底层会有不断轮询和消费延时的缺点,但是可以通过编码技巧来降低这两个问题的影响。
因为Push模型需要先分配分区和消费者的关系,客户端就需要感知分区分配、分区均衡等操作,从而在客户端就需要实现比较重的逻辑。并且当客户端和订阅的分区数较多时,容易出现需要很长的重平衡时间的情况。
Pop模型
Pop模型想解决的是客户端实现较重,重平衡会暂停消费并且可能时间较长,从而出现消费倾斜的问题。
它的思路是客户端不需要感知到分区,直接通过Pop模型提供的get接口去获取到数据,消费成功后ACK数据。就跟我们发起HTTP请求去服务端拉取数据一样,不感知服务端的数据分布情况,只需要拉到数据。这种方案的好处是简化了消费模型,同时服务端可以感知到消费的堆积情况,可以根据堆积情况返回那些分区的数据给客户端,这样也简化了消息数据的分配策略。
从实现上来看,它将分区分配的工作移到了服务端,在服务端完成了消费者的分区分配、进度管理,然后暴露出了新的Pop和ACK接口。客户端调用Pop接口去拿取数据,消费成功后调用ACK去确认数据。这和 HTTP 的 Request 和 Response 的使用模型一致。
分区消费模式的设计
在数据的消费模式上主要有独占消费、共享消费、广播消费、灾备消费四个思路。
独占消费是指一个分区在同一个时间只能被一个消费者消费。 在消费者启动时,会分配消费者和分区之间的消费关系。当消费者数量和分区数量都没有变化的情况下,两者之间的分配关系不会变动。当分配关系变动时,一个分组也只能被一个消费者消费,这个消费者可能是当前的,也可能是新的。如果消费者数量大于分区数量,则会有消费者被空置;反之,如果分区数量大于消费者数量,一个消费者则可以同时消费多个分区。
独占消费的好处是可以保证分区维度的消费是有序的。缺点是当数据出现倾斜、单个消费者出现性能问题或hang住时,会导致有些分区堆积严重。现在大部分消息队列默认支持的就是独占消费的类型,比如Kafka、RocketMQ、Pulsar等。
共享消费是指单个分区的数据可以同时被多个消费者消费。 即分区的数据会依次投递给不同的消费者,一条数据只会投递给一个消费者。
这种方式的好处是,可以避免单个消费者的性能和稳定性问题导致分区的数据堆积。缺点是无法保证数据的顺序消费。这种模式一般用在对数据的有序性无要求的场景,比如日志。
广播消费是指一条数据要能够被多个消费者消费到。 即分区中的一条数据可以投递给所有的消费者,这种方式是需要广播消费的场景。
实现广播消费一般有内核实现广播消费的模型、使用不同的消费分组消费和指定分区消费三种技术思路。
- 内核实现广播消费的模型,指在Broker内核中的消息投递流程实现广播消费模式,即 Broker 投递消息时,可以将一条消息吐给不同的消费者,从而实现广播消费。
- 使用不同的消费分组对数据进行消费,指通过创建不同的消费者组消费同一个Topic或分区,不同的消费分组管理自己的消费进度,消费到同一条消息,从而实现广播消费的效果。
- 指定分区消费,是指每个消费者指定分区进行消费,在本地记录消费位点,从而实现不同消费者消费同一条数据,达到广播消费的效果。
Pulsar支持的Share消费模型就是第一种实现思路。Kafka和RocketMQ主要支持第二和第三种实现思路。
灾备消费是独占消费的升级版,在保持独占消费可以支持顺序消费的基础上,同时加入灾备的消费者 。 当消费者出现问题的时候,灾备消费者加入工作,继续保持独占顺序消费。
好处是既能保持独占顺序消费,又能保证容灾能力。缺点是无法解决消费倾斜的性能问题,另外还需要准备一个消费者来做灾备,使用成本较高。
消费分组
消费分组是用来组织消费者、分区、消费进度关系的逻辑概念。为什么需要消费分组呢?
在没有消费分组直接消费Topic的场景下,如果希望不重复消费Topic中的数据,那么就 需要有一个标识来标识当前的消费情况,比如记录进度。 这个唯一标识就是消费分组。
在一个集群中可以有很多消费分组,消费分组间通过名称来区分。消费分组自身的数据是集群元数据的一部分,会存储在Broker的元数据存储服务中。消费分组主要有管理消费者和分区的对应关系、保存消费者的消费进度、实现消息可重复被消费三类功能。
因为 Topic 不存储真实数据,分区才存储消息数据,所以就需要解决消费者和分区的分配关系,即 哪个分区被哪个消费者消费,这个分配的过程就叫做消费重平衡(Rebalance)。
协调者
从实现上来看,如果要对消费者和分区进行分配,肯定需要有一个模块拥有消费分组、所有的消费者、分区信息三部分信息,这个模块我们一般命名为 协调者。 协调者主要的工作就是执行消费重平衡,并记录消费分组的消费进度。
在消费分组创建、消费者变化、分区变化的时候就会触发重新分配。分区分配的操作可以在协调者内部或者消费者上完成。
- 在协调者完成,即协调者首先获取消费者和分区的信息,然后在协调者内部完成分区分配,最后再把分配关系同步给所有消费者。
- 在消费者完成,即负责分配的消费者获取所有消费者和分区的信息,然后该消费者完成分区分配操作,最后再把分配关系同步给其他消费者。
从技术上来看,这两种形式的优劣区别并不大,取决于代码的实现。 一般在创建消费分组和消费者/ Topic分区发生变化的时候,会触发协调者执行消费重平衡。
从实现的角度来看,协调者一般是Broker内核的一个模块,就是一段代码或者一个类,专门用来完成上述的工作。当有多台Broker时,协调者的实现有多种方式,比如Kafka 集群每台Broker都有协调者存在。通过消费分组的名称计算出来一个hash值和__consumer_offset的分区数,取余计算得出一个分区号。最后这个分区号对应的Leader所在的Broker节点就是协调者所在的节点。客户端就和计算出来的这台Broker节点进行交互,来执行消费重平衡的相关操作。
消费分区分配策略
在具体实现上,一般内核会默认提供几种分配策略,也可以通过定义接口来支持用户自定义实现分区分配策略。
分区分配策略的制定一般遵循以下三个原则:
- 各个分区的数据能均匀地分配给每个消费者,保证所有消费者的负载最大概率是均衡的,该原则最为常用。
- 在每次重新分配的时候,尽量减少分区和消费者之间的关系变动,这样有助于加快重新分配的速度,并且保持数据处理的连续性,降低处理切换成本。
- 可以允许灵活地根据业务特性制定分配关系,比如根据机房就近访问最近的分区、某个Topic的奇数分区分配给第一个消费者等等。
所有消息队列的默认策略都是相对通用的,一般都会包含有轮询、粘性、自定义三种类型的策略。
轮询 就是指用轮询的方式将分区分配给各个消费者,保证每个消费者的分区数量是尽量相同的,从而保证消费者的负载最大概率上是均衡的。
粘性 是指尽量减少分区分配关系的变动,进而减少重平衡所耗费的时间和资源损耗。即当已经分配好消费者和分区的消费关系后,当消费者或者分区出现变动,就会触发重平衡。从底层来看,可能就是一个消费者掉了或者新增分区。此时需要重新进行分配的消费者和分区其实是有限的,大部分的分配关系可以不动。而此时如果使用轮询算法,则要全部打散重来,耗时就会很长,并且浪费资源,即把原先不需要重新分配的关系都重新分配一遍。
在实际的实现中,为了减少重新分配关系,有一个非常常用的算法是 一致性哈希。一致性哈希的算法经常用在负载均衡中。用一致性哈希实现粘性策略的优点是,当节点或者分区变动时,只需要执行少量的分区再分配即可。
消费确认
那么当数据被消费成功后,就必须进行消费确认操作了,告诉服务端已经成功消费了这个数据。消费确认就是我们在消息队列中常说的ACK。
一般情况下,消息确认分为确认后删除数据和确认后保存消费进度数据两种形式。
确认后删除数据 是指集群的每条消息只能被消费一次,只要数据被消费成功,就会回调服务端的ACK接口,服务端就会执行数据删除操作。在实际开发的过程中,一般都会支持单条ACK和批量ACK两种操作。这种方式不利于回溯消费,所以用得比较少。
消费成功保存消费进度 是指当消费数据成功后,调用服务端的消费进度接口来保存消费进度。这种方式一般都是配合消费分组一起用的,服务端从消费分组维度来保存进度数据。
为了保证消息的回溯消费和多次消费,消息队列大多数用的是第二种方案。 数据的删除交由数据过期策略去执行。
保存消费进度一般分为服务端保存和客户端自定义保存两种实现机制。
服务端保存 是指当消费端消费完成后,客户端需要调用一个接口提交信息,这个接口是由服务端提供的“提交消费进度”接口,然后服务端会持久保存进度。当客户端断开重新消费时,可以从服务端读取这个进度进行消费。服务端一般会通过内置的Topic或者文件来持久保存该数据。这种方式的优点就是客户端会封装好这些逻辑,使用简单,无需管理进度相关的信息,缺点就是不够灵活。服务端保存一般是默认的方案。
在提交位点信息的时候,底层一般支持自动提交和手动提交两种实现。
- 自动提交 一般是根据时间批次或数据消费到客户端后就自动提交,提交过程客户无感知。
- 手动提交 是指业务根据自己的处理情况,手动提交进度信息,以避免业务处理异常导致的数据丢失。
一般情况下,我建议你使用手动提交方式,可以避免数据丢失。
客户端自定义保存 是指当消费完成后,客户端自己管理保存消费进度。此时就不需要向服务端接口提交进度信息了,自定义保存进度信息即可,比如保存在客户端的缓存、文件、自定义的服务中,当需要修改和回滚的时候就比较方便。这种方案的优点就是灵活,缺点就是会带来额外的工作量。
消费失败处理
一个完整的消费流程包括消费数据、本地业务处理、消费进度提交三部分。
那么从消费失败的角度来看,就应该分为从服务端拉取数据失败、本地业务数据处理失败、提交位点信息失败三种情况。
从服务端拉取数据失败,和客户端的错误逻辑处理是一致的,根据可重试错误和不可重试错误的分类,进行重复消费或者向上抛错。
本地业务数据处理失败,处理起来就比较复杂了。如果是偶尔失败,那么在业务层做好重试处理逻辑,配合手动提交消费进度的操作即可解决。如果是一直失败,即使重试多次也无法被解决,比如这条数据内容有异常,导致无法被处理。此时如果一直重试,就会出现消费卡住的情况,这就需要配合死信队列的功能,将无法被处理的数据投递到死信队列中,从而保存异常数据并保证消费进度不阻塞。
提交位点信息失败,其处理方法通常是一直重试,重复提交,如果持续失败就向上抛错。因为如果提交进度失败,即使再从服务端拉取数据,还是会拉到同一批数据,出现重复消费的问题。
RabbitMQ的架构设计与实现
系统架构
RabbitMQ由Producer、Broker、Consumer三个大模块组成。生产者将数据发送到Broker,Broker 接收到数据后,将数据存储到对应的Queue里面,消费者从不同的Queue消费数据。
除了Producer、Broker、Queue、Consumer、ACK 这几个消息队列的基本概念外,它还有 Exchange、Bind、Route 这几个独有的概念。
Exchange 称为交换器,它是一个逻辑上的概念,用来做分发,本身不存储数据。流程上生产者先将消息发送到Exchange,而不是发送到数据的实际存储单元Queue里面。然后 Exchange 会根据一定的规则将数据分发到实际的Queue里面存储。这个分发过程就是Route(路由),设置路由规则的过程就是Bind(绑定)。即 Exchange 会接收客户端发送过来的 route_key,然后根据不同的路由规则,将数据发送到不同的Queue里面。
这里需要注意的是, 在RabbitMQ中是没有Topic这个用来组织分区的逻辑概念的。 RabbitMQ中的Topic是指Topic路由模式,是一种路由模式,和消息队列中的Topic意义是完全不同的。
那为什么RabbitMQ 会有Exchange、Bind、Route这些独有的概念呢?
主要和当时业界的架构设计思想以及主导设计 AMQP 协议的公司背景有关。当时的设计思路是: 希望发消息跟写信的流程一样,可以有一个集中的分发点(邮局),通过填写好地址信息,最终将信投递到目的地。这个集中分发点(邮局)就是Exchange,地址信息就是Route,填写地址信息的操作就是Bind,目的地是Queue。
协议和网络模块
在网络通信协议层面,RabbitMQ 数据流是基于四层TCP协议通信的,跑在TCP上的应用层协议是AMQP。如果开启 Management 插件,也可以支持HTTP协议的生产和消费。TCP + AMQP 是数据流的默认访问方式,也是官方推荐的使用方式,因为它性能会比 HTTP 高很多。
AMQP 是一个应用层的通信协议,可以看作一系列结构化命令的集合,用来填充TCP 层协议的body 部分。通过协议命令进行交互,可以完成各种消息队列的基本操作,如Connection.Start(建立连接)、Basic.Publish(发送消息)等等,详细的AMQP协议内容可以参考文档 AMQP Working Group 1.0 Final。
在RabbitMQ的网络层有Connectoion和Channel 两个概念需要关注。
Connection 是指TCP连接,Channel 是Connection中的虚拟连接。两者的关系是:一个客户端和一个Broker之间只会建立一条TCP连接,就是指 Connection。Channel(虚拟连接)的概念在这个连接中定义,一个 Connection中可以创建多个Channel。
客户端和服务端的实际通信都是在Channel维度通信的。 这个机制可以减少实际的TCP连接数量,从而降低网络模块的损耗。从设计角度看,也是基于IO复用、异步I/O的思路来设计的。
从编码实现的角度,RabbitMQ 的网络模块设计会比较简单。主要包含 tcp_listener、tcp_acceptor、rabbit_reader 三个进程。RabbitMQ 服务端通过 tcp_listener 监听端口,tcp_acceptor 接收请求,rabbit_reader 处理和返回请求。本质上来看是也是一个多线程的网络模型。
数据存储
元数据存储
RabbitMQ的元数据都是存在于Erlang自带的分布式数据库Mnesia中的。即每台Broker都会起一个Mnesia进程,用来保存一份完整的元数据信息。因为 Mnesia 本身是一个分布式的数据库,自带了多节点的 Mnesia 数据库之间的同步机制。所以在元数据的存储模块,RabbitMQ 的Broker 只需要调用本地的 Mnesia 接口保存、变更数据即可。不同节点的元数据同步 Mnesia 会自动完成。
Mnesia对RabbitMQ的作用,相当于ZooKeeper对于Kafka、NameServer对于RocketMQ的作用。因为Mnesia是内置在Broker中,所以部署RabbitMQ集群时,你会发现只需要部署Broker,不需要部署其他的组件。这种部署结构就很简单清晰,从而也降低了后续的运维运营成本。
在一些异常的情况下,如果不同节点上的 Mnesia 之间的数据同步出现问题,就会导致不同的 Mnesia 数据库之间数据不一致,进而导致集群出现脑裂、无法启动等情况。此时就需要手动修复异常的Mnesia实例上的数据。
因为Mnesia 本身是一个数据库,所以它和数据库一样,可以进行增删改查的操作。需要了解Mnesia 的更多操作,你可以参考 ErLang Mnesia。
消息数据存储
RabbitMQ 消息数据的最小存储单元是Queue,即消息数据是按顺序写入存储到Queue里面的。在底层的数据存储方面,所有的Queue数据是存储在同一个“文件”里面的。这个“文件”是一个虚拟的概念,表示所有的Queue数据是存储在一起的意思。
这个“文件”由队列索引(rabbit_queue_index)和消息存储(rabbitmq_msg_store)两部分组成。即在节点维度,所有 Queue 数据都是存储在rabbit_msg_store里面的,每个节点上只有一个rabbit_msg_store,数据会依次顺序写入到rabbit_msg_store中。
rabbit_msg_store是一个逻辑概念,底层的实际存储单元分为两个,msg_store_persistent和msg_store_transient,分别负责持久化消息和非持久化消息的存储。
msg_store_persistent 和 msg_store_transient 在操作系统上是以文件夹的形式表示的,具体的数据存储是以不同的文件段的形式存储在目录中,所有消息都会以追加的形式写入到文件中。当一个文件的大小超过了配置的单个文件的最大值,就会关闭这个文件,然后再创建一个文件来存储数据。
队列索引负责存储、维护队列中落盘消息的信息,包括消息的存储位置、是否交付、是否ACK等等信息。队列索引是Queue维度的,每个Queue都有一个对应的队列索引。
RabbitMQ 也提供了 过期时间(TTL)机制,用来删除集群中没用的消息。它支持单条消息和队列两个维度来设置数据过期时间。如果在队列上设置TTL,那么队列中的所有消息都有相同的过期时间。我们也可以对单条消息单独设置TTL,每条消息的TTL可以不同。如果两种方案一起使用,那么消息的TTL 就会以两个值中最小的那个为准。如果不设置TTL,则表示此消息不会过期。
删除消息时,不会立即删除数据,只是从Erlang 中的ETS表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。此时文件中的消息不会立即被删除,会被标记为已删除数据,直到一个文件中都是可以删除的数据时,再将这个文件删除,这个动作就是常说的延时删除。另外内核有检测机制,会检查前后两个文件中的数据是否可以合并,当符合合并规则时,会进行段文件的合并。
生产者和消费者
当生产者和消费者连接到Broker进行生产消费的时候,是直接和Broker交互的,不需要客户端寻址。客户端连接Broker的方式,跟我们通过HTTP服务访问Server是一样的,都是直连的。
RabbitMQ集群部署后,为了提高容灾能力,就需要在集群前面挂一层负载均衡来进行灾备。客户端拿到负载均衡IP后,在生产或消费时使用这个IP和服务端直接建立连接。因为Queue是具体存储数据的单元,不同的Queue 有可能分布在不同的Broker上,就有可能出现生产或消费基于负载均衡IP请求到的Broker,并不是当前Queue所在的Broker,从而导致生产消费失败。
为了解决这个问题,在每个Broker上会设置有转发的功能。在实现上,每台Broker节点都会保存集群所有的元数据信息。当Broker收到请求后,根据本地缓存的元数据信息判断Queue是否在本机上,如果不在本机,就会将请求转发到Queue所在的目标节点。
生产端发送数据不是直接发送到Queue,而是直接发送到Exchange。即发送时需要指定Exchange和route_key,服务端会根据这两个信息,将消息数据分发到具体的Queue。因为Exchange和route_key都是一个逻辑概念,数据是直接发送到Broker的,然后在服务端根据路由绑定规则,将数据分发到不同的Queue中,所以在客户端是没有发送生产分区分配策略的逻辑。其实从某种程度来看, Exchagne和Route的功能就是生产分区分配的过程,只是将这个逻辑从客户端移动到了服务端而已。
在消费端,RabbitMQ 支持Push(推)和Pull(拉)两种模式,如果使用了Push模式,Broker会不断地推送消息给消费者。不需要客户端主动来拉,只要服务端有消息就会将数据推给客户端。当然推送消息的个数会受到 channel.basicQos 的限制,不能无限推送,在消费端会设置一个缓冲区来缓冲这些消息。拉模式是指客户端不断地去服务端拉取消息,RabbitMQ的拉模式只支持拉取单条消息。
在AMQP协议中,是没有定义Topic和消费分组的概念的,所以在消费端没有消费分区分配、消费分组 Rebalance 等操作,消费者是直接消费Queue数据的。
为了保证消费流程的可靠性,RabbitMQ也提供了 消息确认机制。消费者在消费到数据的时候,会调用ACK接口来确认数据是否被成功消费。
底层提供了自动ACK和手动ACK两种机制。自动ACK表示当客户端消费到数据后,消费者会自动发送ACK,默认是自动ACK。手动ACK表示客户端消费到数据后,需要手动调用。ACK的时候,支持单条ACK和批量ACK两种动作,批量ACK可以用来提升ACK效率。另外,为了提升ACK动作的性能,有些客户端也支持异步的ACK。
RocketMQ的架构设计与实现
系统架构
RocketMQ由 Producer、 NameServer、 Broker、 Consumer 四大模块组成。其中,NameServer是RocketMQ的元数据存储组件。另外,在RocketMQ 5.0后,还增加了Proxy模块,用来支持gRPC协议,并为后续的计算存储分离架构做准备。
RocketMQ有Topic、MessageQueue、Group的概念,一个Topic可以包含一个或多个MessageQueue,一个Group 可以订阅一个或多个Topic。MessageQueue是具体消息数据的存储单元,订阅的时候通过Group来管理消费订阅关系。
从流程上看,Broker 在启动的时候会先连接NameServer,将各自的元数据信息上报给NameServer,NameServer会在内存中存储元数据信息。客户端在连接集群的时候,会配置对应的 NameServer 地址,通过连接NameServer来实现客户端寻址,从而连接上对应的Broker。
客户端在发送数据的时候,会指定Topic或MessageQueue。Broker收到数据后,将数据存储到对应的Topic中,消息存储在Topic的不同Queue中。在底层的文件存储中,所有Queue的数据是存储在同一个CommitLog文件中的。在订阅的时候会先创建对应的Group,消费消息后,再确认数据。
从客户端来看,在RocketMQ 5.0以后,我们也可以通过直连Proxy,将数据通过gRPC协议发送给Proxy。Proxy在当前阶段本质上只是一个代理(gRPC协议的代理),不负责真正的数据存储,当收到数据后,还是将数据转发到Broker进行保存。
协议和网络模块
在协议方面,如下图所示,RocketMQ 5.0 之前支持自定义的Remoting协议,在5.0之后,增加了gRPC协议的支持。
在传输层协议方面,Remoting和gRPC都是基于TCP协议传输的。Remoting 直接基于四层的TCP协议通信,gRPC是基于七层的HTTP2协议通信,不过 HTTP2 底层也是基于TCP,它们本质上都是应用层的协议。
数据存储
元数据存储
RocketMQ 的元数据信息实际是存储在Broker上的,Broker启动时将数据上报到NameServer模块中汇总缓存。NameServer是一个简单的TCP Server,专门用来接收、存储、分发 Broker 上报的元数据信息。这些元数据信息是存储在NameServer内存中的,NameServer不会持久化去存储这些数据。
Broker 启动或删除时,会调用NameServer的注册和退出接口,每个Broker都会存储自己节点所属的元数据信息(比如有哪些Topic、哪些Queue 在本节点上),在Broker启动时,会把全量的数据上报到 NameServer 中。
从部署形态上看,NameServer 是多节点部署的,是一个集群。 但是不同节点之间是没有相互通信的,所以本质上多个NameServer节点间数据没有一致性的概念,是各自维护自己的数据,由每台Broker上报元数据来维护每台 NameServer 节点上数据的准确性。
由于NameServer不负责具体消息数据的存储和分发,所以在请求频率、负载方面都不会很高。所以在大多数场景下,NameServer都是可以多集群共享的。从功能上看,它对RocketMQ的作用相当于RabbitMQ的Mnesia。
消息数据
RocketMQ 消息数据的最小存储单元是MessageQueue,也就是我们常说的Queue或Partition。Topic可以包含一个或多个MessageQueue,数据写入到Topic后,最终消息会分发到对应的MessageQueue中存储。
在底层的文件存储方面,并不是一个MessageQueue对应一个文件存储的,而是一个节点对应一个总的存储文件,单个Broker 节点下所有的队列共用一个日志数据文件(CommitLog)来存储,和RabbitMQ采用的是同一种存储结构。
图中主要包含 CommitLog、ConsumeQueue、IndexFile 三个跟消息存储相关的文件:
-
CommitLog 是消息主体以及元数据存储主体,每个节点只有一个,客户端写入到所有MessageQueue的数据,最终都会存储到这一个文件中。
-
ConsumeQueue 是逻辑消费队列,是消息消费的索引,不存储具体的消息数据。引入的目的主要是提高消息消费的性能。由于RocketMQ是基于主题Topic的订阅模式,消息消费是针对主题进行的,如果要遍历Commitlog文件,基于Topic检索消息是非常低效的。Consumer可根据ConsumeQueue来查找待消费的消息,ConsumeQueue文件可以看成是基于Topic的CommitLog索引文件。
-
IndexFile 是索引文件,它在文件系统中是以HashMap结构存储的。在RocketMQ中,通过Key或时间区间来查询消息的功能就是由它实现的。
值得关注的是,因为消息数据会很多,CommitLog 会存储所有的消息内容。所以为了保证数据的读写性能,我们会对CommitLog进行分段存储。CommitLog底层默认单个文件大小为1G,消息是顺序写入到文件中,当文件满了,就会写入下一个文件。对于 ConsumeQueue 和 IndexFile,则不需要分段存储,因为它们存储的是索引数据,数据量一般很小。
在消息清理方面,RocketMQ 支持按照时间清理数据。这个时间是按照消息的生产时间计算的,和消息是否被消费无关,只要时间到了,那么数据就会被删除。
不过跟RabbitMQ不同的是,RocketMQ 不是按照主题或队列维度来清理数据的,而是按照节点的维度来清理的。原因和RocketMQ 的存储模型有关,上面说到RocketMQ所有Queue的日志都存储在一个文件中,如果要支持主题和队列单独管理,需要进行数据的合并、索引的重建,实现难度相对复杂,所以RocketMQ并没有选择主题和队列这两个维度的清理逻辑。
生产者和消费者
RocketMQ的客户端连接服务端是需要经过客户端寻址的。如下图所示,首先和NameServer 完成寻址,拿到Topic/MessageQueue和Broker的对应关系后,接下来才会和Broker进行交互。
生产端
从生产端来看,生产者是将数据发送到Topic或者Queue里面的。如果是发送到Topic,则数据要经历生产数据分区分配的过程。 即决定消息要发送到哪个目标分区。
默认情况下,RocketMQ支持轮询算法和最小投递延迟算法两种策略。默认是轮询算法,该算法保证了每个Queue中可以均匀地获取到消息。最小投递延迟算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的Queue。如果是直接发送到Queue,则无需经过分区选择,直接发送即可。
由于RocketMQ在协议层不支持批量发送消息的协议,所以在SDK底层是没有等待、聚合发送逻辑的。所以如果需要批量发送数据,就需要在生产的时候进行聚合,然后发送。
为了满足不同的发送场景, RocketMQ 支持单向发送、同步发送、异步发送三种发送形式。单向发送(Oneway)指发送消息后立即返回,不处理响应,不关心是否发送成功。同步发送(Sync)指发送消息后等待响应。异步发送(Async)指发送消息后立即返回,在提供的回调方法中处理响应。
消费端
在RocketMQ消费端,为了满足不同场景的消费需要,RocketMQ同时支持Pull、Push、Pop三种消费模型。
默认的消费模型是Pull,Pull的底层是以客户端会不断地去服务端拉取数据的形式实现的。Push 模型底层是以伪Push的方式实现的,即在客户端底层用一个Pull线程不断地去服务端拉取数据,拉到数据后,触发客户端设置的回调函数。让客户端从感受上看,是服务端直接将数据Push过来的。
另外,当消费者和分区都很多的时候,因为消费重平衡会消耗很长时间,且重平衡期间的消费会暂停。而在客户端也需要感知到复杂的重平衡行为,各个语言的客户端需要较高的重复开发成本。所以,RocketMQ 推出了Pop模式,将消费分区、分区分配关系、重平衡都移到了服务端, 减少了重平衡机制给客户端带来的复杂性。
RocketMQ 默认是通过消费分组机制来消费的。即在客户端消费数据的时候,会通过消费分组来管理消费关系和存储消费进度。从实现上看,同一条消息支持被多个消费分组订阅,每个消费者分组可以有多个消费者。
由于Topic和Queue模型的存在,在启动消费的时候,就需要先分配消费者和分区消费关系。这个过程就是 RocketMQ 消费端负载均衡。在实现中,消息按照哪种逻辑分配给哪个消费者,就是由消费者负载均衡策略所决定的。
根据消费者类型的不同,消费者负载均衡策略分为消息粒度负载均衡和队列粒度负载均衡两种模式。这里简单了解下。
-
消息粒度负载均衡 是指同一消费者分组内的多个消费者,将按照消息粒度平均分摊主题中的所有消息。即同一个队列中的消息,会被平均分配给多个消费者共同消费。
-
队列粒度负载均衡 是指同一消费者分组内的多个消费者,将按照队列粒度消费消息,即每个队列仅被一个消费者消费。
当消费端消费数据成功后,就需要保存消费进度信息。RocketMQ通过提交消费位点信息来保存消费进度。在服务端,RocketMQ会为每个消费分组维护一份消费位点信息,信息中会保存消费的 最大位点、最小位点、当前消费位点 等内容。
在服务端,消息被某个消费者消费完成后,不会立即在队列中被删除,以便当消费者客户端停止又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。
Kafka的架构设计与实现
系统架构
Kafka 由 Producer、Broker、ZooKeeper、Consumer 四个模块组成。其中,ZooKeeper 用来存储元数据信息,集群中所有元数据都持久化存储在ZooKeeper 当中。
Kafka有Topic和分区的概念,分区就相当于RocketMQ的MessageQueue。一个Topic可以包含一个或多个分区。消费方面通过Group来组织消费者和分区的关系。
到这你是不是发现了? Kafka架构和RocketMQ非常像。 从社区信息来看,RocketMQ最初的设计应该参考过Kafka,所以它们的架构和概念非常像,只是在具体实现方式上不太一样。比如概念上分区和MessageQueue的作用是一样的,元数据存储ZooKeeper和NameServer 的作用也是一样的。
使用 ZooKeeper 作为元数据存储服务会带来额外的维护成本、数据一致性和集群规模限制(主要是分区数)等问题。所以RocketMQ 使用NameServer替代ZooKeeper,Kafka 3.0 使用内置的Raft机制替代 ZooKeeper,就是为了解决这几个问题。
从消息的生命周期来看,生产者也需要通过客户端寻址拿到元数据信息。客户端通过生产分区分配机制,选择消息发送到哪个分区,然后根据元数据信息拿到分区Leader所在的节点,最后将数据发送到Broker。Broker收到消息并持久化存储。消费端使用消费分组或直连分区的机制去消费数据,如果使用消费分组,就会经过消费者和分区的分配流程,消费到消息后,最后向服务端提交Offset记录消费进度,用来避免重复消费。
协议和网络模块
Kafka 是自定义的私有协议,经过多年发展目前有V0、V1、V2三个版本,稳定在V2版本。官方没有支持其他的协议,比如HTTP,但是商业化的 Kafka 一般都会支持HTTP协议,原因还是 HTTP 协议使用的便捷性。
Kafka 协议从结构上来看包含协议头和协议体两部分, 协议头包含基础通用的信息,协议体由于每个接口的功能参数不一样,内容结构上差异很大。
关于协议的更多详细信息你还可以参考 官方的协议文档。
Kafka 服务端的网络层是基于 Java NIO和Reactor 来开发的,通过多级的线程调度来提高性能。
数据存储
元数据存储
Kafka的元数据是存储在ZooKeeper里面的。元数据信息包括Topic、分区、Broker节点、配置等信息。ZooKeeper 会持久化存储全量元数据信息,Broker 本身不存储任何集群相关的元数据信息。在Broker启动的时候,需要连接ZooKeeper读取全量元数据信息。
ZooKeeper是一个单独的开源项目,它自带了集群组网、数据一致性、持久化存储、监听机制等等完整的能力。它的底层是基于Zab协议组件集群,有Leader节点和Slave节点的概念,数据写入全部在Leader节点完成,Slave负责数据的读取工作。
从ZooKeeper的角度来看, Kafka只是它的一个使用者,Kafka用ZooKeeper的标准使用方式向ZooKeeper集群上写入、删除、更新数据,以完成Kafka的元数据管理、集群构建等工作。所以每台Broker启动时,都会在ZooKeeper注册、监听一些节点信息,从而感知集群的变化。
另外,Kakfa 集群中的一些如消费进度信息、事务信息,分层存储元数据,以及3.0后的Raft架构相关的元数据信息,都是基于内置Topic来完成存储的。把数据存储在内置Topic中,算是一个比较巧妙的思路了,也是一个值得借鉴的技巧。
消息数据
在消息数据存储方面,Kafka的数据是以分区为维度单独存储的。即写入数据到Topic后,根据生产分区分配关系,会将数据分发到 Topic 中不同的分区。此时底层不同分区的数据是存储在不同的“文件”中的,即一个分区一个数据存储“文件”。这里提到的“文件”也是一个虚指,在系统底层的表现是一个目录,里面的文件会分段存储。
在底层数据存储中,Kafka的存储结构是以Topic和分区维度来组织的。一个分区一个目录,目录名称是TopicName + 分区号,结构如下图所示:
每个分区的目录下,都会有 .index、.log、.timeindex 三类文件。其中,.log 是消息数据的存储文件,.index 是偏移量(offset)索引文件,.timeindex 是时间戳索引文件。两个索引文件分别根据 Offset 和时间来检索数据。
Kafka提供了根据过期时间和数据大小清理的机制,清理机制是在Topic维度生效的。 当数据超过配置的过期时间或者超过大小的限制之后,就会进行清理。清理的机制也是延时清理的机制,它是根据每个段文件进行清理的,即整个文件的数据都过期后,才会清理数据。
特别说明的是,根据大小清理的机制是在分区维度生效的,不是Topic。即当分区的数据大小超过设置的大小,就会触发清理逻辑。这个机制和RocketMQ的清理机制是一致的,但RocketMQ只提供了按节点维度配置的消息过期机制,所以相比之下,根据分区维度存储能带来一定的便捷。
在存储性能上,Kafka的写入大量依赖顺序写、写缓存、批量写来提高性能。消费方面依赖批量读、顺序读、读缓存的热数据、零拷贝来提高性能。在这些技巧中,每个分区的顺序读写是高性能的核心。
生产者和消费者
Kafka 客户端在连接 Broker 之前需要经过客户端寻址,找到目标Broker的信息。在早期,Kafka 客户端是通过链接 ZooKeeper 完成寻址操作的,但是因为ZooKeeper的性能不够,如果大量的客户端都访问ZooKeeper,那么就会导致ZooKeeper超载,从而导致集群异常。
所以在新版本的Kafka中,客户端是通过直连Broker完成寻址操作的,不会跟ZooKeeper交互。即Broker跟ZooKeeper交互,在本地缓存全量的元数据信息,然后客户端通过连接Broker拿到元数据信息,从而避免对ZooKeeper造成太大负载。
生产者
生产者完成寻址后,在发送的时候可以将数据发送到Topic或者直接发送到分区。发送到Topic时会经过生产分区分配的流程,即根据一定的策略将数据发送到不同的分区。
Kafka提供了轮询和KeyHash两种策略。
轮询策略是指按消息维度轮询,将数据平均分配到多个分区。Key Hash是指根据消息的Key生成一个Hash值,然后和分区数量进行取余操作,得到的结果可以确定要将数据发送到哪个分区。生产消息分配的过程是在客户端完成的。
Kafka 协议提供了批量(Batch)发送的语义。所以生产端会在本地先缓存数据,根据不同的分区聚合数据后,再根据一定的策略批量将数据写入到Broker。因为这个Batch机制的存在,客户端和服务端的吞吐性能会提高很多。
消费者
Kafka 的消费端只提供了Pull(拉)模式的消费。即客户端是主动不断地去服务端轮询数据、获取数据,消费则是直接从分区拉取数据的。Kafka提供了消费分组消费和直连分区消费两种模式,这两者的区别在于,是否需要进行消费者和分区的分配,以及消费进度谁来保存。
大部分情况下,都是基于消费分组消费。 消费分组创建、消费者或分区变动的时候会进行重平衡,重新分配消费关系。Kafka 默认提供了RangeAssignor(范围)、RoundRobinAssignor(轮询)、 StickyAssignor(粘性)三种策略,也可以自定义策略。消费分组模式下,一个分区只能给一个消费者消费,消费是顺序的。
当客户端成功消费数据后,会往服务端提交消费进度信息,此时服务端也不会删除具体的消息数据,只会保存消费位点信息。位点数据保存在内部的一个 Topic(__consumer_offset)中。消费端同样提供了自动提交和手动提交两种模式。当消费者重新启动时,会根据上一次保存的位点去消费数据,用来避免重复消费。
延时消息
在技术上看,消息队列让消息从不可见变为可见的核心思路都是: 先将数据写入到一个临时存储,然后根据一定的机制在数据到期后让消费端可以消费到这条消息。这个临时存储一般有以下3种选择:
- 单独设计的数据结构
- 独立的Topic
- 本地的某个存储引擎(如RocksDB、Mnesia等)
为了在延时到期后消费者可以消费到这些消息,从技术上看主要两个实现思路:
- 定时检测写入
- 消费时判断数据是否可见
从业界具体实现来看,大多都是选择定时检测写入的方式。因为消费是客户端发起的,频率不可控,每次消费都去检查是否有延时消息,可能会对集群的性能造成影响。
延时消息的实现主要有基于轮询检测机制的实现和基于时间轮机制的实现两种方案。
基于轮询检测机制的实现
该方案的核心思路是:将延时消息写入到独立的存储中,利用类似while + sleep的定时器,来推进时间,通过独立线程检测数据是否到期,然后从第三方存储中取出到期的数据进行处理。
该方案由 定时线程 和 第三方存储 两部分组成。可以使用链表、排序链表、堆、红黑树存储。也可以将原来的每个Topic一个存储结构,拆分为多个存储结构。比如可以根据时间进行拆分,如1小时、6小时、12小时、1天、大于1天等5个维度。从而降低每个存储结构的长度,在一定程度上解决性能问题。
基于时间轮机制的实现
该方案的核心思路也是:将延时消息写入到独立的存储中,然后通过构建多级时间轮,在每个时间刻度上挂载需要处理的延时消息的索引列表。再依赖时间轮的推进,获取到需要处理的延时消息列表,进行后续的处理。
时间轮是一个很成熟的算法,分为 单级时间轮 和 多级时间轮,多级时间轮是单级时间轮的扩展。它的核心思想是:
- 先设定好最小的时间精度,然后将时间划分为多个维度,比如年、月、日、时、分、秒。通过多级的时间轮来表示时间。
- 在每个刻度上挂上一个待处理的延时消息链表,链表的元素存储了延时消息的索引信息。
- 添加延时消息时,找到刻度对应的链表,在链表最后加上该元素,所以时间复杂度为O(1)。
- 获取延时消息时,找到刻度对应的链表,把这个刻度对应的链表都拿出来处理,时间复杂度也是O(1)。
如上图所示,这是包含Seconds、Minutes、Hours三个级别的时间轮,每一个时间轮的最大刻度为8,上一级时间轮最小刻度等于下一级时间轮刻度的总和。当我们设定好时间精度和时间轮的维度后,如果是添加延时消息,则在多级时间轮上找到对应时间的延时消息列表,把消息插入到列表中。如果是获取到期的延时消息,也是根据时间轮找到当前时间的延时消息列表,然后把整个列表拿出来处理即可。
主流消息队列的延时机制实现
RocketMQ 延时消息的设计思路
RocketMQ的延时消息是基于轮询检测机制的思路来实现的。
RocketMQ 在内核定义了名为 SCHEDULE_TOPIC_XXXX 的Topic来存储延迟消息。该 Topic 包含18个队列,每个队列对应一个延迟级别。比如队列0就代表延迟1s的队列,队列1就代表延迟5s的队列。
生产者把延迟消息发送到Broker之后,Broker会根据生产者定义的延迟级别放到对应的队列中。而消息原本应该去的Topic和队列,会暂时存放在消息的属性(property)中。
在 RocketMQ 中,会有专门的线程池去处理延迟消息。比如18个延迟级别,就会生成18个定时任务,每个任务对应一个队列。这个任务每隔100毫秒就会去查看对应队列中的消息,判断消息的执行时间。如果到了执行时间,那么就会把消息发送到其本该投递的Topic中,这样消费者就能消费到消息了。
RabbitMQ 延时消息的设计思路
RabbitMQ 的延迟消息有基于死信队列和集成延迟插件两种实现方案。从根本上看,RabbitMQ的这两种方案也属于是 基于轮询检测机制 的一种。
基于死信队列是指使用两个队列,一个队列接收消息不消费,然后等待指定时间过后消息过期,再由该队列绑定的死信 Exchange 机制再次将其路由到另一个队列提供业务消费。
集成延迟插件(rabbitmq-delayed-message-exchange)是指延时消息不直接投递到队列中,而是先转储到本地Mnesia数据库中,然后定时器在消息到期后再将其投递到队列中。
Kafka 延时机制的设计思路
kafka 本身不支持延时消息,但是支持延时机制,用于延时回包、延时确认的场景。
从技术上看,Kafka的延时机制是 典型的基于时间轮算法 来实现的。它的实现核心是多级时间轮以及使用Java的DelayQueue来保存延时数据和推进时间,整体实现性能和实现方案是非常优雅的。
场景
Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|
| 单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 万级,比 RocketMQ、Kafka 低一个数量级 | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
| topic 数量对吞吐量的影响 | 千级 | 百万级 | 千级 | 百级 |
| 时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
| 可用性 | 高,基于主从架构实现高可用 | 高,基于主从 | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
| 消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
| 功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
kafka为什么性能这么好?
1、分段与索引
在 Kafka 中,每一个分区都对应多个段文件,放在同一个目录下。Kafka 根据 topic 和分区就可以确定消息存储在哪个目录内。每个段文件的文件名就是偏移量,假设为 N,那么这个文件第一条消息的偏移量就是 N+1。所以 Kafka 根据偏移量和文件名进行二分查找,就能确定消息在哪个文件里。
然后每一个段文件都有一个对应的偏移量索引文件和时间索引文件。Kafka 根据这个索引文件进行二分查找,就很容易在文件里面找到对应的消息。
2、零拷贝
和最原始的操作比起来,零拷贝少了两次内核态与用户态的切换,还少了两次 CPU 拷贝。
3、page cache
Kafka 写入的时候只是写入到了 page cache,这几乎等价于一个内存写入操作,然后依靠异步刷新把数据刷新到磁盘上。
4、顺序写
Kafka 在写入数据的时候就充分利用了顺序写的特性。它针对每一个分区,有一个日志文件 WAL(write-ahead log),这个日志文件是只追加的,也就是顺序写的,因此发消息的性能会很好。
如何保证消息不被重复消费?
为什么消息会重复?
- 生产者重复发送:生产者发送消息后,因为网络超时等原因没收到 Broker 的确认,它无法判断消息是否发送成功,为了保证消息不丢失,通常会选择重试。这就可能导致同一条消息被发送了多次。
- 消费者重复消费:消费者拉取消息,业务逻辑处理完了,正准备提交消费位点(ACK)时,服务突然宕机或重启。当服务恢复后,它会从未提交的位点重新拉取消息,导致同一条消息被再次消费。
1、数据库唯一键约束
利用数据库中“唯一索引”或“主键”的特性,来阻挡重复数据的插入。
这种方案的优点是: 实现简单,成本低,效果可靠。 缺点也很明显: 强依赖数据库特性,对于非数据库操作的场景无能为力。
基于这个思路,如果不用关系型数据库,Redis的SETNX命令(SET if Not eXists)也能达到异曲同工的效果,可以用order_id作为key,实现分布式锁或状态记录。
2、版本号机制
可以引入“前置条件”或“版本号”机制,也就是常说的乐观锁。
3、防重表
如果业务复杂,我们可以采用防重表的方案,将业务逻辑和幂等逻辑解耦。单独建立一张防重表,具体的步骤如下:
- 为每条消息生成一个全局唯一ID(GUID)。这个ID可以在生产者发送时就放入消息体或Header中。
- 建立一张“消费记录表”(consumed_log),表结构很简单,核心就是一个字段
message_id,并将其设为主键或唯一索引。 - 消费者处理逻辑变为一个“三段式”:
- 开启事务。
INSERT消息的GUID到consumed_log表中。- 执行真正的业务逻辑(更新数据库(本地事务)、调用RPC(分布式事务)等)。
- 提交事务。
这样如果是重复消息的话,就会插入消费记录表失败,就不会执行后面的业务逻辑了
4、异步校对
分布式事务引入最终一致性的设计思想,并辅以一个异步校对机制。整个流程会演变成三步:
- 预操作:收到消息后,第一步是在幂等记录表中插入一条记录,但状态标记为“处理中(PROCESSING)”。例如,插入一条记录
(order_id, 'PROCESSING')。这一步是幂等性的关键防线。 - 执行业务:调用库存、物流等下游服务,执行核心业务逻辑。
- 确认操作:所有业务逻辑成功执行后,回来将幂等记录表中的状态更新为“已完成(COMPLETED)”。
定期扫描幂等记录表中那些长时间处于“处理中”状态的记录,然后反向查询各个业务系统(比如查询物流系统是否存在该订单的物流单),来判断业务是否真的执行成功。如果查询下来业务确实已经成功,校对任务就负责将幂等记录的状态更新为“已完成”。
5、缓存判重
数据库有读瓶颈的话,最好的优化方式就是加缓存,同样这里我们可以在防重表的上层加一个redis来缓存近期处理过的 key。
当一个新的消息进来的时候,我们先通过redis做一次判重校验,如果这个key存在,那么我们就认为这是重复的key,如果redis不存在,再通过数据库做一次兜底校验,如果key存在就认为是重复的消息,如果key不存在,就认为不是重复消息,没处理过
6、布隆过滤器
为了增加性能,不让每一次判重逻辑都走数据库,我们可以在数据库前面加个布隆过滤器,每一个新的消息过来,先用布隆过滤器判重,如果不存在,我们就正常处理业务逻辑,然后再更新布隆过滤器和防重表。如果布隆过滤器存在这个key的话,由于布隆过滤器存在假阳性的问题,所以这里就可以透穿到数据库再做一次校验。就如果是重复消息,直接拒绝就可以了,如果不是重复消息,就正常处理消息
7、位图
8、状态机
如何保证消息的顺序性?
全局顺序 vs 局部顺序
- 全局顺序:要求整个Topic内的所有消息,都严格按照先进先出的顺序进行消费。这种场景相对较少,比如需要同步一个全局数据库的Binlog时。
- 局部顺序:不要求全局有序,但要求某一个特定业务范畴内的消息是有序的。这在实际业务中极为常见。例如,对于同一笔电商订单,其“已创建”、“已付款”、“已发货”、“已签收”这几条消息必须按顺序处理;但订单A和订单B之间的消息,则完全可以并行处理,互不影响。
但有时候,我们还会遇到一种更复杂的场景:不同Topic的消息也要求保证顺序。这种情况,任何单一的MQ产品都无法原生支持跨Topic的顺序性。要实现这个目标,必须引入一个外部的协调者(Coordinator)
顺序消息解决方案:
1、一个Topic一个分区
存在严重的性能瓶颈,所有消息都涌向单一分区,该节点容易成为系统的瓶颈。单一消费者也容易消费不过来。
2、单分区异步消费
在单分区性能受限的情况下,如果瓶颈主要出在消费端业务逻辑处理过慢,导致消息积压,我们能否在消费端做一些优化呢?异步消费。
消费者线程不直接执行耗时业务逻辑,而是从消息队列拉取消息后分发到不同任务队列中多线程异步消费,能够保证局部有序性。
缺点:
- 增加了系统复杂性:需要自己管理内存队列、线程池
- 存在数据丢失风险
- 没有解决生产者端和broker的单点写入压力问题
3、多分区实现局部有序性
为Topic创建多个分区,生产者发送消息时根据业务键(例如哈希)计算分区。
可能存在的问题:数据倾斜(解决:一致性hash、虚拟槽)、扩容引发的数据错乱(解决:为新分区消费者设置一个静默期,让旧消息消费完)
如何解决消息积压问题?
区分临时性还是永久性。临时性积压是指突如其来的流量,导致消费者一时半会跟不上。而永久性积压则是指消费者的消费速率本身就跟不上生产速率。
如果是临时性积压,并且评估最终消费者处理完积压消息的时间是自己能够接受的,那么就不需要解决。比如说偶发性的消息积压,需要半个小时才能处理完毕,而我完全等得起半小时,就不需要处理。
但要是接受不了,又或者是永久性积压。最简单的办法就是增加消费者, 增加到和分区数量一样。如果再增加消费者也已经没办法提高消费速率:
1、扩容分区
线上一般禁止扩容分区。分区变化也可能影响业务顺序性。
2、并行消费
- 创建一个全新的Topic,并为其设置远超当前需求的分区数(比如100个)。
- 让生产者将新的消息写入这个Topic
- 同时,部署两套消费组:一套继续消费旧Topic中的积压消息;另一套新的消费组,以足够多的消费者实例,开始消费新Topic中的消息。
- 当旧Topic中的消息全部被消费完毕后,下线旧的消费组,整个系统平滑过渡到新的Topic。
3、消息转发
- 创建新的topic
- 生产者切换到新的topic
- 部署新的消费者从旧topic中拉取旧消息作为生产者转发到新的topic中
如何解决消息不丢失?
消息丢失监测机制
如果公司有成熟的分布式链路追踪系统(比如SkyWalking、Jager),那自然是首选,每一条消息的生命周期都能被完整追踪。但如果没有,我们也可以自己动手,实现一个轻量级的检测方案。
核心思路是利用消息队列在单个分区内的有序性。我们可以在生产者(Producer)发送消息时,为每一条消息注入一个唯一且连续递增的序列号。消费者(Consumer)在接收到消息后,只需检查这个序列号是否连续,就能判断出是否有消息丢失。
怎样保证消息不丢失?
1、broker参数配置
单个broker需要保证同步刷盘,消息存储到磁盘后再返回ack;集群还需要保证同步复制,这样主从切换才能保证消息是全的。
大部分的消息生产端的消息丢失都是与acks参数设置相关。那么这里如果要保证消息100%不丢,这里我们自然要设置acks参数为all/-1。不过这也要依据业务场景来。一般只有在特别关键,并且性能要求不高的业务上才会这样去设置,而对于性能要求较高的业务是不合适的。具体可以做如下“极限”的配置:
- acks = all:确保消息写入所有 partition 副本。
- min.insync.replicas = 2(或更高):这个参数设定了 ISR 中最少需要有几个副本。比如,如果 Topic 的副本因子是 3,这里设置为 2,就意味着至少要有一个 Leader 和一个 Follower 存活,acks=all 的写入请求才能成功。这可以防止在 ISR 副本数不足时,数据写入的可靠性降级。
- unclean.leader.election.enable = false:坚决杜绝“不干净”的 Leader 选举,防止数据丢失。设置为false之后,Kafka 不会从非ISR副本中选举新的Leader。由于非ISR副本可能含有不完整或滞后的数据,从它们中选择Leader会带来数据丢失或不一致的风险。
2、生产者发送确认
3、消息消费确认








