分类目录归档:RabbitMQ

RabbitMMQ和Kafka比较

前言

不同的场景需要不同的解决方案,选错一个方案能够严重的影响你对软件的设计,开发和维护的能力。

这篇文章会先介绍RabbitMQ和Apache Kafka内部实现的相关概念。紧接着会主要介绍这两种技术的主要不同点以及他们各自的优缺点,最后我们会说明一下怎样选择这两种技术。

一、异步消息模式

异步消息可以作为解耦消息的生产和处理的一种解决方案。提到消息系统,我们通常会想到两种主要的消息模式——消息队列和发布/订阅模式。

1、消息队列

利用消息队列可以解耦生产者和消费者。多个生产者可以向同一个消息队列发送消息;但是,一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。也就是说一个具体的消息只能由一个消费者消费。
消息队列

需要额外注意的是,如果消费者处理一个消息失败了,消息系统一般会把这个消息放回队列,这样其他消费者可以继续处理。消息队列除了提供解耦功能之外,它还能够对生产者和消费者进行独立的伸缩(scale),以及提供对错误处理的容错能力。

2、发布/订阅

发布/订阅(pub/sub)模式中,单个消息可以被多个订阅者并发的获取和处理。
发布/订阅

例如,一个系统中产生的事件可以通过这种模式让发布者通知所有订阅者。在许多队列系统中常常用主题(topics)这个术语指代发布/订阅模式。在RabbitMQ中,主题就是发布/订阅模式的一种具体实现(更准确点说是交换器(exchange)的一种),但是在这篇文章中,我会把主题和发布/订阅当做等价来看待。

一般来说,订阅有两种类型:

  • 1)临时(ephemeral)订阅,这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出,相应的订阅以及尚未处理的消息就会丢失。
  • 2)持久(durable)订阅,这种订阅会一直存在,除非主动去删除。消费者退出后,消息系统会继续维护该订阅,并且后续消息可以被继续处理。

二、RabbitMQ

RabbitMQ作为消息中间件的一种实现,常常被当作一种服务总线来使用。RabbitMQ原生就支持上面提到的两种消息模式。其他一些流行的消息中间件的实现有ActiveMQ,ZeroMQ,Azure Service Bus以及Amazon Simple Queue Service(SQS)。这些消息中间件的实现有许多共通的地方,这边文章中提到的许多概念大部分都适用于这些中间件。

1、队列

RabbitMQ支持典型的开箱即用的消息队列。开发者可以定义一个命名队列,然后发布者可以向这个命名队列中发送消息。最后消费者可以通过这个命名队列获取待处理的消息。

2、消息交换器

RabbitMQ使用消息交换器来实现发布/订阅模式。发布者可以把消息发布到消息交换器上而不用知道这些消息都有哪些订阅者。

每一个订阅了交换器的消费者都会创建一个队列;然后消息交换器会把生产的消息放入队列以供消费者消费。消息交换器也可以基于各种路由规则为一些订阅者过滤消息。

RabbitMQ消息交换器

需要重点注意的是RabbitMQ支持临时和持久两种订阅类型。消费者可以调用RabbitMQ的API来选择他们想要的订阅类型。

根据RabbitMQ的架构设计,我们也可以创建一种混合方法——订阅者以组队的方式然后在组内以竞争关系作为消费者去处理某个具体队列上的消息,这种由订阅者构成的组我们称为消费者组。按照这种方式,我们实现了发布/订阅模式,同时也能够很好的伸缩(scale-up)订阅者去处理收到的消息。

发布/订阅与队列的联合使用

三、Apache Kafka

Apache Kafka不是消息中间件的一种实现。相反,它只是一种分布式流式系统。

不同于基于队列和交换器的RabbitMQ,Kafka的存储层是使用分区事务日志来实现的。Kafka也提供流式API用于实时的流处理以及连接器API用来更容易的和各种数据源集成;当然,这些已经超出了本篇文章的讨论范围。

云厂商为Kafka存储层提供了可选的方案,比如Azure Event Hubsy以及AWS Kinesis Data Streams等。对于Kafka流式处理能力,还有一些特定的云方案和开源方案,不过,话说回来,它们也超出了本篇的范围。

1、主题

Kafka没有实现队列这种东西。相应的,Kafka按照类别存储记录集,并且把这种类别称为主题。

Kafka为每个主题维护一个消息分区日志。每个分区都是由有序的不可变的记录序列组成,并且消息都是连续的被追加在尾部。

当消息到达时,Kafka就会把他们追加到分区尾部。默认情况下,Kafka使用轮询分区器(partitioner)把消息一致的分配到多个分区上。

