Redisson是一个基于Redis的Java驻留库,旨在简化Java应用程序对Redis的操作。它提供了丰富的功能和易于使用的API,使得在Java应用中集成Redis变得更加简单和高效。Redisson的主要功能包括分布式对象、分布式锁、分布式队列、分布式调度器等,使得在分布式环境中进行协作变得更加容易。
延迟队列是Redisson提供的一个功能强大的组件之一。它允许开发者在指定的延迟时间之后自动执行任务,这对于需要在一定时间后执行操作的应用程序非常有用,如消息通知、订单处理、定时任务等。
Redisson实现延迟队列的原理基于Redis的有序集合(Sorted Set)和Redis的过期时间机制
生产者
因为RDelayedQueue并不直接存储任务。相反,它是一种装饰器或包装器,用于添加延迟功能到一个标准的Redisson队列(如RBlockingQueue)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   | @Slf4j @Component public class RedissonDelayQueueProducer {     @Autowired     private RedissonClient redissonClient;
      public void addDelayedTask(String topic, Object task, long delay, TimeUnit timeUnit) {         log.info("redis的延迟队列 {} (添加)的key:{},time:{},当前时间:{}", topic, task, delay, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));         redissonClient.getDelayedQueue(redissonClient.getBlockingQueue(topic)).offerAsync(task, delay, timeUnit);     }
      public Boolean cancelDelayedTask(String topic, Object task) {         log.info("redis的延迟队列{} (移除)的key:{},当前时间:{}", topic, task, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));                  return redissonClient.getDelayedQueue(redissonClient.getBlockingQueue(topic)).remove(task);     } }
 
  | 
 
消费者
起一个线程循环监听延迟队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
   | @Slf4j @Service public class RedissonDelayedQueueConsumer {     private final static String topic = "topic";     @Autowired     private RedissonClient redissonClient;
      @PostConstruct     public void init() {                  log.info("{} 开始监听", topic);         new Thread(() -> {             while (true) {                 try {                     RBlockingQueue<Object> aaa = redissonClient.getBlockingQueue(topic);                     RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(aaa);                                          Object take = aaa.take();                                          log.info("redis的延迟队列:{},当前时间:{}", take, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));                 } catch (Exception e) {                     log.error("redis的延迟队列抛出异常", e);                 }             }         }).start();     } }
 
 
  | 
 
效果展示

redisson配置
1 2 3 4 5 6 7 8 9
   | <dependency>   <groupId>org.redisson</groupId>   <artifactId>redisson</artifactId>   <version>3.17.4</version> </dependency> <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
 
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | @Configuration public class RedissonConfig {
      @Value("${spring.data.redis.host}")     private String redisHost;     @Value("${spring.data.redis.port}")     private String redisPort;     @Value("${spring.data.redis.password}")     private String password;
      @Bean(destroyMethod = "shutdown")     public RedissonClient redissonClient() {         Config config = new Config();         config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort);         if(StringUtils.isBlank(password)) {             config.useSingleServer().setPassword(null);         } else {             config.useSingleServer().setPassword(password);         }         return Redisson.create(config);     } }
 
  |