SpringBoot使用Redis实现消息队列的方法小结

  package cn.springdoc.demo.consumer;

  import java.util.concurrent.TimeUnit;

  import org.slf4j.Logger;

  import org.slf4j.LoggerFactory;

  import org.springframework.beans.factory.annotation.Autowired;

  import org.springframework.boot.ApplicationArguments;

  import org.springframework.boot.ApplicationRunner;

  import org.springframework.data.redis.core.StringRedisTemplate;

  import org.springframework.stereotype.Component;

  @Component

  public class OrderConsumer implements ApplicationRunner, Runnable {

  static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

  // 消息队列

  final String queue = "queue_orders";

  // pending 队列,即待确认消息的队列

  final String pendingQueue = "pending_queue_orders";

  @Autowired

  StringRedisTemplate stringRedisTemplate;

  @Override

  public void run(ApplicationArguments args) throws Exception {

  // 应用启动后,创建新的线程来执行消费任务

  Thread thread = new Thread(this);

  thread.setName("order-consumer-thread");

  thread.start();

  }

  @Override

  public void run() {

  while (true) {

  try {

  // 1:消费者,从队列未弹出消息,并推送到 pending 队列,整个过程是原子性的

  // 最多阻塞 5 秒,超过 5 秒后还没有消息,则返回 null

  String item = stringRedisTemplate.opsForList().rightPopAndLeftPush(queue, pendingQueue, 5, TimeUnit.SECONDS);

  if (item == null) {

  log.info("等待消息 ...");

  continue ;

  }

  try {

  // 2:解析为 Long

  Long orderId = Long.parseLong(item);

  // 模拟消息消费

  log.info("消费消息: {}", orderId);

  } catch (Exception e) {

  log.error("消费异常:{}", e.getMessage());

  continue;

  }

  // 3:消费成功,从 pending 队列删除记录,相当于确认消费

  stringRedisTemplate.opsForList().remove(pendingQueue, 0, item);

  } catch (Exception e) {

  log.error("队列监听异常:{}", e.getMessage());

  break;

  }

  }

  log.info("退出消费");

  }

  }