Kafka可以改变创建消息逻辑流的行为。例如,在一个多租户的应用中,我们可以根据每个消息中的租户ID创建消息流。IoT场景中,我们可以在常数级别下根据生产者的身份信息(identity)将其映射到一个具体的分区上。确保来自相同逻辑流上的消息映射到相同分区上,这就保证了消息能够按照顺序提供给消费者。

Kafka生产者

消费者通过维护分区的偏移(或者说索引)来顺序的读出消息,然后消费消息。

单个消费者可以消费多个不同的主题,并且消费者的数量可以伸缩到可获取的最大分区数量。

所以在创建主题的时候,我们要认真的考虑一下在创建的主题上预期的消息吞吐量。消费同一个主题的多个消费者构成的组称为消费者组。通过Kafka提供的API可以处理同一消费者组中多个消费者之间的分区平衡以及消费者当前分区偏移的存储。

Kafka消费者

2、Kafka实现的消息模式

Kafka的实现很好地契合发布/订阅模式。

生产者可以向一个具体的主题发送消息,然后多个消费者组可以消费相同的消息。每一个消费者组都可以独立的伸缩去处理相应的负载。由于消费者维护自己的分区偏移,所以他们可以选择持久订阅或者临时订阅,持久订阅在重启之后不会丢失偏移而临时订阅在重启之后会丢失偏移并且每次重启之后都会从分区中最新的记录开始读取。

但是这种实现方案不能完全等价的当做典型的消息队列模式看待。当然,我们可以创建一个主题,这个主题和拥有一个消费者的消费组进行关联,这样我们就模拟出了一个典型的消息队列。不过这会有许多缺点,我们会在第二部分详细讨论。

值得特别注意的是,Kafka是按照预先配置好的时间保留分区中的消息,而不是根据消费者是否消费了这些消息。这种保留机制可以让消费者自由的重读之前的消息。另外,开发者也可以利用Kafka的存储层来实现诸如事件溯源和日志审计功能。

尽管有时候RabbitMQ和Kafka可以当做等价来看,但是他们的实现是非常不同的。所以我们不能把他们当做同种类的工具来看待;一个是消息中间件,另一个是分布式流式系统。

作为解决方案架构师,我们要能够认识到它们之间的差异并且尽可能的考虑在给定场景中使用哪种类型的解决方案。下面会指出这些差异并且提供什么时候使用哪种方案的指导建议。

四、RabbitMQ和Kafka的显著差异

RabbitMQ是一个消息代理,但是Apache Kafka是一个分布式流式系统。好像从语义上就可以看出差异,但是它们内部的一些特性会影响到我们是否能够很好的设计各种用例。
例如,Kafka最适用于数据的流式处理,但是RabbitMQ对流式中的消息就很难保持它们的顺序。
另一方面,RabbitMQ内置重试逻辑和死信(dead-letter)交换器,但是Kafka只是把这些实现逻辑交给用户来处理。
这部分主要强调在不同系统之间它们的主要差异。

1、消息顺序

对于发送到队列或者交换器上的消息,RabbitMQ不保证它们的顺序。尽管消费者按照顺序处理生产者发来的消息看上去很符合逻辑,但是这有很大误导性。

RabbitMQ文档中有关于消息顺序保证的说明:

“发布到一个通道(channel)上的消息,用一个交换器和一个队列以及一个出口通道来传递,那么最终会按照它们发送的顺序接收到。”

——RabbitMQ代理语义(Broker Semantics)

换话句话说,只要我们是单个消费者,那么接收到的消息就是有序的。然而,一旦有多个消费者从同一个队列中读取消息,那么消息的处理顺序就没法保证了。

由于消费者读取消息之后可能会把消息放回(或者重传)到队列中(例如,处理失败的情况),这样就会导致消息的顺序无法保证。

一旦一个消息被重新放回队列,另一个消费者可以继续处理它,即使这个消费者已经处理到了放回消息之后的消息。因此,消费者组处理消息是无序的,如下表所示:

使用RabbitMQ丢失消息顺序的例子

当然,我们可以通过限制消费者的并发数等于1来保证RabbitMQ中的消息有序性。更准确点说,限制单个消费者中的线程数为1,因为任何的并行消息处理都会导致无序问题。

不过,随着系统规模增长,单线程消费者模式会严重影响消息处理能力。所以,我们不要轻易的选择这种方案。

另一方面,对于Kafka来说,它在消息处理方面提供了可靠的顺序保证。Kafka能够保证发送到相同主题分区的所有消息都能够按照顺序处理。

在前面说过,默认情况下,Kafka会使用循环分区器(round-robin partitioner)把消息放到相应的分区上。不过,生产者可以给每个消息设置分区键(key)来创建数据逻辑流(比如来自同一个设备的消息,或者属于同一租户的消息)。

