SpringCloud|Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)

开发项目中,有的服务对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次。这些我们通常都是使用SpringQuartz 、springboot下使用Schedule然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,例如 编写文本时候定时几分钟保存一次的草稿。过期取消订单
对于这些开始时间不确定的定时任务,可以通过Spring Cloud Stream来很好的处理。延迟消息使用RabbitMQ中提供了关于延迟消息的插件。
安装RabbitMQ
https://www.rabbitmq.com/download.html
SpringCloud|Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)
文章图片

rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器,Rabbit MQ 是建立在Erlang OTP平台上。
需要先安装erlang 安装过程会提示下载 SpringCloud|Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)
文章图片
进行安装
在安装好RabbitMQ 进行环境变量配置,新建RABBITMQ_SERVER路径 D:\Program Files\RabbitMQ3.7.8\rabbitmq_server-3.7.8\sbin
可视化界面启动

  1. 运行命令行窗口cmd
  2. 输入命令rabbitmq-plugins enable rabbitmq_management,这样就可以添加可视化插件了。
  3. SpringCloud|Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)
    文章图片
只要启动rabbitmq,然后在浏览器输入 http://127.0.0.1:15672/ 就可以访问了
用户名密码 guest
SpringCloud|Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)
文章图片

http://www.rabbitmq.com/community-plugins.html 中找到rabbitmq_delayed_message_exchange插件只有RabbitMQ 3.6.x以上才支持
在下载好之后,解压得到.ez结尾的插件包,将其复制到RabbitMQ安装目录下的plugins文件夹。
然后通过命令行启用该插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

pom

org.springframework.boot
spring-boot-starter-parent
2.0.5.RELEASE



UTF-8
1.8



org.projectlombok
lombok
1.18.2


org.springframework.boot
spring-boot-starter-test
test


org.springframework.cloud
spring-cloud-starter-stream-rabbit


org.springframework.boot
spring-boot-starter-actuator


junit
junit
3.8.1
test





org.springframework.cloud
spring-cloud-dependencies
Finchley.SR1
pom
import



配置文件
spring.application.name=stream-delayed-message
server.port=18081
spring.cloud.stream.bindings.example-topic-input.destination = delay-topic
spring.cloud.stream.bindings.example-topic-input.group=test
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.delayed-exchange=true

spring.cloud.stream.bindings.example-topic-output.destination=delay-topic
spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true
用来开启延迟消息的功能,这样在创建exchange的时候,会将其设置为具有延迟特性的exchange,也就是用到上面我们安装的延迟消息插件的功能
interface Topic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}

@Slf4j
@RestController
public class Controller {
@Autowired
private Topic topic;
/**
* 消息生产接口
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
log.info("Send: "+message);
topic.output().send(MessageBuilder.withPayload(message).setHeader("x-delay",5000).build());
return "ok";
}
}

/**
* 消息消费逻辑
*/
@Slf4j
@Component
publiclass TestListener {
@StreamListener(Topic.INPUT)
public void receive(String payload) {
log.info("Received: " + payload);
}
}
跑起来 http://localhost:18081/sendMessage?message=hello
【SpringCloud|Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)】SpringCloud|Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)
文章图片

    推荐阅读