RabbitMQ消费幂等性

什么是幂等性

幂等性,简单来说就是对于同一个系统,在同样条件下,一次请求和重复多次请求对资源的影响是一致的,就称该操作为幂等的。比如说如果有一个接口是幂等的,当传入相同条件时,其效果必须是相同的。在RabbitMQ中消费幂等就是指给消费者发送多条同样的消息,消费者只会消费其中的一条。例如,在一次购物中提交订单进行支付时,当网络延迟等其他问题造成消费者重新支付,如果没有幂等性的支持,那么会对同一订单进行两次扣款,这是非常严重的,因此有了幂等性,当对同一个订单进行多次支付时,可以确保只对同一个订单扣款一次。

RabbitMQ消费幂等性

在正常情况下,消费者在消费消息的时候,当消费完毕后,会发送一个确认ack给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。而在前面保证生产端消息可靠性投递方案1中,当生产者发送消息给RabbitMQ后,在Broker返回确认ack之前,RabbitMQ出现了宕机(数据库保存的消息状态仍然为“投递中”),则该消息会被定时任务抓取并重新发送;或者当在网络延迟传输中,消费者出现异常或者消费者延迟消费,会造成进行RabbitMQ重试补偿,那么此时RabbitMQ中就可能会有两条消息,会造成消费者重复消费,此时消费端就需要做幂等性校验,让消费者只消费其中一条消息。实现消费端幂等性、保证同一消息不被重复消费下面介绍一种简单的方案。

  • 为了保证消息不被重复消费,首先要保证每个消息是唯一的,所以可以给每一个消息携带一个唯一的id,流程如下:

    1、消费者监听到消息后获取消息的MsgId(这个MsgId是我们自定义消息的字段,是主键),先去Redis中查询这个MsgId是否存在。也可以生产者发送消息时指给消息对象设置唯一的 MessageID,只有该 MessageID 没有被消费者存入到Redis中即该消息未被消费,这样重发的消息才能在重试机制中再次被消费。

    2、如果不存在,则正常消费消息,并把消息的id存入Redis中。

    3、如果存在则丢弃或者拒绝此消息并不返回队列。

  • 代码:

    1.消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @RabbitListener(queues = "queue1")
    public void getMessageFromQueue1(Channel channel, Message message) throws IOException {
    SetOperations<String, Object> ops = redisTemplate.opsForSet();
    //获取唯一Id
    String msgID = message.getMessageProperties().getHeader(
    "spring_returned_message_correlation");

    try{
    if(ops.pop(msgID)!=null) {
    //该订单已经完成扣款,无需再进行扣款
    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    return ;

    }
    //执行扣款操作
    ..
    ..

    //将对应的订单id保存到redis中
    ops.add(msgID,"ok");

    //返回确认ack
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }catch(Exception e){

    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

    }

    }

消息队列如何限流?

消息队列限流是指在服务器面临巨额流量时,为了进行自保,进行的一种救急措施。

因为巨大的流量代表着非常多的消息,这些消息如果多到服务器处理不过来就会造成服务器瘫痪,影响用户体验,造成不良影响。

所以要进行一次降级操作,把处理不了的流量隔绝在系统之外,避免它们打垮系统。

基本上任何一个消息队列都有限流的功能,今天我们就来看看在RabbitMQ之中进行限流具体应该怎么做?

RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息还未被消费确认,则不进行新消息的消费。

TTL消息/队列

TTL是Time To Live的缩写,也就是生存时间的意思,RabbitMQ支持消息的过期时间,在消息发送时可以进行指定,也支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

设置队列的话就是整个队列的消息到时都会过期,设置消息的话就是单条消息到时自动过期。

1
2
3
4
5
6
7
8
9
// TTL队列示例
@Bean
public Queue ttlQueue() {
Map<String, Object> arguments = new HashMap<>();
// 设置3s过期
arguments.put("x-message-ttl",3000);
return new Queue("topicQueue1",false,false,false, arguments);
}

上面的代码就是演示如何创建一个TTL队列,需要放入参数才行,队列构造中的其他参数我为了方便直接填了false。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void sendTtl() {
String message = "Hello 我是作者和耳朵,欢迎关注我。" + LocalDateTime.now().toString();

System.out.println("Message content : " + message);

// 设置过期3s
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setExpiration("3000").build();

rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));
System.out.println("消息发送完毕。");
}

DLX死信队列

DLX死信队列虽然叫队列,但其实指的是Exchange,或者说指的Exchange和它所属的Queue,他俩一块构成了死信队列。

当一条消息:

  • 消费被拒绝(basic.reject/basic.nack)并且requeue=false
  • TTL过期
  • 要进入的队列达到最大长度

这三种情况,就可以判定一条消息死了,这种消息如果我们没有做处理,它就会被自动删除。

但其实我们可以在队列上加上一个参数,使当队列中发现了死亡的消息之后会将它自动转发到某个Exchange,由指定的Exchange来处理这些死亡的消息。

这个处理死亡消息的Exchange和之前我们讲述的Exchange没什么区别,依然可以绑定队列然后进行消息消费。

1
2
3
4
5
6
7
8
9
// DLX队列示例
@Bean
public Queue dlxQueue() {
Map<String, Object> arguments = new HashMap<>();
// 指定消息死亡后发送到ExchangeName="dlx.exchange"的交换机去
arguments.put("x-dead-letter-exchange","dlx.exchange");
return new Queue("topicQueue1", false, false, false, arguments);
}

如上代码,就是设置了一个队列中的消息死亡后的去处,就等于消息死亡后给它不把它删掉而是做一次转发,发到其他Exchange去。

那这样搞有什么用呢?这就取决于业务需求了,不过下一节会用到它,接着往下看~

延时队列

RabbitMQ的基因中没有延时队列这回事,它不能直接指定一个队列类型为延时队列,然后去延时处理,但是经过上面两节的铺垫,我们可以将TTL+DLX相结合,这就能组成一个延时队列。

设想一个场景,下完订单之后15分钟未付款我们就要将订单关闭,这就是一个很经典的演示消费的场景,如果拿RabbitMQ来做,我们就需要结合TTL+DLX了。

先把订单消息设置好15分钟过期时间,然后过期后队列将消息转发给我们设置好的DLX-ExchangeDLX-Exchange再将分发给它绑定的队列,我们的消费者再消费这个队列中的消息,就做到了延时十五分钟消费。