所有来自相同流的消息都会被放到相同的分区中,这样消费者组就可以按照顺序处理它们。

但是,我们也应该注意到,在同一个消费者组中,每个分区都是由一个消费者的一个线程来处理。结果就是我们没法伸缩(scale)单个分区的处理能力。

不过,在Kafka中,我们可以伸缩一个主题中的分区数量,这样可以让每个分区分担更少的消息,然后增加更多的消费者来处理额外的分区。

获胜者(Winner):

显而易见,Kafka是获胜者,因为它可以保证按顺序处理消息。RabbitMQ在这块就相对比较弱。

2、消息路由

RabbitMQ可以基于定义的订阅者路由规则路由消息给一个消息交换器上的订阅者。一个主题交换器可以通过一个叫做routing_key的特定头来路由消息。

或者,一个头部(headers)交换器可以基于任意的消息头来路由消息。这两种交换器都能够有效地让消费者设置他们感兴趣的消息类型,因此可以给解决方案架构师提供很好的灵活性。

另一方面,Kafka在处理消息之前是不允许消费者过滤一个主题中的消息。一个订阅的消费者在没有异常情况下会接受一个分区中的所有消息。

作为一个开发者,你可能使用Kafka流式作业(job),它会从主题中读取消息,然后过滤,最后再把过滤的消息推送到另一个消费者可以订阅的主题。但是,这需要更多的工作量和维护,并且还涉及到更多的移动操作。

获胜者:

在消息路由和过滤方面,RabbitMQ提供了更好的支持。

3、消息时序(timing)

在测定发送到一个队列的消息时间方面,RabbitMQ提供了多种能力:

1)消息存活时间(TTL)

发送到RabbitMQ的每条消息都可以关联一个TTL属性。发布者可以直接设置TTL或者根据队列的策略来设置。

系统可以根据设置的TTL来限制消息的有效期。如果消费者在预期时间内没有处理该消息,那么这条消息会自动的从队列上被移除(并且会被移到死信交换器上,同时在这之后的消息都会这样处理)。

TTL对于那些有时效性的命令特别有用,因为一段时间内没有处理的话,这些命令就没有什么意义了。

2)延迟/预定的消息

RabbitMQ可以通过插件的方式来支持延迟或者预定的消息。当这个插件在消息交换器上启用的时候,生产者可以发送消息到RabbitMQ上,然后这个生产者可以延迟RabbitMQ路由这个消息到消费者队列的时间。

这个功能允许开发者调度将来(future)的命令,也就是在那之前不应该被处理的命令。例如,当生产者遇到限流规则时,我们可能会把这些特定的命令延迟到之后的一个时间执行。

Kafka没有提供这些功能。它在消息到达的时候就把它们写入分区中,这样消费者就可以立即获取到消息去处理。

Kafka也没用为消息提供TTL的机制,不过我们可以在应用层实现。

不过,我们必须要记住的一点是Kafka分区是一种追加模式的事务日志。所以,它是不能处理消息时间(或者分区中的位置)。

获胜者:

毫无疑问,RabbitMQ是获胜者,因为这种实现天然的就限制Kafka。

4、消息留存(retention)

当消费者成功消费消息之后,RabbitMQ就会把对应的消息从存储中删除。这种行为没法修改。它几乎是所有消息代理设计的必备部分。

相反,Kafka会给每个主题配置超时时间,只要没有达到超时时间的消息都会保留下来。在消息留存方面,Kafka仅仅把它当做消息日志来看待,并不关心消费者的消费状态。

消费者可以不限次数的消费每条消息,并且他们可以操作分区偏移来“及时”往返的处理这些消息。Kafka会周期的检查分区中消息的留存时间,一旦消息超过设定保留的时长,就会被删除。

Kafka的性能不依赖于存储大小。所以,理论上,它存储消息几乎不会影响性能(只要你的节点有足够多的空间保存这些分区)。

获胜者:

Kafka设计之初就是保存消息的,但是RabbitMQ并不是。所以这块没有可比性,Kafka是获胜者。

5、容错处理

当处理消息,队列和事件时,开发者常常认为消息处理总是成功的。毕竟,生产者把每条消息放入队列或者主题后,即使消费者处理消息失败了,它仅仅需要做的就是重新尝试,直到成功为止。

尽管表面上看这种方法是没错的,但是我们应该对这种处理方式多思考一下。首先我们应该承认,在某些场景下,消息处理会失败。所以,即使在解决方案部分需要人为干预的情况下,我们也要妥善地处理这些情况。

消息处理存在两种可能的故障:

1)瞬时故障——故障产生是由于临时问题导致,比如网络连接,CPU负载,或者服务崩溃。我们可以通过一遍又一遍的尝试来减轻这种故障。

