标签归档:RabbitMQ,消息中间件

RabbitMQ基础知识

1 MQ 存在的意义

消息中间件一般主要用来做 异步处理、应用解耦、流量削峰、日志处理 等方面。

1.1 异步处理

一个用户登陆网址注册,然后系统发短信跟邮件告知注册成功,一般有三种解决方法。
1. 串行方式,依次执行,问题是用户注册后就可以使用了,没必要等短信跟邮件啊。
2. 注册成功后,邮件跟验证码用并行等方式执行,问题是邮件跟短信是非重要的任务,系统注册还要等这俩完成么?
3. 基于异步MQ的处理,用户注册成功后直接把信息异步发送到MQ中,然后邮件系统跟短信系统主动去消费数据。
异步处理

1.2 应用解耦

比如有一个订单系统,还要一个库存系统,用户下订单后要调用库存系统来处理,直接调用话,库存系统出现问题咋办呢?
应用解耦

1.3 流量削峰

举办一个 秒杀活动,如何较好到设计?服务层直接接受瞬间高密度访问绝对不可以,起码要加入一个MQ来实现削峰。
流量削峰

1.4 日志处理

用户通过 WebUI 访问发送请求到时候后端如何接受跟处理呢一般?
日志处理

1.5 MQ 带来的弊端

  1. 系统可用性降低:引入第三方依赖则需考虑第三方的稳定性。
  2. 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此需要考虑的东西更多,系统复杂性增大。

2 常见的 MQ

消息中间件具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ、炙手可热的Kafka、阿里巴巴自主开发RocketMQ等。

  1. ActiveMQ:老牌的消息中间件,但是不适合高并发互联网,适合传统企业。
  2. RabbitMQ:支持高并发、高吞吐、性能好,还有完善的管理界面等。支持集群化,缺点是Erlang语言开发的。
  3. RocketMQ:阿里出品,性能优越,Java开发,二次改造。
  4. Kafka:超高吞吐量实时日志采集,一般在大数据体系配合实时计算Spark Streaming、Flink等使用。
  5. 中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择。
  6. 大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

3 RabbitMQ 常见模式

RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP

AdvancedMessageQueuingProtocol:高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

3.1 RabbitMQ 基本概念

RabbitMQ模型

  1. Broker:简单来说就是消息队列服务器实体
  2. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  3. Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  4. Binding:它的作用就是把 exchange 和 queue 按照路由规则绑定起来
  5. Routing Key:路由关键字,exchange 根据这个关键字进行消息投递。
  6. VHost:vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
  7. Producer:消息生产者,就是投递消息的程序
  8. Consumer:消息消费者,就是接受消息的程序
  9. Channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel 代表一个会话任务

由 Exchange、Queue、RoutingKey 三个才能决定一个从 Exchange 到 Queue 的唯一的线路。

3.2 RabbitMQ 工作模式

3.2.1 simple模式

simple模式

生产者产生消息,将消息放入队列,消费者监听消息队列,如果队列中有消息就消费掉,消息被拿走后会自动从队列中删除,存在隐患需要消费者设置ACK确认,消费者处理完后要及时发送ack给队列,否则会造成内存溢出。

简单队列的不足:耦合性过高,生产者一一对应消费者,如果有多个消费者想消费队列中信息就无法实现了。

3.2.2 work工作模式(资源的竞争)

work工作模式

生产者将消息放入队列,消费者可以有多个。一般有两种模式:
1. 轮询分发(round-robin):MQ不管两个消费者谁忙,数据总是你一个我一个,MQ 给两个消费发数据的时候是不知道消费者性能的,默认就是雨露均沾。此时 autoAck = true。
2. 公平分发:要让消费者消费完毕一条数据后就告知MQ,再让MQ发数据即可。自动应答要关闭,实现按照消费者性能消费。

3.2.3 fanout publish/subscribe 发布订阅模式

fanout模式

