什么是延时队列,延时队列应用于什么场景
延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景
- 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)
- 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会
- 系统中的业务失败之后,需要重试
这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。
Rabbitmq实现延时队列一般而言有两种形式:
第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
第二种方式:利用rabbitmq中的插件x-delay-message
利用TTL DLX实现延时队列的方式
TTL DLX是什么
TTL
RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
Springboot集成rabbitmq实现第一种方式
在pom.xml文件中增加rabbitmq的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
初始化queue exchange和queue及exchange之间的binding关系
Config.java
package com.example.demo.deadLetter; import java.util.HashMap; import java.util.Map; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.example.demo.Constants.Constants; @Configuration public class Config { // 创建一个立即消费队列 @Bean public Queue immediateQueue() { // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化 return new Queue(Constants.IMMEDIATE_QUEUE, true); } // 创建一个延时队列 @Bean public Queue delayQueue() { Map<String, Object> params = n<strong>本文来源gao@daima#com搞(%代@#码@网2</strong>ew HashMap<>(); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", Constants.IMMEDIATE_EXCHANGE); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", Constants.IMMEDIATE_ROUTING_KEY); return new Queue(Constants.DELAY_QUEUE, true, false, false, params); } @Bean public DirectExchange immediateExchange() { // 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除, //第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数 return new DirectExchange(Constants.IMMEDIATE_EXCHANGE, true, false); } @Bean public DirectExchange deadLetterExchange() { // 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除, //第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数 return new DirectExchange(Constants.DEAD_LETTER_EXCHANGE, true, false); } @Bean //把立即消费的队列和立即消费的exchange绑定在一起 public Binding immediateBinding() { return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with(Constants.IMMEDIATE_ROUTING_KEY); } @Bean //把立即消费的队列和立即消费的exchange绑定在一起 public Binding delayBinding() { return BindingBuilder.bind(delayQueue()).to(deadLetterExchange()).with(Constants.DELAY_ROUTING_KEY); } }