2)持久故障——故障产生是由于永久的问题导致的,并且这种问题不能通过额外的重试来解决。比如常见的原因有软件bug或者无效的消息格式(例如,损坏(poison)的消息)。

作为架构师和开发者,我们应该问问自己:“对于消息处理故障,我们应该重试多少次?每一次重试之间我们应该等多久?我们怎样区分瞬时和持久故障?”

最重要的是:“所有重试都失败后或者遇到一个持久的故障,我们要做什么?”

当然,不同业务领域有不同的回答,消息系统一般会给我们提供工具让我们自己实现解决方案。

RabbitMQ会给我们提供诸如交付重试和死信交换器(DLX)来处理消息处理故障。

DLX的主要思路是根据合适的配置信息自动地把路由失败的消息发送到DLX,并且在交换器上根据规则来进一步的处理,比如异常重试,重试计数以及发送到“人为干预”的队列。

查看下面篇文章,它在RabbitMQ处理重试上提供了额外的可能模式视角。

链接:https://engineering.nanit.com/rabbitmq-retries-the-full-story-ca4cc6c5b493

在RabbitMQ中我们需要记住最重要的事情是当一个消费者正在处理或者重试某个消息时(即使是在把它返回队列之前),其他消费者都可以并发的处理这个消息之后的其他消息。

当某个消费者在重试处理某条消息时,作为一个整体的消息处理逻辑不会被阻塞。所以,一个消费者可以同步地去重试处理一条消息,不管花费多长时间都不会影响整个系统的运行。

消费者1持续的在重试处理消息1,同时其他消费者可以继续处理其他消息

和RabbitMQ相反,Kafka没有提供这种开箱即用的机制。在Kafka中,需要我们自己在应用层提供和实现消息重试机制。

另外,我们需要注意的是当一个消费者正在同步地处理一个特定的消息时,那么同在这个分区上的其他消息是没法被处理的。

由于消费者不能改变消息的顺序,所以我们不能够拒绝和重试一个特定的消息以及提交一个在这个消息之后的消息。你只要记住,分区仅仅是一个追加模式的日志。

一个应用层解决方案可以把失败的消息提交到一个“重试主题”,并且从那个主题中处理重试;但是这样的话我们就会丢失消息的顺序。

我们可以在Uber.com上找到Uber工程师实现的一个例子。如果消息处理的时延不是关注点,那么对错误有足够监控的Kafka方案可能就足够了。

如果消费者阻塞在重试一个消息上,那么底部分区的消息就不会被处理

获胜者:

RabbitMQ是获胜者,因为它提供了一个解决这个问题的开箱即用的机制。

6、伸缩

有多个基准测试,用于检查RabbitMQ和Kafka的性能。

尽管通用的基准测试对一些特定的情况会有限制,但是Kafka通常被认为比RabbitMQ有更优越的性能。

Kafka使用顺序磁盘I / O来提高性能。

从Kafka使用分区的架构上看,它在横向扩展上会优于RabbitMQ,当然RabbitMQ在纵向扩展上会有更多的优势。

Kafka的大规模部署通常每秒可以处理数十万条消息,甚至每秒百万级别的消息。

过去,Pivotal记录了一个Kafka集群每秒处理一百万条消息的例子;但是,它是在一个有着30个节点集群上做的,并且这些消息负载被优化分散到多个队列和交换器上。

链接:https://content.pivotal.io/blog/rabbitmq-hits-one-million-messages-per-second-on-google-compute-engine

典型的RabbitMQ部署包含3到7个节点的集群,并且这些集群也不需要把负载分散到不同的队列上。这些典型的集群通常可以预期每秒处理几万条消息。

获胜者:

尽管这两个消息平台都可以处理大规模负载,但是Kafka在伸缩方面更优并且能够获得比RabbitMQ更高的吞吐量,因此这局Kafka获胜。

但是,值得注意的是大部分系统都还没有达到这些极限!所以,除非你正在构建下一个非常受欢迎的百万级用户软件系统,否则你不需要太关心伸缩性问题,毕竟这两个消息平台都可以工作的很好。

7、消费者复杂度

RabbitMQ使用的是智能代理和傻瓜式消费者模式。消费者注册到消费者队列,然后RabbitMQ把传进来的消息推送给消费者。RabbitMQ也有拉取(pull)API;不过,一般很少被使用。

RabbitMQ管理消息的分发以及队列上消息的移除(也可能转移到DLX)。消费者不需要考虑这块。

根据RabbitMQ结构的设计,当负载增加的时候,一个队列上的消费者组可以有效的从仅仅一个消费者扩展到多个消费者,并且不需要对系统做任何的改变。

RabbitMQ高效的伸缩