类似公众号的订阅跟发布,属于 fanout 模式,不处理路由键。不需要指定routingKey,我们只需要把队列绑定到交换机, 消息就会被发送到所有到队列中:

  1. 一个生产者多个消费者
  2. 每一个消费者都有一个自己的队列
  3. 生产者没有把消息直接发送到队列而是发送到了交换机转化器(exchange)。
  4. 每一个队列都要绑定到交换机上。
  5. 生产者发送的消息经过交换机到达队列,从而实现一个消息被多个消费者消费。
3.2.4 direct routing 路由模式

direct:处理路由键,需要指定routingKey,此时生产者发送数据到MQ的时候会指定key,任务队列也会指定key,只有key一样消息才会被传送到队列中。如下图
direct模式
缺点:路由key必须要明确,无法实现规则性模糊匹配。

3.2.5 topic 主题模式

将路由键跟某个模式匹配,生产者会带 routingKey,但是消费者的MQ会带模糊routingKey:
topic模式
1. # 表示匹配 >=1个字符。
2. * 表示匹配一个。
3. 路由功能添加模糊匹配。
4. 消息产生者产生消息,把消息交给交换机。
5. 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

3.2.6 总计

如果需要指定模式一般是在消费者端设置,灵活性调节。

4 常见考题

4.1 消息怎么路由的

生成者生产消息后消息带有 routing Key,通过routing Key 消费者队列被绑定到交换器上,消息到达交换器根据交换器规则匹配,常见交换器如下:
1. fanout:如果交换器收到消息,将会广播到所有绑定的队列上
2. direct:如果路由键完全匹配,消息就被投递到相应的队列
3. topic:可以使来自不同源头的消息能够到达同一个队列。使用 topic 交换器时,可以使用通配符

4.2 RabbitMQ 消息基于什么传输

信道是生产消费者与rabbit通信的渠道,生产者 publish 或是消费者 subscribe 一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,就是说 RabbitMQ 在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在RabbitMQ 都有唯一的ID来保证信道私有性,对应唯一的线程使用。用信道而不用 TCP 的原因是由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。

4.3 如何保证 RabbitMQ 消息不丢失

消息丢失主要分为 生产者丢失消息、消息列表丢失消息、消费者丢失消息。

4.3.1 生产者丢失消息

RabbitMQ 提供 transactionconfirm 模式来确保生产者不丢消息。
transaction 机制就是说:发送消息前,开启事务(channel.txSelect),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务channel.txCommit()。事务卡顿会导致后面无法发送,官方说加入事务机制MQ会降速250倍。

confirm(发送方确认模式)模式用的居多:一旦 channel 进入 confirm 模式,所有在该信道上发布的消息都将会被指派一个从1开始的唯一的ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个包含消息的唯一ID 的 ACK给生产者,这就使得生产者知道消息已经正确到达目的队列了,如果 RabbitMQ 没能处理该消息,则会发送一个 Nack (not acknowledged) 消息给你,你可以进行重试操作。

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

4.3.2 消息列表 丢失消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和 confirm 机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个 Ack 信号。这样,如果消息持久化磁盘之前,RabbitMQ 挂了后生产者收不到Ack信号,生产者会自动重发。

通过如下持久化设置,即使 RabbitMQ 挂了重启后也能恢复数据。
1. durable = true, 将 queue 的持久化设置为 true,则代表是一个持久的队列
2. 发送消息的时候将 deliveryMode=2

关于持久化其实是个权衡问题,持久化可能会导致系统QPS下降,所以一般仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的持久化不会导致系统性能瓶颈。

4.3.3 消费者丢失消息

消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息。

解决方案:处理消息成功后,手动回复确认消息。消费者跟消息队列的连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息,保证数据的最终一致性。

注意点
1. 消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。
2. 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者,这时可能存在消息重复消费的隐患,需要去重!

4.3 如何避免消息重复投递或重复消费

4.3.1 消息简介

消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。

