spring 消费kafka
消费者配置:
/**
 * Kafka consumer configuration for the "zdry" person-track listener.
 *
 * <p>Every setting is injected from a {@code kafka.zdry.consumer.*} property. The
 * configuration is conditional on the resource {@code /special-run.txt}.
 *
 * <p>Keys and values are both deserialized with {@link StringDeserializer}, so all
 * Kafka types in this class are parameterized as {@code <String, String>}.
 */
@Configuration
@EnableKafka
@ConditionalOnResource(resources = "/special-run.txt")
public class ZdryKafkaConsumerConfig {

    @Value("${kafka.zdry.consumer.autoStartup}")
    private Boolean autoStartup;

    @Value("${kafka.zdry.consumer.servers}")
    private String servers;

    // Injected but not read inside this class; the listener resolves the topic
    // from the same property placeholder.
    @Value("${kafka.zdry.consumer.topic}")
    private String topic;

    @Value("${kafka.zdry.consumer.group.id}")
    private String groupId;

    @Value("${kafka.zdry.consumer.enable.auto.commit}")
    private String enableAutoCommit;

    @Value("${kafka.zdry.consumer.auto.commit.interval.ms}")
    private String autoCommitIntervalMs;

    @Value("${kafka.zdry.consumer.session.timeout.ms}")
    private String sessionTimeoutMs;

    @Value("${kafka.zdry.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.zdry.consumer.max.poll.records}")
    private String maxPollRecords;

    @Value("${kafka.zdry.consumer.concurrency}")
    private Integer concurrency;

    /**
     * Batch listener container factory for person tracks, registered under the
     * bean name {@code zdry_person_track}.
     *
     * @return a factory with batch listening enabled, a poll timeout of 1500 and
     *         the configured concurrency and auto-startup flag
     */
    @Bean("zdry_person_track")
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setBatchListener(true);
        factory.setAutoStartup(autoStartup);
        return factory;
    }

    /**
     * Consumer factory built from {@link #consumerConfigs()}.
     *
     * @return a new {@link DefaultKafkaConsumerFactory}
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Consumer properties assembled from the injected settings.
     *
     * @return a new mutable map keyed by {@link ConsumerConfig} property names
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return propsMap;
    }
}
消费监听:
/**
 * Batch listener for the topic named by {@code kafka.zdry.consumer.topic}, using the
 * {@code zdry_person_track} container factory.
 *
 * <p>Logs the batch size, then parses each record value as a JSON object and collects
 * the results into a {@link JSONArray}.
 *
 * @param records the batch of records delivered by the container; the element type is
 *                spelled out so the loop below can iterate {@link ConsumerRecord}s and
 *                read {@code String} values
 */
@KafkaListener(topics = "${kafka.zdry.consumer.topic}", containerFactory = "zdry_person_track")
public void consumeToJson(List<ConsumerRecord<String, String>> records) {
    int count = records.size();
    log.info("count={}", count);
    // NOTE(review): arr is not used after the loop in the visible snippet;
    // downstream processing is presumably omitted — confirm.
    JSONArray arr = new JSONArray();
    for (ConsumerRecord<String, String> record : records) {
        JSONObject obj = JSON.parseObject(record.value());
        arr.add(obj);
    }
}
推送消息到kafka —— 生产者kafka配置:
@Configuration
@EnableKafka
@ConditionalOnResource(resources = "/special-run.txt")
public class ZdryKafkaProducerConfig {
@Value("${kafka.zdry.consumer.servers}")
private String servers;
@Bean("zdryProducerTemplate")
public KafkaTemplate> createTemplate(){
Map pros = producerProps();
ProducerFactory> pf = new DefaultKafkaProducerFactory>(pros);
KafkaTemplate> template = new KafkaTemplate<>(pf);
return template;
}public Map producerProps() {Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}}
产生消息并推送到kafka
// Producer template injected by Spring.
// NOTE(review): presumably resolves to the bean named "zdryProducerTemplate" — confirm.
@Autowired
private KafkaTemplate zdryProducerTemplate;
// Name of the topic to push to, read from the kafka.zdry.consumer.push.topic property.
@Value("${kafka.zdry.consumer.push.topic}")
private String pushTopic;
推送消息到kafka的方法:
// Serialize `warn` to a JSON string and send it to the topic named by pushTopic.
try{
zdryProducerTemplate.send(pushTopic,JSON.toJSONString(warn));
}catch(Exception e){
// NOTE(review): the exception is only printed via printStackTrace and then dropped;
// consider routing it through the class's logger if one is available.
e.printStackTrace();
}
// Flush the template; this line is also reached after an Exception was caught above.
zdryProducerTemplate.flush();
yml 配置
kafka:
  ## Key-person capture tracks
  zdry:
    consumer:
      autoStartup: true
      servers: x.x.x.x:6667
      topic: person_track
      group.id: aa_1
      enable.auto.commit: true
      auto.commit.interval.ms: 100
      session.timeout.ms: 10000
      auto.offset.reset: earliest
      max.poll.records: 100
      concurrency: 1
      push.topic: ai_warn
推荐阅读
- SpringBoot学习笔记|SpringBoot配置文件优先级
- js|jquery 分页兼容i7,i8浏览器
- java|cocos2dx环境搭建
- 运维管理|Docker服务编排
- spring|SpringBoot自动配置原理入门
- Spring实战|解决Spring boot : java.lang.ExceptionInInitializerError报错
- 面试|springboot自动装配原理最终版
- 五|猿创征文 | web前端——她
- spring|猿创征文 |【SpringBoot2】快速上手SpringBoot