Spring|Spring Boot操作Kafka(极简示例)

Spring Boot操作Kafka (1).Maven的配置

org.apache.kafka kafka_2.11 0.10.2.0

(2).添加配置文件: application.properties
# kafka连接地址,多个地址之间使用","隔开 spring.kafka.bootstrap-servers = master:9092 # 生产者配置 spring.kafka.producer.acks = 0 spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.retries = 3 spring.kafka.producer.batch-size = 4096 spring.kafka.producer.buffer-memory = 33554432 spring.kafka.producer.compression-type = gzip # 消费者配置 spring.kafka.consumer.group-id = mygroup spring.kafka.consumer.auto-commit-interval = 5000 spring.kafka.consumer.heartbeat-interval = 3000 spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.auto-offset-reset = earliest spring.kafka.consumer.enable-auto-commit = true # listenner, 标识消费者监听的个数 spring.kafka.listener.concurrency = 8 # topic的名字 kafka.topic1 = topic1

(3).生产者代码
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; @Service @Slf4j public class MyKafkaProducerServiceImpl implements MyKafkaProducerService { @Resource private KafkaTemplate kafkaTemplate; // 读取配置文件 @Value("${kafka.topic1}") private String topic; @Override public void sendKafka() { kafkaTemplate.send(topic, "hell world"); } }

(4),消费者代码
@Component @Slf4j public class MyKafkaConsumer { @KafkaListener(topics = "${kafka.topic1}") public void listen(ConsumerRecord record) { Optional kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { log.info("----------------- record =" + record); log.info("------------------ message =" + kafkaMessage.get()); }

    推荐阅读