消息重复消费的场景大概可以分为 生产者端重复消费和消费者端重复消费,解决办法是是通过幂等性来保证重复消费的消息不对结果产生影响即可。

  1. 消息生成时 RabbitMQ 内部 对每个生产的消息生成个 inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列。
  2. 消息消费时要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重的依据,避免同一条消息被重复消费。
  3. 在 RocketMQ 中生产者发送消息前询问 RocketMQ 信息是否已发送过,或者通过Redis记录已查询记录。不过最好的还是直接在消费端去重消费。

4.3.2 举例

  1. 消费者拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
  2. 拿到消息后如果做的是 redis 的 set 操作就不用解决了,因为你无论set几次结果都是一样的。
  3. 准备个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

4.4 RabbitMQ 如何保证消息顺序执行

顺序性 必要性:生产者的信息是[插入、更新、删除],消费者执行顺序是[删除、插入、更新],这是跟预期不一致的。

4.4.1 乱序情况

出现消费乱序一般是如下两种情况:
1. 一个 queue,有多个 consumer 去消费,每个 consumer 的执行时间是不固定的,无法保证先读到消息的 consumer 一定先完成操作。
多个消费者乱序

  1. 一个 queue 对应一个 consumer,但是 consumer 里面进行了多线程消费,这样也会造成消息消费顺序错误。
    多线程乱序
4.4.2 解决乱序
  1. 拆分多个 queue,每个 queue 一个 consumer,将三个有先后顺序的消息根据用户订单id 哈希后发送到同一个queue中,来保证消息的先后性。当然这样会造成吞吐量下降。
    一个队列保证前后

  2. 一个 queue 对应一个 consumer,在 consumer 内部根据ID映射到不同内存队列,然后用内存队列做排队分发给底层不同的 worker 来处理
    内存队列实现顺序

4.5 RabbitMQ 的集群

RabbitMQ 是基于主从(非分布式)做高可用性的。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式

4.5.1 单机模式

单机版的 就是 Demo 级别,生产系统一般没人用单机模式。

4.5.2 普通集群模式

在 N 台机器上启动 N 个 RabbitMQ 实例。创建的 queue 只会放在一个 RabbitMQ 实例上,但每个MQ实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费时如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。让集群中多个节点来服务某个 queue 的读写操作来提高吞吐量。

4.5.3 镜像集群模式

RabbitMQ 的高可用模式,在镜像集群模式下,你创建的 queue无论元数据还是 queue 里的消息都会存在于多个实例上,每个 RabbitMQ 节点都有这个 queue 的全部数据的。写消息到 queue 的时候都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

  1. 优点在于任何一个机器宕机了其它节点还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。
  2. 缺点在于消息需要同步到所有机器上,导致网络带宽压力和消耗很重。也是每个节点都放这个 queue 的完整数据。

4.6 死信队列 跟 延迟队列

4.6.1 死信队列

死信 Dead Letter 是 RabbitMQ 中的一种消息机制,当消费消息时队列里的消息出现以下情况那么该消息将成为死信。死信消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃:

  1. 消息被否定确认,使用channel.basicNack 或 channel.basicReject ,并且此时 default-requeue-rejected(由于监听器抛出异常而拒绝的消息是否被重新放回队列) 属性被设置为false。
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。
  1. 对队列中消息总数进行限制,x-max-length = 指定值。则超出阈值后队头数据被抛弃。
  2. 对队列中消息体总字节数进行限制,只计算消息体的字节数。x-max-length-bytes = 指定值。

死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机只不过是用来接受死信的普通交换机,所以可以为任何类型,比如Direct、Fanout、Topic。

适用场景

在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,在系统因为参数解析、数据校验、网咯拨打等导致异常后通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息。

死信消息的生命周期

  1. 业务消息被投入业务队列
  2. 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
  3. 被nck或reject的消息由RabbitMQ投递到死信交换机中
  4. 死信交换机将消息投入相应的死信队列
  5. 死信队列的消费者消费死信消息

