60-spring 消费 Kafka, 推送消息到 Kafka

spring 消费kafka 消费者配置:

/**
 * Kafka consumer configuration for the "key person track" (重点人员轨迹) feed.
 *
 * <p>Only activated when the marker resource {@code /special-run.txt} is on the
 * classpath, so the consumer can be switched off per deployment.
 *
 * <p>All connection/tuning values come from {@code kafka.zdry.consumer.*} properties.
 */
@Configuration
@EnableKafka
@ConditionalOnResource(resources = "/special-run.txt")
public class ZdryKafkaConsumerConfig {

    /** Whether listener containers start automatically on context refresh. */
    @Value("${kafka.zdry.consumer.autoStartup}")
    private Boolean autoStartup;

    /** Kafka bootstrap servers (host:port list). */
    @Value("${kafka.zdry.consumer.servers}")
    private String servers;

    /** Topic carrying person-track records (bound by the listener via SpEL). */
    @Value("${kafka.zdry.consumer.topic}")
    private String topic;

    /** Consumer group id. */
    @Value("${kafka.zdry.consumer.group.id}")
    private String groupId;

    /** Whether offsets are auto-committed by the Kafka client. */
    @Value("${kafka.zdry.consumer.enable.auto.commit}")
    private String enableAutoCommit;

    /** Auto-commit interval in milliseconds. */
    @Value("${kafka.zdry.consumer.auto.commit.interval.ms}")
    private String autoCommitIntervalMs;

    /** Session timeout in milliseconds. */
    @Value("${kafka.zdry.consumer.session.timeout.ms}")
    private String sessionTimeoutMs;

    /** Offset reset policy when no committed offset exists (e.g. "earliest"). */
    @Value("${kafka.zdry.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    /** Upper bound on records returned by a single poll (batch size). */
    @Value("${kafka.zdry.consumer.max.poll.records}")
    private String maxPollRecords;

    /** Number of concurrent listener threads (partitions consumed in parallel). */
    @Value("${kafka.zdry.consumer.concurrency}")
    private Integer concurrency;

    /**
     * Batch listener container factory for the person-track topic.
     *
     * <p>Batch mode is enabled so the listener receives the whole poll result
     * ({@code List<ConsumerRecord>}) at once.
     *
     * @return a concurrent, batch-enabled container factory named {@code zdry_person_track}
     */
    @Bean("zdry_person_track")
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        // Max time (ms) to block in poll() waiting for records.
        factory.getContainerProperties().setPollTimeout(1500);
        // Deliver the entire poll as one List to the listener method.
        factory.setBatchListener(true);
        factory.setAutoStartup(autoStartup);
        return factory;
    }

    /**
     * Consumer factory with String key/value deserialization.
     *
     * @return a {@link DefaultKafkaConsumerFactory} built from {@link #consumerConfigs()}
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw Kafka client properties for the consumer.
     *
     * @return property map keyed by {@link ConsumerConfig} constants
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return propsMap;
    }
}

消费监听:
/**
 * Batch listener for the person-track topic.
 *
 * <p>Receives the whole poll result at once (the container factory
 * {@code zdry_person_track} has {@code batchListener=true}) and parses each
 * record's JSON payload into a {@link JSONArray}.
 *
 * @param records one poll's worth of String-valued consumer records
 */
@KafkaListener(topics = "${kafka.zdry.consumer.topic}", containerFactory = "zdry_person_track")
public void consumeToJson(List<ConsumerRecord<String, String>> records) {
    int count = records.size();
    log.info("count={}", count);
    JSONArray arr = new JSONArray();
    for (ConsumerRecord<String, String> record : records) {
        // Skip tombstone/null-value records: JSON.parseObject(null) would yield null
        // entries (or NPE downstream) and there is nothing to aggregate.
        if (record.value() == null) {
            continue;
        }
        JSONObject obj = JSON.parseObject(record.value());
        arr.add(obj);
    }
    // NOTE(review): `arr` is built but never used in this snippet — presumably the
    // aggregated batch is meant to be forwarded (e.g. via zdryProducerTemplate);
    // confirm against the full class, otherwise this loop is dead work.
}

推送消息到 Kafka 的生产者配置:
@Configuration @EnableKafka @ConditionalOnResource(resources = "/special-run.txt") public class ZdryKafkaProducerConfig { @Value("${kafka.zdry.consumer.servers}") private String servers; @Bean("zdryProducerTemplate") public KafkaTemplate> createTemplate(){ Map pros = producerProps(); ProducerFactory> pf = new DefaultKafkaProducerFactory>(pros); KafkaTemplate> template = new KafkaTemplate<>(pf); return template; }public Map producerProps() {Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 100); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; }}

生成消息并推送到 Kafka
/** Producer template (bean {@code zdryProducerTemplate}) for pushing warn messages. */
@Autowired
private KafkaTemplate<String, String> zdryProducerTemplate;

/** Destination topic for pushed warn messages ({@code kafka.zdry.consumer.push.topic}). */
@Value("${kafka.zdry.consumer.push.topic}")
private String pushTopic;

【60-spring 消费kafka, 推送消费到kafka】方法:
try {
    // send() is asynchronous; the payload is the JSON-serialized warn object.
    zdryProducerTemplate.send(pushTopic, JSON.toJSONString(warn));
} catch (Exception e) {
    // Log with the cause instead of printStackTrace() so the failure reaches the
    // application's log system (and isn't silently lost on stderr).
    log.error("failed to push warn message to topic {}", pushTopic, e);
}
// flush() blocks until all buffered records are actually transmitted,
// ensuring the message is out before this method returns.
zdryProducerTemplate.flush();

yml 配置
kafka:
  ## key-person capture track feed (重点人员抓拍轨迹)
  zdry:
    consumer:
      autoStartup: true
      servers: x.x.x.x:6667
      topic: person_track
      group.id: aa_1
      enable.auto.commit: true
      auto.commit.interval.ms: 100
      session.timeout.ms: 10000
      auto.offset.reset: earliest
      max.poll.records: 100
      concurrency: 1
      # topic the processed warn messages are pushed back to
      push.topic: ai_warn

    推荐阅读