RabbitMQ 原理

2023/3/16

整理自:《RabbitMQ 实战指南》 (opens new window)



# 相关概念

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说 RabbitMQ 模型更像是一种交换机模型

# 整体模型架构

消息流转模型:

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理;RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发,处理逻辑会变得异常复杂,同时也不建议这么做

# 组成部分

  • 生产者和消费者
  • 队列
  • 交换器、路由键、绑定键:生产者将消息发给交换器的时候,一般会指定一个路由键(就像分库分表中的partition key),而这个路由键需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效
    • 交换器类型:direct、fanout、topic、headers

# 运转流程

生产者发送消息流程:

  1. 生产者连接到 RabbitMQBroker,建立一个连接(Connection),开启一个信道(Channel)
  2. 生产者声明一个交换器,并设置相关属性,比如交换机类型
  3. 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
  4. 生产者通过路由键将交换器和队列绑定起来
  5. 生产者发送消息至 RabbitMQBroker,其中包含路由键、交换器等信息
  6. 相应的交换器根据接收到的路由键查找相匹配的队列
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  9. 关闭信道、关闭连接

消费者接收消息流程:

  1. 消费者连接到 RabbitMQBroker,建立一个连接(Connection),开启一个信道(Channel)
  2. 消费者向 RabbitMOBroker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
  3. 等待RabbitMQBroker回应并投递相应队列中的消息,消费者接收消息
  4. 消费者确认(ack)接收到的消息
  5. RabbitMO从队列中删除相应已经被确认的消息
  6. 关闭信道、关闭连接

# 具体特性

# 消费模式

RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式

# 消费端的确认与拒绝

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,,之后再删除)。当 autoAck 等于 true时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息

采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用Basic.Ack 命令为止

当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经新开连接,则 RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久

# 消息何去何从

# mandatory参数

  1. 当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用 Basic.Return 命令将消息返回给生产者
  2. 当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃

    那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用 channel.addReturnListener 来添加 ReturnListener 监听器实现

# 备份交换器

备份交换器,或者更直白地称之为“备胎交换器” 生产者在发送消息的时候如果不设置 mandatory 参数,那么消息在未被路由的情况下将会丢失;如果设置了;mandatory 参数,那么需要添加 ReturnListener 的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在 RabbitMQ 中,再在需要的时候去处理这些消息

# 过期时间

消息和队列都可以设置过期时间

# 死信队列

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLXI 的队列就称之为死信队列

消息变成死信一般是由于以下几种情况:

  1. 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false
  2. 消息过期
  3. 队列达到最大长度

DLX 也是一个正常的交换器,和一般的交换器没有区别;当队列中存在死信时,RabbitMO就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息以进行相应的处理

# 延迟队列

延退队列存储的对象是对应的延退消息,所请“延退消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延退队列的使用场景有很多,比如:

  • 在订单系统中,一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延退队列来处理这些订单了
  • 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延退队列,当指令设定的时间到了再将指令推送到智能设备

在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延退队列的功能,但是可以通过前面所介绍的 DLX 和 TTL模拟出延迟队列的功能

在上图中,不仅展示的是死信队列的用法,也是延退队列的用法,对于。queue.dlx这个死信队列来说,同样可以看作延退队列。假设一个应用中需要将每条消息都设置为10秒的延退,生产者通过exchange.normal这个交换器将发送的消息存储在queue.normal这个队列中。消费者订阅的并非是queue.normal这个队列,而是gqueue.dlx这个队列。当消息从queue.normal这个队列中过期之后被存入queue.dlx这个队列中,消费者就恰巧消费到了延迟10秒的这条消息

在真实应用中,对于延退队列可以根据延退时间的长短分为多个等级,一般分为5秒、10秒、30秒、1分钟、5分钟、10分钟、30分钟、1小时这几个维度,当然也可以再细化一下

# 持久化