相反,Kafka使用的是傻瓜式代理和智能消费者模式。消费者组中的消费者需要协调他们之间的主题分区租约(以便一个具体的分区只由消费者组中一个消费者监听)。

消费者也需要去管理和存储他们分区偏移索引。幸运的是Kafka SDK已经为我们封装了,所以我们不需要自己管理。

另外,当我们有一个低负载时,单个消费者需要处理并且并行的管理多个分区,这在消费者端会消耗更多的资源。

当然,随着负载增加,我们只需要伸缩消费者组使其消费者的数量等于主题中分区的数量。这就需要我们配置Kafka增加额外的分区。

但是,随着负载再次降低,我们不能移除我们之前增加的分区,这需要给消费者增加更多的工作量。尽管这样,但是正如我们上面提到过,Kafka SDK已经帮我们做了这个额外的工作。

Kafka分区没法移除,向下伸缩后消费者会做更多的工作

获胜者:

根据设计,RabbitMQ就是为了傻瓜式消费者而构建的。所以这轮RabbitMQ获胜。

五、如何选择?

现在我们就如面对百万美元问题一样:“什么时候使用RabbitMQ以及什么时候使用Kafka?”概括上面的差异,我们不难得出下面的结论。

优先选择RabbitMQ的条件:

  • 高级灵活的路由规则;
  • 消息时序控制(控制消息过期或者消息延迟);
  • 高级的容错处理能力,在消费者更有可能处理消息不成功的情景中(瞬时或者持久);
  • 更简单的消费者实现。

优先选择Kafka的条件:

  • 严格的消息顺序;
  • 延长消息留存时间,包括过去消息重放的可能;
  • 传统解决方案无法满足的高伸缩能力。

大部分情况下这两个消息平台都可以满足我们的要求。但是,它取决于我们的架构师,他们会选择最合适的工具。当做决策的时候,我们需要考虑上面着重强调的功能性差异和非功能性限制。

这些限制如下:

  • 当前开发者对这两个消息平台的了解;
  • 托管云解决方案的可用性(如果适用);
  • 每种解决方案的运营成本;
  • 适用于我们目标栈的SDK的可用性。

当开发复杂的软件系统时,我们可能被诱导使用同一个消息平台去实现所有必须的消息用例。但是,从我的经验看,通常同时使用这两个消息平台能够带来更多的好处。

例如,在一个事件驱动的架构系统中,我们可以使用RabbitMQ在服务之间发送命令,并且使用Kafka实现业务事件通知。

原因是事件通知常常用于事件溯源,批量操作(ETL风格),或者审计目的,因此Kafka的消息留存能力就显得很有价值。

相反,命令一般需要在消费者端做额外处理,并且处理可以失败,所以需要高级的容错处理能力。

这里,RabbitMQ在功能上有很多闪光点。以后我可能会写一篇详细的文章来介绍,但是你必须记住–你的里程(mileage)可能会变化,因为适合性取决于你的特定需求。

六、总结思想

写这篇文章是由于我观察到许多开发者把这RabbitMQ和Kafka作为等价来看待。我希望通过这篇文章的帮助能够让你获得对这两种技术实现的深刻理解以及它们之间的技术差异。

反过来通过它们之间的差异来影响这两个平台去给用例提供更好的服务。这两个消息平台都很棒,并且都能够给多个用例提供很好的服务。

但是,作为解决方案架构师,取决于我们对每一个用例需求的理解,以及优化,然后选择最合适的解决方案。

RabbitMQ基础知识

1 MQ 存在的意义

消息中间件一般主要用来做 异步处理、应用解耦、流量削峰、日志处理 等方面。

1.1 异步处理

一个用户登陆网址注册,然后系统发短信跟邮件告知注册成功,一般有三种解决方法。
1. 串行方式,依次执行,问题是用户注册后就可以使用了,没必要等短信跟邮件啊。
2. 注册成功后,邮件跟验证码用并行等方式执行,问题是邮件跟短信是非重要的任务,系统注册还要等这俩完成么?
3. 基于异步MQ的处理,用户注册成功后直接把信息异步发送到MQ中,然后邮件系统跟短信系统主动去消费数据。
异步处理

1.2 应用解耦

比如有一个订单系统,还要一个库存系统,用户下订单后要调用库存系统来处理,直接调用话,库存系统出现问题咋办呢?
应用解耦

1.3 流量削峰

举办一个 秒杀活动,如何较好到设计?服务层直接接受瞬间高密度访问绝对不可以,起码要加入一个MQ来实现削峰。
流量削峰

1.4 日志处理

用户通过 WebUI 访问发送请求到时候后端如何接受跟处理呢一般?
日志处理

1.5 MQ 带来的弊端

  1. 系统可用性降低:引入第三方依赖则需考虑第三方的稳定性。
  2. 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此需要考虑的东西更多,系统复杂性增大。