死信消息是 RabbitMQ 为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,明白死信队列运行机制后就知道这些 Exchange 和 Queue 想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。

4.6.2 RabbitMQ 中的 TTL

TTL(Time To Live) 是 RabbitMQ 中一个 消息队列 的属性,如果一条消息设置了 TTL属性或者进入了有 TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为死信。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

  1. queue 设置 TTL
1Map<String, Object> args = new HashMap<String, Object>();
2args.put("x-message-ttl", 6000); // ms
3channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
  1. Msg 设置 TTL
1AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
2builder.expiration("6000");
3AMQP.BasicProperties properties = builder.build();
4channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

区别
1. 设置了队列的TTL属性,一旦Msg 过期,就会被队列丢弃。
2. Msg 设置 TTL,Msg 是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的Msg 积压情况,则已过期的 Msg 也许还能存活较长时间,解决办法 安装插件 rabbitmq_delayed_message_exchange。
3. 如果不设置TTL,表示 Msg 永远不会过期,TTL = 0 表示除非此时可以直接投递该 Msg 到消费者,否则该 Msg 将会被丢弃。

4.6.3 延迟队列

延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。一般用在如下场景:

  1. 订单在 15 分钟之内未支付则自动取消。
  2. 账单在一周内未支付,则自动结算。
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

    延时队列 = 死信队列 + TTL
    保证顺序性

  6. 当然也可以用 Java 的 DelayQueue、Quartz、Redis 的 zset 等实现。

4.7 MQ 消息积压咋办

这种时候只能操作临时扩容,以更快的速度去消费数据了。具体操作步骤和思路如下:

  1. 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
  2. 临时建立好原先10倍~20倍的queue数量(新建一个topic,partition是原来的10倍)。
  3. 然后写一个临时分发消息的 consumer 程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。
  4. 紧接着征用10倍的机器来部署 consumer,每一批 consumer消费一个临时 queue 的消息。
  5. 这种做法相当于临时将 queue 资源和 consumer 资源扩大10倍,以正常速度的10倍来消费消息。
  6. 等快速消费完了之后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状。

    消息挤压处理

4.8 RabbitMQ 中的推拉

在RabbitMQ 中有推模式跟拉模式,平时开发多为推模式。

  1. 推模式:消息中间件主动将消息推送给消费者
  2. 拉模式:消费者主动从消息中间件拉取消息
4.8.1 推模式 push
  1. 推模式接收消息是最有效的一种消息处理方式。channel.basicConsume(queneName,consumer)方法将信道(channel)设置成投递模式,直到取消队列的订阅为止。当消息到达RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配的消费者,而不需要消费端手动来拉取,当然投递消息的个数还是会受到channel.basicQos的限制。
  2. 推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率很高。缺点就是缓冲区可能会溢出。
  3. 由于推模式是信息到达RabbitMQ后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。
4.8.2 拉模式 pull
  1. 如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方法来进行消费消息。
  2. 拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。
  3. 由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性较差;消费者难以获取实时消息,具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息。

4.9 设计个MQ

一般是个开放题,考察有没有从架构角度整体构思和设计的思维以及能力。不求看过源码起码但的知道基本原理、核心组成部分、基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好(强行为下篇Kafka做铺垫)。

  1. 考虑MQ的伸缩性,在需要的时候快速扩容来增加吞吐量和容量,设计个分布式的系统,参照一下kafka的设计理念,broker、 topic、 partition,每个partition放一个机器,就存一部分数据。如果现在资源不够了,给topic增加partition,然后做数据迁移,增加机器,提供更高的吞吐量了。
  2. 落磁盘方式为顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是Kafka的思路。
  3. 参考Kafka实现MQ高可用性,多副本 -> leader & follower -> broker 挂了重新选举leader即可对外服务。
  4. 参考前面的实现数据的零丢失