SpringBoot|spring boot中使用kafka详解(踩完坑又爬了出来)

一.概述 本文会使用两种方式使用kafka:直接使用kafka-client连接kafka服务;另一种是使用spring-kafka框架来连接kafka。
二.Compatibility(兼容性): 【SpringBoot|spring boot中使用kafka详解(踩完坑又爬了出来)】springboot:2.2.1.RELEASE
gradle:

compile ('org.apache.kafka:kafka-clients:2.4.0') compile ('org.springframework.kafka:spring-kafka:2.3.3.RELEASE')


在使用kafka的时候因为版本兼容性的问题没少踩坑。主要的版本兼容体现在springboot与spring-kafka之间,spring-kafka和kafka-client之间,kafka-client和kafka服务端之间的兼容。
spring官网给出的spring-kafka与kafka-clients的兼容对比参照表是这样的:
SpringBoot|spring boot中使用kafka详解(踩完坑又爬了出来)
文章图片

具体的版本依赖也可以参考mvnrepository。
根据博主的实践感觉kafka服务端只要是在0.10.0.1以上,kafka-clients高版本都是支持的,kafka官网也说了服务端和客户端之间是版本兼容的,但是亲测发现低于0.10.0.1版本的服务端是真的高版本兼容不了。

三.异常现象参考:
1.如果在启动kafka的时候配的ip是连接不上的ip那么在启动或者发送消息到kafka的时候就会出现类似这样的异常:
2020-01-29 17:02:49.034 -DEBUG[] [ad | producer-7] o.a.k.c.NetworkClient: [Producer clientId=producer-7] Initiating connection to node -1

2.kafka-client和服务端不兼容在发送message到kafka的时候就会一直输出如下log,一直到超时:
2020-01-29 15:19:48.006 -DEBUG[] [ad | producer-7] o.a.k.c.NetworkClient: [Producer clientId=producer-7] Disconnecting from node -1 due to request timeout. 2020-01-29 15:19:48.006 -DEBUG[] [ad | producer-7] kClient$DefaultMetadataUpdater : [Producer clientId=producer-7] Give up sending metadata request since no node is available 2020-01-29 15:19:48.056 -DEBUG[] [ad | producer-7] kClient$DefaultMetadataUpdater : [Producer clientId=producer-7] Give up sending metadata request since no node is available 2020-01-29 15:19:48.106 -DEBUG[] [ad | producer-7] kClient$DefaultMetadataUpdater : [Producer clientId=producer-7] Give up sending metadata request since no node is available 2020-01-29 15:19:48.157 -DEBUG[] [ad | producer-7] kClient$DefaultMetadataUpdater : [Producer clientId=producer-7] Initialize connection to node 127.0.0.1:9092 (id: -1 rack: null) for sending metadata request 2020-01-29 15:19:48.157 -DEBUG[] [ad | producer-7] o.a.k.c.NetworkClient: [Producer clientId=producer-7] Initiating connection to node 127.0.0.1:9092 (id: -1 rack: null) using address /127.0.0.1 2020-01-29 15:19:48.158 -DEBUG[] [ad | producer-7] o.a.k.c.n.Selector: [Producer clientId=producer-7] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 2020-01-29 15:19:48.158 -DEBUG[] [ad | producer-7] o.a.k.c.NetworkClient: [Producer clientId=producer-7] Completed connection to node -1. Fetching API versions. 2020-01-29 15:19:48.158 -DEBUG[] [ad | producer-7] o.a.k.c.NetworkClient: [Producer clientId=producer-7] Initiating API versions fetch from node -1.

3.kafka-client和spring-kafka版本不匹配,项目不能成功启动
2020-01-29 17:08:48.853 -DEBUG[] [ad | producer-6] o.a.k.c.p.i.Sender: Starting Kafka producer I/O thread. 2020-01-29 17:08:48.853 - INFO[] [main] .a.k.c.u.AppInfoParser$AppInfo : Kafka version : 0.10.0.1 2020-01-29 17:08:48.853 - INFO[] [main] .a.k.c.u.AppInfoParser$AppInfo : Kafka commitId : a7a17cdec9eaa6c5 2020-01-29 17:08:48.853 -DEBUG[] [main] o.a.k.c.p.KafkaProducer: Kafka producer started 2020-01-29 17:08:48.859 -DEBUG[] [main] pathLoggingApplicationListener : Application started with classpath: unknown