2 常见的 MQ

消息中间件具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ、炙手可热的Kafka、阿里巴巴自主开发RocketMQ等。

  1. ActiveMQ:老牌的消息中间件,但是不适合高并发互联网,适合传统企业。
  2. RabbitMQ:支持高并发、高吞吐、性能好,还有完善的管理界面等。支持集群化,缺点是Erlang语言开发的。
  3. RocketMQ:阿里出品,性能优越,Java开发,二次改造。
  4. Kafka:超高吞吐量实时日志采集,一般在大数据体系配合实时计算Spark Streaming、Flink等使用。
  5. 中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择。
  6. 大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

3 RabbitMQ 常见模式

RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP

AdvancedMessageQueuingProtocol:高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

3.1 RabbitMQ 基本概念

RabbitMQ模型

  1. Broker:简单来说就是消息队列服务器实体
  2. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  3. Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  4. Binding:它的作用就是把 exchange 和 queue 按照路由规则绑定起来
  5. Routing Key:路由关键字,exchange 根据这个关键字进行消息投递。
  6. VHost:vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
  7. Producer:消息生产者,就是投递消息的程序
  8. Consumer:消息消费者,就是接受消息的程序
  9. Channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel 代表一个会话任务

由 Exchange、Queue、RoutingKey 三个才能决定一个从 Exchange 到 Queue 的唯一的线路。

3.2 RabbitMQ 工作模式

3.2.1 simple模式

simple模式

生产者产生消息,将消息放入队列,消费者监听消息队列,如果队列中有消息就消费掉,消息被拿走后会自动从队列中删除,存在隐患需要消费者设置ACK确认,消费者处理完后要及时发送ack给队列,否则会造成内存溢出。

简单队列的不足:耦合性过高,生产者一一对应消费者,如果有多个消费者想消费队列中信息就无法实现了。

3.2.2 work工作模式(资源的竞争)

work工作模式

生产者将消息放入队列,消费者可以有多个。一般有两种模式:
1. 轮询分发(round-robin):MQ不管两个消费者谁忙,数据总是你一个我一个,MQ 给两个消费发数据的时候是不知道消费者性能的,默认就是雨露均沾。此时 autoAck = true。
2. 公平分发:要让消费者消费完毕一条数据后就告知MQ,再让MQ发数据即可。自动应答要关闭,实现按照消费者性能消费。

3.2.3 fanout publish/subscribe 发布订阅模式

fanout模式

类似公众号的订阅跟发布,属于 fanout 模式,不处理路由键。不需要指定routingKey,我们只需要把队列绑定到交换机, 消息就会被发送到所有到队列中:

  1. 一个生产者多个消费者
  2. 每一个消费者都有一个自己的队列
  3. 生产者没有把消息直接发送到队列而是发送到了交换机转化器(exchange)。
  4. 每一个队列都要绑定到交换机上。
  5. 生产者发送的消息经过交换机到达队列,从而实现一个消息被多个消费者消费。
3.2.4 direct routing 路由模式

direct:处理路由键,需要指定routingKey,此时生产者发送数据到MQ的时候会指定key,任务队列也会指定key,只有key一样消息才会被传送到队列中。如下图
direct模式
缺点:路由key必须要明确,无法实现规则性模糊匹配。

3.2.5 topic 主题模式

将路由键跟某个模式匹配,生产者会带 routingKey,但是消费者的MQ会带模糊routingKey:
topic模式
1. # 表示匹配 >=1个字符。
2. * 表示匹配一个。
3. 路由功能添加模糊匹配。
4. 消息产生者产生消息,把消息交给交换机。
5. 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

3.2.6 总计

如果需要指定模式一般是在消费者端设置,灵活性调节。

4 常见考题

4.1 消息怎么路由的

生成者生产消息后消息带有 routing Key,通过routing Key 消费者队列被绑定到交换器上,消息到达交换器根据交换器规则匹配,常见交换器如下:
1. fanout:如果交换器收到消息,将会广播到所有绑定的队列上
2. direct:如果路由键完全匹配,消息就被投递到相应的队列
3. topic:可以使来自不同源头的消息能够到达同一个队列。使用 topic 交换器时,可以使用通配符

4.2 RabbitMQ 消息基于什么传输

信道是生产消费者与rabbit通信的渠道,生产者 publish 或是消费者 subscribe 一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,就是说 RabbitMQ 在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在RabbitMQ 都有唯一的ID来保证信道私有性,对应唯一的线程使用。用信道而不用 TCP 的原因是由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。

4.3 如何保证 RabbitMQ 消息不丢失

消息丢失主要分为 生产者丢失消息、消息列表丢失消息、消费者丢失消息。

