RabbitMQ 简述

消息中间件简介

消息 (Message) 指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串、JSON等,也可以很复杂,比如内嵌对象。

消息队列中间件 (Message Queue Middleware,MQ) 是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。消息队列中间件,也可以称为消息队列或者消息中间件。

消息中间件一般有两种传递模式:点对点 (P2P) 模式和发布/订阅 (Pub/Sub) 模式

  • 点对点模式是基于队列的,生产者发送消息到队列,消费者从队列中接收消息;
  • 发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题 (topic)。主题可以认为是消息传递的中介,发布者将消息发布到某个主题,而订阅者则从主题中订阅消息,主题使得订阅者与发布者互相保持独立。

消息中间件可以被应用到很多场景,总体上包括以下几类:

  • 解耦:生产者和消费者只需实现相应接口,通过消息队列可以完全隔离,甚至跨数据中心;
  • 冗余(存储): 有些情况下,处理数据的过程会失败。MQ 可以把消息持久化直到它们被完全处理,通过这一方式避免了数据丢失风险;
  • 扩展性:提高消息入队和处理的效率只要另外增加处理过程即可,不需要改变代码;
  • 削峰:能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃;
  • 可恢复性:降低了进程间的耦合度,即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理;
  • 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性;
  • 缓冲:通过一个缓冲层来帮助任务高效地执行,写入消息中间件的处理会尽可能快速, 有助于控制和优化数据流经过系统的速度;
  • 异步通信:允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。

RabbitMQ 介绍

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。RabbitMQ 整体模型架构如下:

RabbitMQ-Model-Arch

生产者和消费者

生产者 (Producer): 消息投递方,创建消息然后发布到 RabbitMQ 中。消息一般包含消息体 (payload) 和标签 (label) 两个部分。在实际应用中,消息体一般是带有业务逻辑结构的数据,标签则用来表述这条消息,比如交换器的名称、路由键。生产者把消息交给 RabbitMQ,RabbitMQ 根据标签把消息发送给感兴趣的消费者。

消费者 (Consumer):消息接收方, 连接到 RabbitMQ 服务器,并订阅到队列上。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费消息体,对于消费者而言并不知道消息的生产者。

服务节点 (Broker): 对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务。

消息队列运转流程如下图所示:

RabbitMQ-Operation-Process

队列

队列 (Queue):用于存储消息。RabbitMQ 中消息只能存储在队列中,生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

需要注意的是,RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发。

交换器、路由键、绑定

交换器 (Exchange):生产者将消息发送到交换器 (X),由交换器将消息路由到一个或者多个队列中。如果路由不到,将返回给生产者或直接丢弃;RabbitMQ 中的交换器有四种类型,不同的类型有不同的路由策略。

路由键 (RoutingKey):生产者将消息发给交换器时,一般会指定一个 RoutingKey,用来表示消息的路由规则,而 RoutingKey 需要与交换器类型和绑定键 ( BindingKey) 联合使用才能最终生效。在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时,通过指定 RoutingKey 来决定消息流向哪里。

绑定 (Binding):RabbitMQ 中通过绑定将交换器与队列关联,在绑定时通常会指定一个绑定键 (BindingKey),这样 RabbitMQ 就知道如何正确地将消息路由到队列。

生产者将消息发送给交换器时,如果 BindingKey 和 RoutingKey 匹配,消息会被路由到对应的队列中,在绑定多个队列到同一个交换器时,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就不需要 BindingKey,而是将消息路由到所有绑定到该交换器的队列中。

交换器类型

RabbitMQ 常用的交换器类型有 fanoutdirecttopicheaders 这四种。

  • fanout: 把所有发送到交换器的消息路由到与该交换器绑定的所有队列中;
  • direct: 把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中;
  • topic: 在 direct 类型交换器的匹配规则上进行了扩展,约定:
    • RoutingKey 为一个点号分隔的字符串,如: com.yuxiumin.rabbitmq
    • BindingKey 和 RoutingKey 一样也是点号分隔的字符串;
    • BindingKey 中可以存在两种特殊字符串 \*#,用于做模糊匹配,其中 \* 用于精确匹配一个单词,# 用于匹配零个或多个单词。
  • headers:不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。由于 headers 类型的交换器性能差且不实用,基本上不会看到它的存在。

RabbitMQ 运转流程

生产者连接 RabbitMQ 发送消息流程如下:

  1. 生产者连接到 Broker,建立一个连接,开启一个信道 (Channel);
  2. 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等;
  3. 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等;
  4. 生产者通过路由键将交换器和队列绑定起来;
  5. 生产者发送消息至 Broker,其中包含路由键、交换器等信息;
  6. 相应的交换器根据接收到的路由键查找相匹配的队列;
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中;
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者;
  9. 关闭信道;
  10. 关闭连接。