4.springboot和spring-kafka不匹配
java.lang.IllegalStateException: Error processing condition on org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.kafkaTemplate at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:60) ~[spring-boot-autoconfigure-2.2.1.RELEASE.jar:2.2.1.RELEASE] at org.springframework.context.annotation.ConditionEvaluator.shouldSkip(ConditionEvaluator.java:108) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForBeanMethod(ConfigurationClassBeanDefinitionReader.java:184) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:144) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:120) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:337) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:242) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:275) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:95) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:706) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:532) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.1.RELEASE.jar:2.2.1.RELEASE] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.1.RELEASE.jar:2.2.1.RELEASE] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.1.RELEASE.jar:2.2.1.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.1.RELEASE.jar:2.2.1.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.1.RELEASE.jar:2.2.1.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.1.RELEASE.jar:2.2.1.RELEASE]

四.直接使用kafka-client 本文重点讲Producer(发送消息到kafka)。
1.启动kafka服务,可以参照博主的另一篇https://mp.csdn.net/postedit/103386177,给出本次使用的docker-compose文件
version: '2' services: zookeeper: container_name: zookeeper hostname: zookeeper image: wurstmeister/zookeeper ports: - "2181:2181" kafka: container_name: kafka hostname: kafka image: wurstmeister/kafka:0.10.0.1-1 ports: - "9092:9092" - "9093:9093" links: - zookeeper environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_HOST_NAME: 10.48.24.**(此处使用本地ip) JMX_PORT: 9093 KAFKA_ADVERTISED_PORT: 9092 KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_LOG_RETENTION_HOURS: 1 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 KAFKA_NUM_PARTITIONS: 2 KAFKA_DELETE_RETENTION_MS: 1000 volumes: - /var/run/docker.sock:/var/run/docker.sock kafka-manager: container_name: kafka-manager hostname: kafka-manager image: sheepkiller/kafka-manager ports: - "9000:9000" links: - zookeeper - kafka environment: ZK_HOSTS: zookeeper:2181 APPLICATION_SECRET: letmein kafdrop: container_name: kafdrop hostname: kafdrop image: thomsch98/kafdrop ports: - "9010:9010" environment: ZK_HOSTS: zookeeper:2181 LISTEN: 9010


首先要对kafka-client进行配置:
package com.neuralyzer.common.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.Future; import static com.neuralyzer.common.NeuralyzerLog.errorLog; import static com.neuralyzer.common.NeuralyzerLog.serviceLog; public class kafkaProducerTemplate { private static KafkaProducer kafkaProducer; private static Properties producerConfigs() { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); returnprops; }static { Properties props = producerConfigs(); kafkaProducer = new KafkaProducer<>(props); }public static void sendMessage(String topicName, String message) { try { serviceLog.info("Send data to kafka, topic name:{}, message:{}",topicName, message); ProducerRecord msg = new ProducerRecord<>(topicName, message); Future future = kafkaProducer.send(msg); RecordMetadata recordMetadata = https://www.it610.com/article/future.get(); } catch (Exception e) { errorLog.error("kafka send happen error.", e); } } }

配置参数详细情况可参考https://www.cnblogs.com/cxuanBlog/p/11862698.html
五.使用sprng-kafka
因为spring-boot提供了对spring-kafka的自动装配,所以直接在application配置文件中配置就可以,当然也可以手动配置。
kafka: bootstrap-servers: 127.0.0.1:9092 listener: concurrency: 3 template: default-topic: neuralyzer-st-backup producer: retries: 0 batch-size: 10 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger.ms: 1

package com.neuralyzer.common.kafka; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.ProducerListener; import org.springframework.stereotype.Component; import static com.neuralyzer.common.NeuralyzerLog.errorLog; import static com.neuralyzer.common.NeuralyzerLog.serviceLog; @Log4j2 @Component public class KafkaProducer {private final KafkaTemplate kafkaTemplate; @Autowired public KafkaProducer(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; }public void sendMessage(String topicName, String message) { serviceLog.info("Send data to kafka, topic name:{}, message:{}",topicName, message); kafkaTemplate.send(topicName, message); kafkaTemplate.setProducerListener(new ProducerListener() { @SneakyThrows @Override public void onError(ProducerRecord producerRecord, Exception exception) { errorLog.error("Send data to kafka happen exception", exception); throw exception; } }); } }


    推荐阅读