持久化可以提高 RabbitMQ 的可靠性,防在异常情况(重启、关闭、右机等)下的数据丢失 RabbitMQ 的持久化分为三个部分:

  1. 交换器的持久化:交换器元数据
  2. 队列的持久化:队列元数据
  3. 消息的持久化

    如果交换器不设置持久化,那么在RabbitMO服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失 如果队列不设置持久化,那么在RabbitMO服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。正所请“皮之不存,毛将意附”

将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的。 首先从消费者来说,如果在订阅消费队列时将 autoAck 参数设置为 tue,那么当消费者接收到相关消息之后,还没来得及处理就岩机了,这样也算数据丢失。这种情况很好解决,将 autoAck 参数设置为 false

其次,在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。RabbitMQ 并不会为每条消息都进行同步存盘(调用内核的 fsync 方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内 RabbitMQ 服务节点发生了岩机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失

这个问题怎么解决呢?这里可以引入 RabbitMQ 的镜像队列机制,相当于配置了副本,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效地保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证 RabbitMQ 消息不丢失,但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列

还可以在发送端引入事务机制或者发送方确认机制来保证消息已经正确地发送并存储至 RabbitMQ 中

# 生产者确认

在使用 RabbitMQ 的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前已经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ 针对这个问题,提供了两种解决方式

  1. 通过事务机制实现
  2. 通过发送方确认(publisher confirm)机制实现

# 事务机制

RabbitMQ 客户端中与事务机制相关的方法有三个:channel.txSelect、channel.txCommit 、channel.txRollback

channel.txSelect 用于将当前的信道设置成事务模式,channel.txCommit 用于提交事务,channel.txRo11back 用于事务回滚。在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRo11back 方法来实现事务回滚。注意这里的RabbitMQ 中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分

事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被 RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会“吸干”RabbitMQ 的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?RabbitMO提供了一个改进方案,即发送方确认机制

# 发送方确认机制

生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入之后发出(其实不一定到磁盘,RabbitMQ 并不会为每条消息都进行同步存盘)。RabbitMQ 回传给生产者的确认消息中的deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理

try {
        channel.confirmSelect();// 将信道置为confirm模式
        // 之后正常发送消息
        channel.basicPublish("exchange", "routingKey", null, "publisher confirm test".getBytes());
        if (!channel.waitForConfirms()) {
        System.out.println("send message failed");
        // do something else....
        }
        } catch (InterruptedException e) {
        ...
        }

# 两种机制的优劣势

事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息

# 消息分发

当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可

很多时候轮询的分发机制也不是那么优雅。默认情况下,如果有 n 个消费者,那么 RabbitMQ 会将第 m 条消息分发给第 m%n(取余的方式)个消费者,RabbitMQ 不管消费者是否消费并已经确认(Basic.Ack)了消息。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降

那么该如何处理这种情况呢?这里就要用到 channel.basicQos(intprefetchCount)这个方法,如前面章节所述,channel.basicoos方法允许限制信道上的消费者所能保持的最大未确认消息的数量

举例说明,在订阅消费队列之前,消费端程序调用了channel.basicQos(5),之后订阅了某个队列进行消费。RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后,RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息直到再次到达计数上限。这种机制可以类比于 TCP/IP 中的“滑动窗口”

# 消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。举个例子,不考虑消息重复的情况,如果生产者发布的消息分别为 msgl、msg2、msg3,那么消费者必然也是按照 msgl、msg2、msg3 自的顺序进行消费的。

日前很多资料显示 RabbitMQ 的消息能够保障顺序性,这是不正确的,或者说这个观点有很大的局限性。在不使用任何 RabbitMQ 的高级特性,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达Broker 的前后顺序,也就无法验证消息的顺序性。

那么哪些情况下 RabbitMQ 的消息顺序性会被打破呢?下面介绍几种常见的情形:

  1. 如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。同样,如果启用publisherconfirm时,在发生超时、中断,又或者是收到RabbitMQ的Basic.Nack命令时,那么同样需要补偿发送,结果与事务机制一样会错序
  2. 考虑另一种情形,如果生产者发送的消息设置了不同的超时时间,并且也设置了死信队列,整体上来说相当于一个延退队列,那么消费者在消费这个延退队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致
  3. 再考虑一种情形,如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的
  4. 如果一个队列按照前后顺序分有 msgl、msg2、msg3、msg4 这 4 个消息,同时有 ConsumerA 和ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中,ConsumerA 中的消息为 msgl 和 msg3,ConsumerB 中的消息为 msg2、msg4。ConsumerA 收到消息 msgl 之后并不想处理而调用了 Basic.Nack/.Reject 将消息拒绝,与此同时将 requeue 设置为 true,这样这条消息就可以重新存入队列中。消息 msgl 之后被发送到了 ConsumerB 中,此时 ConsumerB 已经消费了 msg2、msg4,之后再消费 msgl,这样消息顺序性也就错乱了

# 消息传输保障

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级:

  1. At most once:最多一次。消息可能会丢失,但绝不会重复传输
  2. At least once:最少一次。消息绝不会丢失,但可能会重复传输
  3. Exactly once:怡好一次。每条消息肯定会被传输一次且仅传输一次

RabbitMQ 支持其中的 “最多一次” 和 “最少一次”。其中 “最少一次”(不考虑异步同步刷盘) 投递实现需要考虑以下这个几个方面的内容:

  1. 消息生产者需要开启事务机制或者 confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中
  2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃
  3. 消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失
  4. 消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失

“最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会去失

# RabbitMQ集群

# 普通集群

RabbitMQ 集群中的所有节点都会备份所有的元数据信息,包括以下内容:

  1. 队列元数据:队列的名称及属性
  2. 交换器:交换器的名称及属性
  3. 绑定关系元数据:交换器与队列或者交换器与交换器之间的绑定关系

但是不会备份消息。基于存储空间和性能的考虑,在 RabbitMQ 集群中创建队列,集群只会在单个节点而不是在所有节点上创建队列的进程并包含完整的队列信息(元数据、状态、内容)。这样只有队列的宿主节点,即所有者节点知道队列的所有信息,所有其他非所有者节点只知道队列的元数据和指向该队列存在的那个节点的指针。因此当集群节点崩溃时,该节点的队列进程和关联的绑定都会消失

客户端连接到非队列数据所在节点 如果消息生产者向往集群中节点1的队列中发送数据,但是连接到节点 2 或节点 3,队列1的完整数据不在这两个节点上,那么这两个节点在发送消息的过程中主要起到路由转发的作用,根据这两个节点上的元数据被转发到节点1,最终发送的消息仍然会存储在节点1的队列1中。同样,如果消息消费者连接的节点2或节点3,这两个节点也会充当路由节点转发消息,消息会从节点1的队列1中拉取消费

# 镜像集群

引入镜像队列(MirrorQueue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。在通常的用法中,针对每一个配置镜像的队列(以下简称镜像队列)都包含一个主节点(master)和若干个从节点(slave)

slave 会准确地按照 master 执行命令的顺序进行动作,故 slave 与 master 上维护的状态应该是相同的。如果master 由于某种原因失效,那么“资历最老”的 slave 会被提升为新的 master。根据 slave 加入的时间排序,时间最长的 slave 即为“资历最老”。发送到镜像队列的所有消息会被同时发往 master 和所有的 slave上,如果此时master 挂掉了,消息还会在 slave 上,这样 slave 提升为 master 的时候消息也不会丢失

注意要点:

RabbitMQ 的镜像队列支持 publisher confirm 和事务两种机制。在事务机制中,只有当前事务在全部镜像中执行之后,客户端才会收到 Tx.Commit-ok 的消息。同样的,在 publisher confirm 机制中,生产者进行当前消息确认的前提是该消息被全部进行所接收了