消费者接收消息的过程如下:

  1. 消费者连接到 Broker,建立一个连接,开启一个信道 (Channel);
  2. 消费者向 Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作;
  3. 等待 Broker 回应并投递相应队列中的消息,消费者接收消息;
  4. 消费者确认 (Ack) 接收到的消息;
  5. RabbitMQ 从队列中删除相应已经被确认的消息;
  6. 关闭信道;
  7. 关闭连接;

RabbitMQ 高级特性

mandatory 参数

mandatory 是 channel.basicPublish 方法中的参数,具有当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ 提供的备份交换器可以将未能被交换器路由的消息 (没有绑定队列或者没有匹配的绑定) 存储起来,而不用返回给客户端。

mandatory 参数设为 true 时,如果交换器无法根据自身的类型和路由键找到一个符合条件的队列,RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,如果出现上述情形,则消息直接被丢弃。生产者可以通过调用 channel.addReturnListener 来添加 ReturnListener 监听器获取到没有被正确路由到合适队列的消息。

备份交换器

如上文所述,生产者在发送消息的时候, 如果不设置 mandatory 参数,那么消息在未被路由的情况下将会丢失;如果设置了 mandatory 参数,那么需要添加 ReturnListener 的编程逻辑,生产者的代码将变得复杂。

如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在 RabbitMQ 中,再在需要的时候去处理这些消息。可以 通过在声明交换器 (调用 channel.exchangeDeclare 方法) 的时候添加 alternate-exchange 参数来实现,也可以通过策略 (Policy) 的方式实现, 如果两者同时使用,则前者的优先级更高,会覆盖掉 Policy 的设置。

对于备份交换器,总结了以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效。

过期时间 (TTL)

RabbitMQ 可以对消息和队列设置过期时间 (Time to Live, TTL)

设置消息的 TTL

可以通过两种方法可以设置消息的 TTL:

  1. 通过队列属性设置: 队列中所有消息都有相同的过期时间;
  2. 对消息本身单独设置: 每条消息的 TTL 可以不同。

如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。消息在队列中的时间一旦超过设置的 TTL 值时,就会变成 “死信”。

如果不设置 TTL,表示消息不会过期;如果将 TTL 设置为 0,表示除非可以直接将消息投递到消费者,否则该消息会被立即丢弃。

通过队列属性设置消息 TTL 的方法是在 channel.queueDeclare 方法中加入 x-message-ttl 参数实现,这个参数的单位是毫秒;针对每条消息设置 TTL 的方法是在 channel.basicPublish 方法中加入 expiration 参数,单位也为毫秒。

设置队列的 TTL

通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。此处未使用指的是队列上没有任何的消费者,也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get 命令。

设置队列里的 TTL 可以应用于类似 RPC 方式的回复队列,在 RPC 中,许多队列会被创建出来,但是却是未被使用的。RabbitMQ 会确保在过期时间到达后将队列删除,但不保障删除动作的及时性,在 RabbitMQ 重启后,持久化队列的过期时间会被重新计算。

用于表示过期时间的 x-expires 参数以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,不过不能设置为 0。比如该参数设置为 1000,则表示该队列如果在 1 秒钟之内未使用则会被删除。

死信队列

当消息在一个队列中变成死信之后,它能重新被发送到另一个交换器中,这个交换器就是死信交换器 (DLX),绑定 DLX 的队列就称之为死信队列。消息变成死信一般是由于以下几种情况:

  • 消息被拒绝 (Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false
  • 消息过期;
  • 队列达到最大长度。

DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

可以通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加 DLX。

对于 RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费 (消费者调用了 Basic.Nack 或者 Basic.Reject) 而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。DLX 配合 TTL 使用还可以实现延迟队列的功能。

延迟队列

延迟队列存储的对象是对应的延迟消息。延迟消息指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延迟队列的使用场景有很多,比如:

  • 在订单系统中,一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了。
  • 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。

RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过上文所述的 DLX 和 TTL 模拟出延迟队列的功能。

持久化

持久化可以提高 RabbitMQ 的可靠性,防止在异常情况 (重启、关闭、宕机等) 下的数据丢失。RabbitMQ 的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。

交换器的持久化是通过在声明交换器时将 durable 参数置为 true 实现的。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换器元数据会丢失,但是消息不会丢失,只是不能将消息发送到这个交换器中。对一个长期使用的交换器来说,建议将其置为持久化的。

队列的持久化是通过在声明队列时将 durable 参数置为 true 实现的,如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会 丢失。队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。如果要确保消息不会丢失,需要将消息设置为持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性) 设置为 2 即可实现消息的持久化。设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在;只设置队列持久化,重启之后消息会丢失;只设置消息的持久化,重启之后队列消失,继而消息也丢失。

