概述 业务中经常需要做某个操作, 然后一定时间之后看这个操作的执行结果。要么使用定时任务扫描,要么使用延时队列(任务)来实现。延时队列的核心是让消息在未来某个时间点才被消费。常用于如下场景:
发送延时消息(如订单未支付超时取消)
定时执行任务(如定时推送通知)
限流、分布式定时任务等
常见的实现方案 基于消息中间件:RabbitMQ与RocketMQ 基于RabbitMQ RabbitMQ 可以从两种维度设置消息过期时间,分别是队列和消息本身
设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。
如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。
使用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。
TTL 指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。
DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ的 Queue(队列)可以配置两个参数x-dead-letter-exchange 和 x-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。当消息进入死信队列后可以使用下面两个参数进行重新路由:
x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机)。
x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。
基于RocketMQ RocketMQ原生支持延迟消息,可以在发送消息时指定延迟级别。正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费。RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息,预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h 。在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间。
在Broker端,会为每个固定的延时级别启动一个定时任务,轮询检查其对应的延时队列中的消息是否到期。一旦消息到期,就被重新构建并投递到其原始主题和队列,等待消费者消费。
工作流程
生产者发送消息时:在发送带有延时属性的消息时,实际上是发送一个普通消息,但生产者会设置消息属性中的 delayLevel 字段(延时级别)。
Broker端处理:Broker在收到消息后,会判断 delayLevel 是否大于0,如果消息是延时消息,Broker会 备份原始消息的topic和queueId 到消息属性中,然后将消息的主题修改为SCHEDULE_TOPIC_XXXX,队列ID改为delayLevel-1。消息最终会落盘到CommitLog文件,并复制到其他副本,保证可靠性。
后台定时任务调度:RocketMQ在Broker端会启动一个后台定时任务(ScheduleMessageService)。该任务为每个延时级别创建一个定时器,每秒(或其他固定时间间隔)执行一次拉取操作。它会根据每个延时级别的消费偏移量,从SCHEDULE_TOPIC_XXXX的对应队列中拉取消息。
消息恢复与投递:拉取到消息后,服务会根据消息的存储时间戳、物理偏移量等信息计算出消息的实际到期时间。当消息的到期时间到达时,服务会构建一条新的消息。在新消息中,清除延时属性,恢复原始的topic和queueId。最后,将这条恢复后的消息重新投递到目标主题的队列中,供消费者消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void execute () { DefaultMQProducer producer = new DefaultMQProducer ("please_rename_unique_group_name" ); producer.setNamesrvAddr("111.231.110.149:9876" ); producer.start(); for (int i = 0 ; i < 10 ; i++) { try { Message msg = new Message ("TopicTest" , "TagA" , ("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); msg.setDelayTimeLevel(3 ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n" , sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000 ); } }
基于 Redis 基于redis实现有如下三种方式:
Sorted Set (zset): 将任务的执行时间戳(作为 Score)和任务信息(作为 Value)存储在 zset 中。使用一个后台线程定时轮询 zset,取出最早到期的任务进行处理。
键空间通知(Keyspace Notifications): 为键设置过期时间,并监听 Redis 的键过期事件。当键过期时,触发回调函数处理延时任务。
Redisson 框架: Redisson 客户端提供了原生的延时队列支持,底层也是基于 Redis 的数据结构实现,使用更方便。
基于 Sorted Set (zset) 的实现 这是最主流、最可靠的 Redis 延时队列实现方式。流程如下:
将任务内容作为 member,将任务的执行时间戳(Unix timestamp)作为 score,使用 ZADD 命令将任务添加到 Sorted Set 中。
启动一个或多个后台线程/消费者,使用 ZRANGEBYSCORE 命令周期性地查询 Sorted Set,查找 score 小于等于当前时间戳的所有任务(即已到期的任务)。
1 2 ZADD delay_queue delay_time "任务ID" # 添加延时任务 ZRANGEBYSCORE delay_queue 0 now LIMIT 0 1 # 获取到期任务
注意:
并发竞争:在多线程或多进程环境下,可能存在多个消费者同时尝试处理同一个延迟消息的情况。虽然Redis的ZREM操作是原子的,但在实际的应用场景中,我们还需要确保任务处理逻辑的原子性。这通常可以通过在业务层加锁或者使用Redis的事务(multi/exec)来实现。对于大多数延迟队列的使用场景而言,直接在Redis层面处理并发已经足够,因为每个任务ID在队列中是唯一的,且ZREM会确保只移除一个元素。但在某些复杂场景下,如任务需要基于其他数据状态来决策是否执行时,就需要在业务逻辑层面加锁了。
容错处理:
Redis持久化:确保Redis配置了合适的持久化策略(如RDB或AOF),以防止系统崩溃导致的数据丢失。
失败重试机制:消费者程序应该具备重试机制,当处理任务失败时,能够将任务重新加入队列等待再次处理。
监控与告警:监控Redis服务器的性能指标(如内存使用率、CPU使用率、网络延迟等),并设置相应的告警阈值,以便及时发现并解决问题。
性能优化:
批量处理:可以通过增加每次查询的时间范围(即增加zrangeByScoreWithScores的max参数),来一次性处理多个即将到期的任务,减少Redis的访问次数,提高性能。
减少网络开销:使用Redis的pipeline特性,将多个命令打包发送到Redis服务器,减少网络往返时间(RTT)。
分桶策略 : 对于大量任务,可以按照时间范围创建多个 Sorted Set,减少单个集合中的元素数量。
使用 Redis Lua 脚本 : 确保获取和删除任务的原子性。
基于键空间通知 (Keyspace Notifications) 的实现 这种方式利用了 Redis 的过期事件机制。开启键空间通知需要在 Redis 服务器配置中开启键空间通知功能,设置 notify-keyspace-events Ex,表示监听键过期事件。流程如下:
入队: 将任务内容作为 value,设置一个唯一的 key(如 task:{orderId}),并使用 EXPIRE 或 SETEX 命令给 key 设置一个过期时间,即延时时间。
消费: 客户端订阅 Redis 的 keyevent@* :expired 或特定数据库的 keyevent@ :expired 频道。当 key 过期时,Redis 会发布一个过期事件消息到该频道,订阅的消费者收到消息后即可执行相应的任务。
keyspace notifications 键空间通知对应的channel被分为两类:
以__keyspace@<db>__: 为前缀,后面跟的是key的名称,表示监听跟这个key有关的事件。如 __keyspace@<db>__:order_id,就是表示当 order_id这个key过期时,消费者会收到这个key过期的消息。
以__keyevent@<db>__: 为前缀,后面跟的是消息事件类型,表示监听某个事件。如__keyspace@<db>__:expired,表示监听一个过期事件。
注意:
key过期机制问题: key过期事件的时效性问题,首先提到了Redis过期key的两种清除策略:
惰性清除。当这个key过期之后,访问时,这个Key才会被清除
定时清除。后台会定期检查一部分key,如果有key过期了,就会被清除
key的过期事件发布时机并不是当这个key的过期时间到了之后就发布,而是这个key在Redis中被清理之后,也就是真正被删除之后才会发布。只要两种清除策略都不满足,没人访问需要过期的key,后台的定时清理的任务也没扫描到要过期key,那么就不会发布key过期的事件,自然而然也就监听不到了,会造成一定时间消息的延迟。
丢消息问题:Redis的丢消息跟MQ不一样,因为MQ都会有消息的持久化机制,可能只有当机器宕机了,才会丢点消息,但是Redis丢消息就很离谱,比如说你的服务在重启的时候就消息会丢消息。
消息消费只有广播模式:Redis的发布订阅模式消息消费只有广播模式一种。 如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量。
总之:Redis keyspace notifications不保证事件的实时性和可靠性(事件可能会丢失,尤其是在主从切换或网络分区时)。Pub/Sub 机制没有消息确认(ACK)机制,消费者接收到消息后处理失败,消息就丢失了,不适合高可靠性要求的场景。
基于 Redisson 客户端 RDelayedQueue 的实现 Redisson 的 RDelayedQueue 实际上是基于 Sorted Set(zset)实现的封装。它抽象了底层的 ZSet 操作(如 ZADD、ZRANGEBYSCORE、ZREM),使得可以像操作普通 Java 队列一样使用延时队列。
Redisson 定期使用 zrangebyscore 命令扫描 SortedSet 中过期的元素,然后将这些过期元素从 SortedSet 中移除,并将它们加入到就绪消息列表中。就绪消息列表是一个阻塞队列,有消息进入就会被消费者监听到。这样做可以避免消费者对整个 SortedSet 进行轮询,提高了执行效率。相比于 Redis 过期事件监听实现延时任务功能,这种方式具备下面这些优势:
减少了丢消息的可能:DelayedQueue 中的消息会被持久化,即使 Redis 宕机了,根据持久化机制,也只可能丢失一点消息,影响不大。当然了,你也可以使用扫描数据库的方法作为补偿机制。
消息不存在重复消费问题:每个客户端都是从同一个目标队列中获取任务的,不存在重复消费的问题
跟 Redisson 内置的延时队列相比,消息队列可以通过保障消息消费的可靠性、控制消息生产者和消费者的数量等手段来实现更高的吞吐量和更强的可靠性,实际项目中首选使用消息队列的延时消息这种方案。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class DelayQueueService { @Autowired private RedissonClient redissonClient; private static final String DELAY_QUEUE_KEY = "my:delay:queue" ; public void addDelayTask (String message, long delay, TimeUnit timeUnit) { DelayedMessage delayedMessage = new DelayedMessage (); delayedMessage.setId(UUID.randomUUID().toString()); delayedMessage.setBody(message); delayedMessage.setCreateTime(LocalDateTime.now()); delayedMessage.setExecuteTime(LocalDateTime.now().plus(delay, toChronoUnit(timeUnit))); RDelayedQueue<DelayedMessage> delayedQueue = redissonClient.getDelayedQueue(=redissonClient.getBlockingQueue(DELAY_QUEUE_KEY)); delayedQueue.offer(delayedMessage, delay, timeUnit); log.info("添加延时任务成功,任务ID:{},延迟:{} {}" , delayedMessage.getId(), delay, timeUnit); } private ChronoUnit toChronoUnit (TimeUnit timeUnit) { switch (timeUnit) { case DAYS: return ChronoUnit.DAYS; case HOURS: return ChronoUnit.HOURS; case MINUTES: return ChronoUnit.MINUTES; case SECONDS: return ChronoUnit.SECONDS; case MILLISECONDS: return ChronoUnit.MILLIS; case MICROSECONDS: return ChronoUnit.MICROS; case NANOSECONDS: return ChronoUnit.NANOS; default : throw new IllegalArgumentException ("Unsupported time unit: " + timeUnit); } } }
消费:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class DelayQueueConsumer implements InitializingBean { @Autowired private RedissonClient redissonClient; private static final String DELAY_QUEUE_KEY = "delay:queue" ; @Override public void afterPropertiesSet () { new Thread (this ::consumeDelayMessage).start(); } private void consumeDelayMessage () { RBlockingQueue<DelayedMessage> blockingQueue = redissonClient.getBlockingQueue(DELAY_QUEUE_KEY); RDelayedQueue<DelayedMessage> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); while (!Thread.currentThread().isInterrupted()) { try { DelayedMessage message = blockingQueue.take(); log.info("消费延时消息:{}" , message); processMessage(message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("消费延时消息被中断" , e); } catch (Exception e) { log.error("消费延时消息出错" , e); } } delayedQueue.destroy(); } private void processMessage (DelayedMessage message) { log.info("处理消息,ID: {}, 内容: {}" , message.getId(), message.getBody()); } }
总结:对于生产环境中的分布式延时任务,基于 ZSet 的手动实现或使用功能更强大的 Redisson RDelayedQueue 是更可靠的选择。基于键空间通知的方式实现简单,但可靠性较差,仅适用于对延时任务丢失不敏感的非核心业务。
基于时间轮算法 (Time Wheel) 时间轮是一种高效实现大量延时任务的算法,被 Netty、Kafka 等框架采用。拟时钟,将时间划分为多个槽位(Slot),每个槽位代表一个时间刻度。任务根据其到期时间被放置到相应槽位的链表中。通过一个指针周期性地移动,执行当前槽位的任务。对于超出一轮周期的任务,会增加一个轮数属性,待指针再次经过时判断是否执行。
基于 JDK (DelayQueue) Java 开发中可以直接使用 java.util.concurrent.DelayQueue。DelayQueue 是一个基于优先级队列(PriorityQueue)实现的无界阻塞队列,只有当元素的延迟时间到期时才能从队列中取出元素。任务需要实现 Delayed 接口。使用简单、集成在 JDK 中、无需额外依赖、性能较好。但是只能在单机、单个 JVM 实例内使用,不支持分布式环境。
实践 定义:延迟任务数据模型
1 2 3 4 5 public record DelayTask <T>(String bizType, T payload, long delayTime, long expireTime) implements Serializable { @Serial private static final long serialVersionUID = -8097910962846184246L ; }
定义: 执行器接口,执行器创建可以考虑使用 Factory 创建。
1 2 3 4 public interface DelayQueueExecutor { void execute () ; }
JDK DelayQueue DelayQueue是JDK提供的API,是一个内置的延迟队列。
定义任务模型 :在使用DelayQueue实现延时任务,可以通过实现Delayed接口来定义任务数据模型,同时需要实现下面两个方法:
getDelay方法返回这个任务还剩多久时间可以执行,小于0的时候说明可以这个延迟任务到了执行的时间了。
compareTo这个是对任务排序的,保证最先到延迟时间的任务排到队列的头。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public record JdkDelayElement <T>(DelayTask<T> task) implements Delayed { @Override public long getDelay (@NotNull TimeUnit unit) { return unit.convert(task.expireTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo (@NotNull Delayed o) { if (this .getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { return 1 ; } else if (this .getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) { return -1 ; } return 0 ; } }
jdk延迟任务执行实现(如果考虑扩展,可以增加一个抽象类 AbstractJdkDelayQueueExecutor, 增加 bizHandle(T obj) 处理具体的业务):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class JdkDelayQueueExecutor implements DelayQueueExecutor { private static final Logger logger = LoggerFactory.getLogger(JdkDelayQueueExecutor.class); private final DelayQueue<JdkDelayElement<OrderInfo>> queue; public JdkDelayQueueExecutor (DelayQueue<JdkDelayElement<OrderInfo>> queue) { this .queue = queue; } @Override public void execute () { while (!queue.isEmpty()) { try { JdkDelayElement<OrderInfo> element = queue.take(); DelayTask<OrderInfo> delayTask = element.task(); OrderInfo orderInfo = delayTask.payload(); logger.info("orderId={}, userId={}, delayTime={}, expireTime={}" , orderInfo.getOrderId(), orderInfo.getUserId(), delayTask.delayTime(), delayTask.expireTime()); } catch (InterruptedException e) { logger.error("jdk delay task execute failed: " , e); } } } }
take()方法获取任务的时候,会拿到队列头部的元素,也就是队列中最早需要被执行的任务,通过getDelay返回值判断任务是否需要被立刻执行,如果需要的话,就返回任务,如果不需要就会等待这个任务到延迟时间的剩余时间,当时间到了就会将任务返回。
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class JdkDelayQueueExample { public static void main (String[] args) { OrderInfo o1 = new OrderInfo (10000L , 10000L , 100000L , "o1" ); OrderInfo o2 = new OrderInfo (10000L , 10000L , 100000L , "o2" ); OrderInfo o3 = new OrderInfo (10000L , 10000L , 100000L , "o3" ); DelayTask<OrderInfo> d1 = new DelayTask <>("order-service" , o1, 5 * 1000L , System.currentTimeMillis() + 5 * 1000L ); DelayTask<OrderInfo> d2 = new DelayTask <>("order-service" , o2, 5 * 1000L , System.currentTimeMillis() + 5 * 1000L ); DelayTask<OrderInfo> d3 = new DelayTask <>("order-service" , o3, 5 * 1000L , System.currentTimeMillis() + 5 * 1000L ); DelayQueue<JdkDelayElement<OrderInfo>> queue = new DelayQueue <>(); queue.put(new JdkDelayElement <>(d1)); queue.put(new JdkDelayElement <>(d2)); queue.put(new JdkDelayElement <>(d3)); JdkDelayQueueExecutor executor = new JdkDelayQueueExecutor (queue); executor.execute(); } }
put()/offer()方法在提交任务的时候,会通过根据compareTo的实现对任务进行排序,将最先需要被执行的任务放到队列头。
基于redis实现 实现抽象执行器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public abstract class AbstractRedisDelayQueueExecutor <T> implements DelayQueueExecutor { private static final Logger logger = LoggerFactory.getLogger( AbstractRedisDelayQueueExecutor.class); private static final String KEY_PREFIX = "order:delay" ; private final StringRedisTemplate redisTemplate; private final JsonMapper jsonMapper; protected AbstractRedisDelayQueueExecutor (StringRedisTemplate redisTemplate, JsonMapper jsonMapper) { this .redisTemplate = redisTemplate; this .jsonMapper = jsonMapper; } @Override @SuppressWarnings("unchecked") public void execute () { while (true ) { long now = System.currentTimeMillis(); Set<TypedTuple<String>> delays = redisTemplate.opsForZSet() .rangeByScoreWithScores(KEY_PREFIX, 0 , now, 0 , 10 ); if (delays == null || delays.isEmpty()) { continue ; } for (TypedTuple<String> delay : delays) { String strTaskValue = delay.getValue(); if (strTaskValue == null || strTaskValue.isEmpty()) { continue ; } try { DelayTask<? extends T > biz = jsonMapper.readValue(strTaskValue, DelayTask.class); bizHandle(biz.payload()); redisTemplate.opsForZSet().remove(KEY_PREFIX, strTaskValue); } catch (JsonProcessingException e) { logger.error("biz data to json failed: " , e); } } } } protected abstract void bizHandle (T obj) ; }
具体业务执行器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class OrderDelayTaskExecutor extends AbstractRedisDelayQueueExecutor <OrderInfo> { private static final Logger logger = LoggerFactory.getLogger(OrderDelayTaskExecutor.class); public OrderDelayTaskExecutor (StringRedisTemplate redisTemplate, JsonMapper jsonMapper) { super (redisTemplate, jsonMapper); } @Override protected void bizHandle (OrderInfo obj) { logger.info("order delay handle, orderId={}, userId={}, skuId={}" , obj.getOrderId(), obj.getUserId(), obj.getSkuId()); } }