4.3.1 生产者丢失消息

RabbitMQ 提供 transactionconfirm 模式来确保生产者不丢消息。
transaction 机制就是说:发送消息前,开启事务(channel.txSelect),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务channel.txCommit()。事务卡顿会导致后面无法发送,官方说加入事务机制MQ会降速250倍。

confirm(发送方确认模式)模式用的居多:一旦 channel 进入 confirm 模式,所有在该信道上发布的消息都将会被指派一个从1开始的唯一的ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个包含消息的唯一ID 的 ACK给生产者,这就使得生产者知道消息已经正确到达目的队列了,如果 RabbitMQ 没能处理该消息,则会发送一个 Nack (not acknowledged) 消息给你,你可以进行重试操作。

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

4.3.2 消息列表 丢失消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和 confirm 机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个 Ack 信号。这样,如果消息持久化磁盘之前,RabbitMQ 挂了后生产者收不到Ack信号,生产者会自动重发。

通过如下持久化设置,即使 RabbitMQ 挂了重启后也能恢复数据。
1. durable = true, 将 queue 的持久化设置为 true,则代表是一个持久的队列
2. 发送消息的时候将 deliveryMode=2

关于持久化其实是个权衡问题,持久化可能会导致系统QPS下降,所以一般仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的持久化不会导致系统性能瓶颈。

4.3.3 消费者丢失消息

消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息。

解决方案:处理消息成功后,手动回复确认消息。消费者跟消息队列的连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息,保证数据的最终一致性。

注意点
1. 消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。
2. 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者,这时可能存在消息重复消费的隐患,需要去重!

4.3 如何避免消息重复投递或重复消费

4.3.1 消息简介

消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。

消息重复消费的场景大概可以分为 生产者端重复消费和消费者端重复消费,解决办法是是通过幂等性来保证重复消费的消息不对结果产生影响即可。

  1. 消息生成时 RabbitMQ 内部 对每个生产的消息生成个 inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列。
  2. 消息消费时要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重的依据,避免同一条消息被重复消费。
  3. 在 RocketMQ 中生产者发送消息前询问 RocketMQ 信息是否已发送过,或者通过Redis记录已查询记录。不过最好的还是直接在消费端去重消费。

4.3.2 举例

  1. 消费者拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
  2. 拿到消息后如果做的是 redis 的 set 操作就不用解决了,因为你无论set几次结果都是一样的。
  3. 准备个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

4.4 RabbitMQ 如何保证消息顺序执行

顺序性 必要性:生产者的信息是[插入、更新、删除],消费者执行顺序是[删除、插入、更新],这是跟预期不一致的。

4.4.1 乱序情况

出现消费乱序一般是如下两种情况:
1. 一个 queue,有多个 consumer 去消费,每个 consumer 的执行时间是不固定的,无法保证先读到消息的 consumer 一定先完成操作。
多个消费者乱序

  1. 一个 queue 对应一个 consumer,但是 consumer 里面进行了多线程消费,这样也会造成消息消费顺序错误。
    多线程乱序
4.4.2 解决乱序
  1. 拆分多个 queue,每个 queue 一个 consumer,将三个有先后顺序的消息根据用户订单id 哈希后发送到同一个queue中,来保证消息的先后性。当然这样会造成吞吐量下降。
    一个队列保证前后

  2. 一个 queue 对应一个 consumer,在 consumer 内部根据ID映射到不同内存队列,然后用内存队列做排队分发给底层不同的 worker 来处理
    内存队列实现顺序

4.5 RabbitMQ 的集群

RabbitMQ 是基于主从(非分布式)做高可用性的。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式

4.5.1 单机模式

单机版的 就是 Demo 级别,生产系统一般没人用单机模式。

4.5.2 普通集群模式

在 N 台机器上启动 N 个 RabbitMQ 实例。创建的 queue 只会放在一个 RabbitMQ 实例上,但每个MQ实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费时如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。让集群中多个节点来服务某个 queue 的读写操作来提高吞吐量。

4.5.3 镜像集群模式

RabbitMQ 的高可用模式,在镜像集群模式下,你创建的 queue无论元数据还是 queue 里的消息都会存在于多个实例上,每个 RabbitMQ 节点都有这个 queue 的全部数据的。写消息到 queue 的时候都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

  1. 优点在于任何一个机器宕机了其它节点还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。
  2. 缺点在于消息需要同步到所有机器上,导致网络带宽压力和消耗很重。也是每个节点都放这个 queue 的完整数据。

4.6 死信队列 跟 延迟队列

4.6.1 死信队列