需要注意的是,将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能, 对于可靠性要求不高的消息可以不采用持久化处理以提高整体的吞吐量,在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。

此外,即使将交换器、队列、消息都设置持久化,也不能百分之百保证数据不丢失。

  • 首先从消费者来说,如果在订阅消费队列时将 autoAck 参数设置为 true,如果当消费者接收到消息后还没来得及处理就宕机了,消息会发生丢失。对于这种情况需要将 autoAck 参数设置为 false,并进行手动确认。
  • 其次 RabbitMQ 并不会为每条消息都同步存盘,而是先保存到操作系统缓存之中,所以在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间才能写入磁盘。如果在这段时间内 RabbitMQ 服务节点发生了宕机、重启等异常情况,消息保存还没来得及写入磁盘,那么消息也将会丢失。对于此问题可以引入 RabbitMQ 的镜像队列机制,如果主节点宕机,可以自动切换到从节点,这样有效地保证了高可用性,除非整个集群都挂掉。
  • 另外,还可以在发布端引入事务机制或者发送方确认机制来保证消息已经正确地发送并存储至 RabbitMQ 中,前提还要保证在调用 channel.basicPublish 方法的时候交换器能够将消息正确路由到相应的队列之中。

消息确认机制

生产者确认

在使用 RabbitMQ 时,可以通过消息持久化来解决因为服务器异常宕机而导致的消息丢失。但是当生产者将消息发送出去之后到底有没有正确地到达服务器,如果不进行特殊配置,默认情况下发送消息的操作不会返回任何信息给生产者,也就是默认情况下生产者不知道消息有没有正确地到达服务器。如果在消息到达服务器之前已经丢失,持久化操作也解决不了这个问题。RabbitMQ 提供了两种解决方式:(1) 事务机制;(2) 发送方确认 (publisher confirm) 机制。

事务机制

RabbitMQ 客户端中与事务机制相关的方法有三个:channel.txSelectchannel.txCommitchannel.txRollback

channel.txSelect 用于将当前的信道设置成事务模式,channel.txCommit 用于提交事务,channel.txRollback 用于事务回滚。在通过 channel.txSelect 方法开启事务之后,便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback 方法来实现事务回滚。

需要注意的是,使用事务机制会对 RabbitMQ 的性能造成很大影响,为此 RabbitMQ 提供了一个改进方案,即发送方确认机制,但是发送方确认机制并非 AMQP 协议官方支持。

