开发项目中,有的服务对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次。这些我们通常都是使用SpringQuartz 、springboot下使用Schedule然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,例如 编写文本时候定时几分钟保存一次的草稿。过期取消订单
对于这些开始时间不确定的定时任务,可以通过Spring Cloud Stream来很好的处理。延迟消息使用RabbitMQ中提供了关于延迟消息的插件。
安装RabbitMQ
https://www.rabbitmq.com/download.html
文章图片
rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器,Rabbit MQ 是建立在Erlang OTP平台上。
需要先安装erlang 安装过程会提示下载
文章图片
进行安装
在安装好RabbitMQ 进行环境变量配置,新建RABBITMQ_SERVER路径 D:\Program Files\RabbitMQ3.7.8\rabbitmq_server-3.7.8\sbin
可视化界面启动
- 运行命令行窗口cmd
- 输入命令rabbitmq-plugins enable rabbitmq_management,这样就可以添加可视化插件了。
-
文章图片
用户名密码 guest
文章图片
http://www.rabbitmq.com/community-plugins.html 中找到
rabbitmq_delayed_message_exchange
插件只有RabbitMQ 3.6.x以上才支持在下载好之后,解压得到
.ez
结尾的插件包,将其复制到RabbitMQ安装目录下的plugins
文件夹。然后通过命令行启用该插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
pom
spring-boot-starter-parent
UTF-8
lombok
spring-boot-starter-test
test
spring-cloud-starter-stream-rabbit
spring-boot-starter-actuator
junit
test
spring-cloud-dependencies
import
配置文件
spring.application.name=stream-delayed-message
server.port=18081
spring.cloud.stream.bindings.example-topic-input.destination = delay-topic
spring.cloud.stream.bindings.example-topic-input.group=test
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.delayed-exchange=true
spring.cloud.stream.bindings.example-topic-output.destination=delay-topic
spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true
用来开启延迟消息的功能,这样在创建exchange的时候,会将其设置为具有延迟特性的exchange,也就是用到上面我们安装的延迟消息插件的功能
interface Topic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
@Slf4j
@RestController
public class Controller {
@Autowired
private Topic topic;
/**
* 消息生产接口
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
log.info("Send: "+message);
topic.output().send(MessageBuilder.withPayload(message).setHeader("x-delay",5000).build());
return "ok";
}
}
/**
* 消息消费逻辑
*/
@Slf4j
@Component
publiclass TestListener {
@StreamListener(Topic.INPUT)
public void receive(String payload) {
log.info("Received: " + payload);
}
}
跑起来 http://localhost:18081/sendMessage?message=hello
【SpringCloud|Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)】
文章图片
推荐阅读
- java|mybatis-plus及常用CRUD方法
- Java|JAVAWEB框架知识点总结
- mysql|搭建SSM框架并实现增删查改功能
- Java|SpringBoot入门项目-基于JPA的App日记后台系统之数据库的创建与JPA的CRUD(二)
- 源码|第13期在线音乐网站前后分离springboot整合vue
- 源码|第11期学生住宿管理系统+程序设计文档
- 框架|用eclipse开发工具,maven进行包管理,整合SSH框架
- JAVA|spring事务失效的原因
- Spring|BeanFactory和ApplicationContext有什么区别