死信 Dead Letter 是 RabbitMQ 中的一种消息机制,当消费消息时队列里的消息出现以下情况那么该消息将成为死信。死信消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃:

  1. 消息被否定确认,使用channel.basicNack 或 channel.basicReject ,并且此时 default-requeue-rejected(由于监听器抛出异常而拒绝的消息是否被重新放回队列) 属性被设置为false。
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。
  1. 对队列中消息总数进行限制,x-max-length = 指定值。则超出阈值后队头数据被抛弃。
  2. 对队列中消息体总字节数进行限制,只计算消息体的字节数。x-max-length-bytes = 指定值。

死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机只不过是用来接受死信的普通交换机,所以可以为任何类型,比如Direct、Fanout、Topic。

适用场景

在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,在系统因为参数解析、数据校验、网咯拨打等导致异常后通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息。

死信消息的生命周期

  1. 业务消息被投入业务队列
  2. 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
  3. 被nck或reject的消息由RabbitMQ投递到死信交换机中
  4. 死信交换机将消息投入相应的死信队列
  5. 死信队列的消费者消费死信消息

死信消息是 RabbitMQ 为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,明白死信队列运行机制后就知道这些 Exchange 和 Queue 想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。

4.6.2 RabbitMQ 中的 TTL

TTL(Time To Live) 是 RabbitMQ 中一个 消息队列 的属性,如果一条消息设置了 TTL属性或者进入了有 TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为死信。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

  1. queue 设置 TTL
1Map<String, Object> args = new HashMap<String, Object>();
2args.put("x-message-ttl", 6000); // ms
3channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
  1. Msg 设置 TTL
1AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
2builder.expiration("6000");
3AMQP.BasicProperties properties = builder.build();
4channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

区别
1. 设置了队列的TTL属性,一旦Msg 过期,就会被队列丢弃。
2. Msg 设置 TTL,Msg 是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的Msg 积压情况,则已过期的 Msg 也许还能存活较长时间,解决办法 安装插件 rabbitmq_delayed_message_exchange。
3. 如果不设置TTL,表示 Msg 永远不会过期,TTL = 0 表示除非此时可以直接投递该 Msg 到消费者,否则该 Msg 将会被丢弃。

4.6.3 延迟队列

延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。一般用在如下场景:

  1. 订单在 15 分钟之内未支付则自动取消。
  2. 账单在一周内未支付,则自动结算。
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

    延时队列 = 死信队列 + TTL
    保证顺序性

  6. 当然也可以用 Java 的 DelayQueue、Quartz、Redis 的 zset 等实现。

4.7 MQ 消息积压咋办

这种时候只能操作临时扩容,以更快的速度去消费数据了。具体操作步骤和思路如下:

  1. 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
  2. 临时建立好原先10倍~20倍的queue数量(新建一个topic,partition是原来的10倍)。
  3. 然后写一个临时分发消息的 consumer 程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。
  4. 紧接着征用10倍的机器来部署 consumer,每一批 consumer消费一个临时 queue 的消息。
  5. 这种做法相当于临时将 queue 资源和 consumer 资源扩大10倍,以正常速度的10倍来消费消息。
  6. 等快速消费完了之后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状。

    消息挤压处理

4.8 RabbitMQ 中的推拉

在RabbitMQ 中有推模式跟拉模式,平时开发多为推模式。

  1. 推模式:消息中间件主动将消息推送给消费者
  2. 拉模式:消费者主动从消息中间件拉取消息
4.8.1 推模式 push
  1. 推模式接收消息是最有效的一种消息处理方式。channel.basicConsume(queneName,consumer)方法将信道(channel)设置成投递模式,直到取消队列的订阅为止。当消息到达RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配的消费者,而不需要消费端手动来拉取,当然投递消息的个数还是会受到channel.basicQos的限制。
  2. 推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率很高。缺点就是缓冲区可能会溢出。
  3. 由于推模式是信息到达RabbitMQ后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。
4.8.2 拉模式 pull
  1. 如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方法来进行消费消息。
  2. 拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。
  3. 由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性较差;消费者难以获取实时消息,具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息。

4.9 设计个MQ

一般是个开放题,考察有没有从架构角度整体构思和设计的思维以及能力。不求看过源码起码但的知道基本原理、核心组成部分、基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好(强行为下篇Kafka做铺垫)。

  1. 考虑MQ的伸缩性,在需要的时候快速扩容来增加吞吐量和容量,设计个分布式的系统,参照一下kafka的设计理念,broker、 topic、 partition,每个partition放一个机器,就存一部分数据。如果现在资源不够了,给topic增加partition,然后做数据迁移,增加机器,提供更高的吞吐量了。
  2. 落磁盘方式为顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是Kafka的思路。
  3. 参考Kafka实现MQ高可用性,多副本 -> leader & follower -> broker 挂了重新选举leader即可对外服务。
  4. 参考前面的实现数据的零丢失