#|Kafka JavaAPI Consumer内容示例
【#|Kafka JavaAPI Consumer内容示例】基础的Consumer内容示例:
package com.czxy.demo02;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class Consumer02 {
public static void main(String[] args){//.properties文件的另一种形式
Properties properties = new Properties();
properties.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
// 消费者组id为test
properties.put("group.id", "test");
//自动提交offset
properties.put("enable.auto.commit", "true");
//自动提交偏移量的时间间隔
properties.put("auto.commit.interval.ms", "1000");
//从哪儿消费
properties.put("auto.offset.reset", "earliest");
//earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
//latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
//none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常//会话超时时间
properties.put("session.timeout.ms", "30000");
//反序列化器
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//实例一个Consumer
KafkaConsumer, String> kafkaConsumer = new KafkaConsumer<>(properties);
//订阅 teacher主题
kafkaConsumer.subscribe(Arrays.asList("teacher"));
while (true) {
// jdk queue(Java的JDK 中) .offer()插入、.poll()获取元素
ConsumerRecords, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord, String> record : records) {
//输出获得的消息的偏移量和值(offset 和 value)
System.out.printf("offset = %d, value = https://www.it610.com/article/%s", record.offset(), record.value());
System.out.println();
}
}}
}
需要注意的是:
properties.put("auto.offset.reset", "earliest");
- earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
推荐阅读
- 深入浅出谈一下有关分布式消息技术(Kafka)
- 15.Kafka
- Springboot整合kafka的示例代码
- 搭建大数据三节点(Hadoop、Hbase、Zookeeper、Kafka、Hive)环境(含配置文件参考)
- kafka集群维护
- 十年开发大佬整理的(六大Redis+Nginx+kafka+MySQL+JVM实战文档)
- 用 logstash 从 kafka 读取数据写入 Elasticsearch(qbit)
- 关于kafka数据丢失场景的一次激烈讨论....
- Pulsar|Pulsar vs Kafka(一文掌握高性能消息组件Pulsar基础知识)
- Kafka的生产集群部署