整合Spring Cloud Stream Binder与Kafka进行消息发送与接收

我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶!
1 简介 【整合Spring Cloud Stream Binder与Kafka进行消息发送与接收】之前讲解了Spring Cloud Stream整合RabbitMQ和GCP Pubsub,都是非常简单,而且代码没什么区别的。本文讲解Spring Cloud Stram与Kafka的整合,同样也是非常简单。
之前的文章:
《整合Spring Cloud Stream Binder与RabbitMQ进行消息发送与接收》
《整合Spring Cloud Stream Binder与GCP Pubsub进行消息发送与接收》
2 安装Kafka 为了演示简单,这里只是安装Standalone的版本,而不是集群。先到官网下载安装包,然后进行解压即可使用。
# 解压安装包 $ tar -xzf kafka_2.13-2.8.0.tgz # 进入目录 $ cd kafka_2.13-2.8.0/ # 启动zookeeper $ $ bin/zookeeper-server-start.sh config/zookeeper.properties # 启动kafka(新的命令行终端) $ bin/kafka-server-start.sh config/server.properties

这里Zookeeper和Kafka都使用了默认配置,所以不用修改了。启动的时候会有日志输出,如果没有报错,说明启动成功了。
3 整合 引入相关依赖:
org.springframework.cloud spring-cloud-stream-binder-kafka

实现简单的Publisher和Consumer:
package com.pkslow.cloud.stream.binder.kafka; @SpringBootApplication public class StreamBinderKafka { private static final Logger log = LoggerFactory.getLogger(StreamBinderKafka.class); public static void main(String[] args) { SpringApplication.run(StreamBinderKafka.class, args); }@Bean public Supplier pkslowSource() { return () -> { String message = "www.pkslow.com"; log.info("Sending value: " + message); return message; }; }@Bean public Consumer pkslowSink() { return message -> { log.info("Received message " + message); }; } }

配置必要的属性如下:
spring: cloud: stream: function: definition: pkslowSource; pkslowSink bindings: pkslowSource-out-0: destination: pkslow-topic pkslowSink-in-0: destination: pkslow-topic poller: fixed-delay: 500 kafka: binder: brokers: localhost:9092 auto-create-topics: true required-acks: 1

运行日志如下:
整合Spring Cloud Stream Binder与Kafka进行消息发送与接收
文章图片

4 总结 三个MQ的整合下来几乎没有什么区别,也没有太大的代码改动,这就是Spring Cloud Stream给我们带来的便利。
代码请查看:https://github.com/LarryDpk/p...
欢迎关注微信公众号<南瓜慢说>,将持续为你更新...
整合Spring Cloud Stream Binder与Kafka进行消息发送与接收
文章图片

多读书,多分享;多写作,多整理。

    推荐阅读