发送方确认机制

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认 (Basic.Ack) 给生产者 (包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理。

消费者确认

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制。

采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,也有可能还是原来的那个消费者。RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。

在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,可以通过 Basic.Reject 命令进行,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。Channel 类中的 basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

其中 deliveryTag 可以看作消息的编号; 如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者。

Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,可以使用 Basic.Nack 命令。消费者客户端可以调用 channel.basicNack 方法来实现,定义如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throwsIOException;

其中 deliveryTagrequeue 的含义可以参考 basicReject 方法。multiple 参数设置为 false 表示拒绝编号为 deliveryTag 的消息,此时 basicNackbasicReject 方法一样;multiple 参数设置为 true 表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。

需要注意,将 channel.basicReject 或者 channel.basicNack 中的 requeue 设置为 false,可以启用死信队列功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。

对于 requeue,AMQP 中还有一个命令 Basic.Recover 具备可重入队列的特性。 其对应的客户端方法为:

1. Basic.RecoverOk basicRecover() throws IOException;
2. Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

channel.basicRecover 方法用来请求 RabbitMQ 重新发送还未被确认的消息,如果 requeue 参数设置为 true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果 requeue 参数设置为 false,那么同一条消息会被分配给与之前相同的消费者。默认情况下,requeue 参数默认为 true

消费者特性

消息分发

当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询的方式发送给消费者,每条消息只会发送给订阅列表里的一个消费者,如果应用负载加重,那么只需要创建更多的消费者来消费处理消息即可。默认情况下,如果有 n 个消费者,那么 RabbitMQ 会将第 m 条消息分发给第 m % n (取余的方式) 个消费者,而不管消费者是否消费并已经 确认 (Basic.Ack) 了消息。

这时,如果部分消费者负载较高来不及消费消息,而部分其他消费者由于某些原因 (比如业务逻辑简单、机器性能卓越等) 很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。

为了解决上述问题,需要用到 channel.basicQos(int prefetchCount),此方法允许限制信道上的消费者所能保持的最大未确认消息的数量。例如:在订阅消费队列之前,消费端程序调用了 channel.basicQos(5),之后订阅了某个队列进行消费。 RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息,直到消费者确认了某条消息之后,RabbitMQ 将相应的计数减 1,之后消费者可以继续接收消息,直到再次到达计数上限。

需要特别注意,Basic.Qos 的使用对于拉模式的消费方式无效。

channel.basicQos 有三种类型的重载方法:

1. void basicQos(int prefetchCount) throws IOException;
2. void basicQos(int prefetchCount, boolean global) throws IOException;
3. void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

上文只用到了 prefetchCount 这个参数,当 prefetchCount 设置为 0 表示没有上限。prefetchSize 参数表示消费者所能接收未确认消息的总体大小的上限,单位为 B,设置为 0 表示没有上限。

消息顺序性

消息顺序性指消费者的消息和发送者发布的消息的顺序是一致的。例如,不考虑消息重复的情况,如果生产者发布的消息分别为:msg1、msg2、msg3,那么消费者必然也是按照 msg1、msg2、msg3 的顺序进行消费的。

RabbitMQ 在不使用任何高级特性,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。以下为几种常见的情形:

  • 如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头 就出现了错序。同样,如果启用 publisher confirm 时,在发生超时、中断,又或者是收到 RabbitMQ 的 Basic.Nack 命令时,那么同样需要补偿发送,结果与事务机制一样会错序。可以简单地认为消息的顺序性保障是从存入队列之开始的,而不是在发送的时候开始的。
  • 如果生产者发送的消息设置了不同的超时时间,并且也设置了死信队列,整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。
  • 如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的;
  • 如果一个队列按照前后顺序有 msg1、 msg2、 msg3、 msg4 这 4 个消息,同时有 ConsumerA 和 ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中,ConsumerA 中的消息为 msg1 和 msg3,ConsumerB 中的消息为 msg2、msg4。 ConsumerA 收到消息 msg1 之后并不想处理而调用了 Basic.Nack/.Reject 将消息拒绝,与此同时将 requeue 设置为 true,这样这条消息就可以重新存入队列中。消息 msg1 之后被发送到了 ConsumerB 中,此时 ConsumerB 已经消费了 msg2、 msg4,之后再消费 msg1,这样消息顺序性也就错乱了。或者消息 msg1 又重新发往 ConsumerA 中,此时 ConsumerA 已经消费了 msg3,那么再消费 msg1,消息顺序性也无法得到保障。同样可以用在 Basic.Recover 这个 AMQP 命令中。

包括但不仅限于以上几种情形会使 RabbitMQ 消息错序,如果要保证消息的顺序性,需要业务方使用 RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识来实现。

消息传输保障

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,消息中间件的消息传输保障通常可分为三个层级:

  • 最多一次 (At most once): 消息可能会丢失,但绝不会重复传输。
  • 最少一次 (At least once): 消息绝不会丢失,但可能会重复传输。
  • 恰好一次 (Exactly once): 每条消息肯定会被传输一次且仅传输一次。

RabbitMQ 支持其中的 “最多一次” 和 “最少一次”。其中 “最少一次” 投递实现需要考虑以下这个几个方面的内容:

  1. 消息生产者需要开启事务机制或者发布者确认机制,以确保消息可以可靠地传输到 RabbitMQ 中;
  2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
  3. 消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。
  4. 消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。

“最多一次” 的方式无须考虑以上方面,不过很难确保消息不会丢失。

“恰好一次” 是 RabbitMQ 目前无法保障的。例如:

  • 消费者在消费完一条消息之后向 RabbitMQ 发送确认 Basic.Ack 命令,此时由于网络断开或者其他原因造成 RabbitMQ 并没有收到这个确认命令,那么 RabbitMQ 不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。
  • 生产者在使用发布者确认机制的时候,发送完一条消息等待 RabbitMQ 返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样 RabbitMQ 中就有两条同样的消息,在消费的时候,消费者就会重复消费。

RabbitMQ 并没有去重的机制来保证 “恰好一次”,去重处理一般是在业务客户端实现。建议在实际生产环境中,业务方根据自身的业务特性进行去重,比如业务消息本身具备幂等性,或者借助 Redis 等其他产品进行去重处理。

小结

以上是对 RabbitMQ 的原理和使用过程需要注意事项的一些总结,并没有涉及 Cluster、Federation 等高可用方面的特性介绍。总的来说 RabbitMQ 通过对于 AMQP协议的实现,在可靠数据传输方便做了一定程度保证,不过中间件毕竟不是万能的,在具体业务场景下必须根据实际需要进行相匹配的使用。

参考资料

RabbitMQ Document

AMQP 0-9-1 Model Explained

RabbitMQ 实战指南