spring|spring boot与kafka集成(spring boot 1.5.1版本)
随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便。
引入依赖
org.springframework.kafka
spring-kafka
具体spring-kafka的版本由spring boot的当前版本决定。
application.properties配置文件
spring.kafka.bootstrap-servers=192.168.1.107:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
最简化的配置仅需指定kafka主机和消息者组名即可。这里使用的是单节点kafka,集群环境中配置多个kafka主机地址即可。例如:
spring.kafka.bootstrap-servers=192.168.1.107:9092,192.168.1.108:9092,192.168.1.109:9092
【spring|spring boot与kafka集成(spring boot 1.5.1版本)】以下4项配置指定消息key和消息体的编解码方式。
spring.kafka.consumer.key-deserializer
spring.kafka.consumer.value-deserializer
spring.kafka.producer.key-serializer
spring.kafka.producer.value-serializer
消息对象
import java.util.Date;
public class Message {private Long id;
private String msg;
private Date sendTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
}
消息生产者
import java.util.Date;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@Component
public class Sender {
@Autowired
private KafkaTemplate kafkaTemplate;
private Gson gson = new GsonBuilder().create();
public void sendMessage(){
Message m = new Message();
m.setId(System.currentTimeMillis());
m.setMsg(UUID.randomUUID().toString());
m.setSendTime(new Date());
kafkaTemplate.send("test1", gson.toJson(m));
}
}
消息消费者
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@Component
public class Receiver {private Gson gson = new GsonBuilder().create();
@KafkaListener(topics = "test1")
public void processMessage(String content) {
Message m = gson.fromJson(content, Message.class);
}
}
运行
@SpringBootApplication
public class AppStart{
public static void main(String[] args) throws InterruptedException {ApplicationContext app = SpringApplication.run(AppStart.class, args);
while(true){
Sender sender = app.getBean(Sender.class);
sender.sendMessage();
Thread.sleep(500);
}
}}
通过上面的示例可以发现,相对于spring boot 1.4.x版本,1.5集成kafka主要是将以前需要手工编码进行设置的kafka配置改由spring配置文件定义。 注意 我使用的spring boot版本是1.5.1,spring-kafka版本1.1.2,jdk1.8,该组合似乎不支持低版本的kafka。之前我使用kafka版本为2.11-0.10.0.0,向kafka发送消息时一直产生异常,后来升级kafka版本至2.11-0.10.2.0故障消失。由于测试时间有限,未作进一步分析。希望查明原因的同学能私信我。谢谢。
推荐阅读
- 爱与伤害(现代诗)
- 面试|MySQL 免安装版的下载与配置教程
- Spring系列|SpringBoot实现文件上传/使用实现MultipartFile
- springboot文件上传服务器|springboot文件上传服务器,SpringBoot: 浅谈文件上传和访问的坑 (MultiPartFile)
- ssm中使用kindedit|springboot项目中需要配置文件上传解析器吗_Spring Boot2 系列教程(一)纯 Java 搭建 SSM 项目...
- (JR)SpringBoot|SpringBoot 第一讲(图片上传MultipartFile)
- java项目工具|oss与文件上传组件MultipartFile
- 321电商早报0117丨微软与Walgreens合作
- #|Spring Boot spring.factories 用法及原理
- 技术分享|SpringBoot 底层原理剖析