springboot整合kafka
kafka是什么?
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
应用场景
- 消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
- 存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
- 流式处理平台: Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
1. 确保有一台虚拟机,并且虚拟机有zookeeper注册中心。如未安装,可参考 2. 下载kafka 本文安装kafka是基于linux操作系统,windows系统,可参考资料自行操作。
官网下载地址:http://kafka.apache.org/downl...
文章图片
3. 解压 【springboot整合kafka】下载完成之后,会发现有一个tar文件,执行命令,进行解压。
tar -zxvf kafka_2.12-2.1.0.tgz
4. 解压完成后,进入解压之后的kafka,使用编辑器修改config/server.properties文件
vim server.properties
5. 修改监听ip,改成自己虚拟机的ip地址,端口默认未9092
文章图片
6. 将zookeeper注册中心的ip地址换成自己zookeeper的ip地址,我的zookeeper是集群,如果单台的话就写一个ip+端口即可。
文章图片
7. 确保zookeeper已经启动之后,启动kafka,在kafka的目录下执行命令。
bin/kafka-server-start.sh config/server.properties &
前面是执行kafka的启动脚本,后面指定刚刚修改的配置文件。
文章图片
出现以上界面后,就说明启动成功了,可以通过jps命令查看进程。
文章图片
8. 创建主题(topic),在kafka目录下执行命令创建主题。
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
命令前是指定创建主题的脚本,中是指定zookeeper的ip地址,后是主题的分区和名称。
执行命令后出现Created topic "test"描述则说明创建成功了。
文章图片
9. 创建完成之后,在kafka目录下执行以下命令,查看所有的主题列表。
./bin/kafka-topics.sh --list --zookeeper localhost:2181
文章图片
如果看到以上界面中,刚刚创建的主题名,则说明已经创建成功了。
10. 查看topic详细信息
./bin/kafka-topics.sh --zookeeper 192.168.126.128:2181,192.168.126.128:2182,192.168.126.128:2183 --describe --topic test
这里我是zookeeper集群,如果单台的话,就写一个ip就可。
11. 创建生产者
./bin/kafka-console-producer.sh --broker-list 192.168.126.128:9092 --topic test
如果出现以下界面,并且输入之后回车,可以继续输入的操作,那说明生产者已经创建完成了,接下来就可以创建消费者,去消费消息。
文章图片
12. 创建消费者
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic yzr --consumer-property group.id=group_test
13.测试生产和消费者 将刚刚的虚拟机端口复制一个,一个ip打开两个终端进行生产和消费测试。
先将生产者启动,消费者随后启动。
可以看到下图,说明消费者成功消费了生产者的消息。
文章图片
文章图片
如果测试中出现了问题,首先要弄清楚:
- zookeeper是否已经成功启动?
- kafka中的配置文件是不是指定了zookeeper的ip?
- kafka启动是不是指定了配置文件?
- 命令中的目录和ip是否正确?
1. 创建Springboot项目 2. pom文件
org.springframework.kafka
spring-kafka
com.alibaba
fastjson
1.2.72
3. yml配置文件
spring:
kafka:
bootstrap-servers: 192.168.126.128:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
bootstrap-servers指定为自己kafka所在的服务器ip
3. 创建生产者
@Component
@Slf4j
public class KafkaProducer {@Resource
private KafkaTemplate kafkaTemplate;
// 这里定义的主题常量,在消费者类中会用到
public static final String TOPIC = "yzr";
/**
* kafka发送消息
*
* @param obj 接收参数
*/
public void send(Object obj) {
String jsonString = JSONObject.toJSONString(obj);
log.info("准备发送消息为:{}", jsonString);
// 发送消息
ListenableFuture> future = kafkaTemplate.send(TOPIC, obj);
// 回调
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(@NotNull Throwable throwable) {
// 发送失败的处理
log.info("{}-生产者发送消息失败:{}", TOPIC, throwable.getMessage());
}@Override
public void onSuccess(SendResult stringObjectSendResult) {
// 发送成功的处理
log.info("{}-生产者发送消息成功:{}", TOPIC, stringObjectSendResult.toString());
}
});
}
}
4. 创建消费者
@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "yzr", groupId = "A")
public void topicTest(ConsumerRecord, ?> record, Acknowledgment ack) {
Optional> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test 消费了: Topic:"+ ",Message:" + msg);
ack.acknowledge();
}
}@KafkaListener(topics = "yzr", groupId = "B")
public void topicTest2(ConsumerRecord, ?> record, Acknowledgment ack) {
Optional> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test2 消费了: Topic:"+ ",Message:" + msg);
ack.acknowledge();
}
}
}
5. 创建控制层,发送消息。
@RestController
public class KafkaController {private final KafkaProducer kafkaProducer;
@Autowired
public KafkaController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}/**
* 发送消息
*
* @param message 消息
*/
@GetMapping("/kafka/{message}")
public void sendMessage(@PathVariable("message") String message) {
// 发送消息
kafkaProducer.send(message);
}
}
学习之路,永无止境。
推荐阅读
- Activiti(一)SpringBoot2集成Activiti6
- SpringBoot调用公共模块的自定义注解失效的解决
- 解决SpringBoot引用别的模块无法注入的问题
- 深入浅出谈一下有关分布式消息技术(Kafka)
- Spring|Spring Boot 整合 Activiti6.0.0
- springboot使用redis缓存
- 15.Kafka
- springboot整合数据库连接池-->druid
- SpringBoot中YAML语法及几个注意点说明
- springboot结合redis实现搜索栏热搜功能及文字过滤