MQ 的问题
任何技术都会有利有弊,MQ 给整体系统架构带来很多好处,但也会付出一定的代价。
MQ 主要引入了以下问题:
系统可用性降低:引入了 MQ 后,通信需要基于 MQ 完成,如果 MQ 宕机,则服务不可用。
系统复杂度提高
使用 MQ,需要关注一些新的问题:
- 如何保证消息没有 重复消费?
- 如何处理 消息丢失 的问题?
- 如何保证传递 消息的顺序性?
- 如何处理大量 消息积压 的问题?
一致性问题:假设系统 A 处理完直接返回成功的结果给用户,用户认为请求成功。但如果此时,系统 BCD 中只要有任意一个写库失败,那么数据就不一致了。这种情况如何处理?
下面,我们针对以上问题来一一分析。
1. 重复消费
如何保证消息不被重复消费 和 如何保证消息消费的幂等性 是同一个问题。
必须先明确产生重复消费的原因,才能对症下药。
重复消费问题原因
重复消费问题通常不是 MQ 来处理,而是由开发来处理的。
以 Kafka 举例,Kafka 每个 Partition 都是一个有序的、不可变的记录序列,不断追加到结构化的提交日志中。Partition 中为每条记录分配一个连续的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的记录。
Kafka 的客户端和 Broker 都会保存 Offset。客户端消费消息后,每隔一段时间,就把已消费的 Offset 提交给 Kafka Broker,表示已消费。

在这个过程中,如果客户端应用消费消息后,因为宕机、重启等情况而没有提交已消费的 Offset 。当系统恢复后,会继续消费消息,由于 Offset 未提交,就会出现重复消费的问题。
重复消费解决方案
应对重复消费问题,需要在业务层面,通过 幂等性设计 来解决。
MQ 重复消费不可怕,可怕的是没有应对机制,可以借鉴的思路有:
- 如果是写关系型数据库,可以先根据主键查询,判断数据是否已存在,存在则更新,不存在则插入;
- 如果是写 Redis,由于 set 操作天然具有幂等性,所以什么都不用做;
- 如果是根据消息做较复杂的逻辑处理,可以在消息中加入全局唯一 ID,例如:订单 ID 等。在客户端存储中(Mysql、Redis 等)保存已消费消息的 ID。一旦接受到新消息,先判断消息中的 ID 是否在已消费消息 ID 表中存在,存在则不再处理,不存在则处理。
在实际开发中,可以参考上面的例子,结合现实场景,设计合理的幂等性方案。
2. 消息丢失
如何处理消息丢失的问题 和 如何保证消息不被重复消费 是同一个问题。关注点有:
- MQ Server 丢失数据
- 消费方丢失数据
- 生产方丢失数据
消费方丢失数据
唯一可能导致消费方丢失数据的情况是:消费方设置了自动提交 Offset。一旦设置了自动提交 Offset,接受到消息后就会自动提交 Offset 给 Kafka ,Kafka 就认为消息已被消费。如果此时,消费方尚未来得及处理消息就挂了,那么消息就丢了。
解决方法就是:消费方关闭自动提交 Offset,处理完消息后手动提交 Offset。但这种情况下可能会出现重复消费的情形,需要自行保证幂等性。
RabbitMq弄丢了数据
就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤:
- 创建 queue 的时候将其设置为持久化
这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。 - 第二个是发送消息的时候将消息的
deliveryMode设置为 2
就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。
生产方丢失数据
如果按照上述的思路设置了 acks=all,生产方一定不会丢数据。
要求是,你的 Leader 接收到消息,所有的 Follower 都同步到了消息之后,才认为本生产消息成功了。如果未满足这个条件,生产者会自动不断的重试,重试无限次。
3. 消息的顺序性
要保证 MQ 的顺序性,势必要付出一定的代价,所以实施方案前,要先明确业务场景是不是有必要保证消息的顺序性。只有那些明确对消息处理顺序有要求的业务场景才值得去保证消息顺序性。
方案一
一个 Topic,一个 Partition,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
方案二
- 写入数据到 Partition 时指定一个全局唯一的 ID,例如订单 ID。发送方保证相同 ID 的消息有序的发送到同一个 Partition。
- 基于上一点,消费方从 Kafka Partition 中消费消息时,此刻一定是顺序的。但如果消费方式以并发方式消费消息,顺序就可能会被打乱。为此,还有做到以下几点:
- 消费方维护 N 个缓存队列,具有相同 ID 的数据都写入同一个队列中;
- 创建 N 个线程,每个线程只负责从指定的一个队列中取数据。

4. 消息积压
假设一个 MQ 消费者可以一秒处理 1000 条消息,三个 MQ 消费者可以一秒处理 3000 条消息,那么一分钟的处理量是 18 万条。如果 MQ 中积压了几百万到上千万的数据,即使消费者恢复了,也需要大概很长的时间才能恢复过来。
对于产线环境来说,漫长的等待是不可接受的,所以面临这种窘境时,只能临时紧急扩容以应对了,具体操作步骤和思路如下:
- 先修复 Consumer 的问题,确保其恢复消费速度,然后将现有 Consumer 都停掉。
- 新建一个 Topic,Partition 是原来的 10 倍,临时建立好原先 10 倍的 Queue 数量。
- 然后写一个临时的分发数据的 Consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 Queue。
- 接着临时征用 10 倍的机器来部署 Consumer ,每一批 Consumer 消费一个临时 Queue 的数据。这种做法相当于是临时将 Queue 资源和 Consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。







