Spring|Spring Cloud Stream简单用法
目录
- 简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者
- 生产者
- 消费者
- Stream其他特性
- 消息发送失败的处理
- 消费者错误处理
简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者
生产者
pom文件中加入依赖
org.springframework.cloud spring-cloud-starter-netflix-eureka-clientcom.alibaba.cloud spring-cloud-starter-stream-rocketmq2.1.0.RELEASE
配置文件中增加关于Spring Cloud Stream binder和bindings的配置
spring:application:name: zhao-cloud-stream-producercloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:output:producer:group: testsync: truebindings:output:destination: stream-test-topiccontent-type: text/plain # 内容格式。这里使用 JSON
其中destination代表生产的数据发送到的topic 然后定义一个channel用于数据发送
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface TestChannel {@Output("output")MessageChannel output(); }
最后构造数据发送的接口
@Controllerpublic class SendMessageController {@Resourceprivate TestChannel testChannel; @ResponseBody@RequestMapping(value = "https://www.it610.com/article/send", method = RequestMethod.GET)public String sendMessage() {String messageId = UUID.randomUUID().toString(); Message message = MessageBuilder.withPayload("this is a test:" + messageId).setHeader(MessageConst.PROPERTY_TAGS, "test").build(); try {testChannel.output().send(message); return messageId + "发送成功"; } catch (Exception e) {return messageId + "发送失败,原因:" + e.getMessage(); }}}
消费者
消费者的pom引入与生产者相同,在此不再赘述,配置时需要将stream的output修改为input并修改对应属性
spring:application:name: zhao-cloud-stream-consumercloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:input:consumer:tags: testbindings:input:destination: stream-test-topiccontent-type: text/plain # 内容格式。这里使用 JSONgroup: test
另外关于channel的构造也要做同样的修改
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface TestChannel {@Input("input")SubscribableChannel input(); }
最后我在启动类中对收到的消息进行了监听
@StreamListener("input")public void receiveInput(@Payload Message message) throws ValidationException {System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo")); }
测试结果
文章图片
文章图片
Stream其他特性
消息发送失败的处理
消息发送失败后悔发送到默认的一个“topic.errors"的channel中(topic是配置的destination)。要配置消息发送失败的处理,需要将错误消息的channel打开 消费者配置如下
spring:application:name: zhao-cloud-stream-producercloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:output:producer:group: testsync: truebindings:output:destination: stream-test-topiccontent-type: text/plain # 内容格式。这里使用 JSONproducer:errorChannelEnabled: true
在启动类中配置错误消息的Channel信息
@Bean("stream-test-topic.errors")MessageChannel testoutPutErrorChannel(){return new PublishSubscribeChannel(); }
新建异常处理service
import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; @Servicepublic class ErrorProducerService {@ServiceActivator(inputChannel = "stream-test-topic.errors")public void receiveProducerError(Message message){System.out.println("receive error msg :"+message); }}
当发生异常时,由于测试类中已经将异常捕获,处理发送异常主要是在这里进行。模拟,应用与rocketMq断开的场景。可见
文章图片
文章图片
消费者错误处理
首先增加配置为
spring:application:name: zhao-cloud-stream-producercloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:output:producer:group: testsync: truebindings:output:destination: stream-test-topiccontent-type: text/plain # 内容格式。这里使用 JSONproducer:errorChannelEnabled: true
增加相应的模拟异常的操作
@StreamListener("input")public void receiveInput(@Payload Message message) throws ValidationException {//System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo")); throw new RuntimeException("oops"); }@ServiceActivator(inputChannel = "stream-test-topic.test.errors")public void receiveConsumeError(Message message){System.out.println("receive error msg"+message.getPayload()); }
文章图片
代码地址https://github.com/zhendiao/deme-code/tree/main/zp
【Spring|Spring Cloud Stream简单用法】到此这篇关于Spring Cloud Stream简单用法的文章就介绍到这了,更多相关Spring Cloud Stream使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
推荐阅读
- 【Java面试】如何理解Spring|【Java面试】如何理解Spring Boot中的Starter()
- Java开发工程师进阶篇-Java8的Stream流使用技巧
- 软件开发中的上游和下游
- 中间件|推荐一个完善的停车管理系统,物联网项目springboot,有源码
- Spring|Spring Security实现退出登录和退出处理器
- SpringBoot定时任务多线程实现示例
- JavaWeb|基于Springboot 和 Mybatis 的后台管理系统
- WGCLOUD能监测android(安卓)设备吗
- SpringBoot 整合 RabbitMQ 实现消息可靠传输
- activiti流程图在线绘制业务申请审批流转跟进催办,springboot集成工作流基础