#|Kafka JavaAPI Consumer内容示例

【#|Kafka JavaAPI Consumer内容示例】基础的Consumer内容示例:

package com.czxy.demo02; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class Consumer02 { public static void main(String[] args){//.properties文件的另一种形式 Properties properties = new Properties(); properties.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); // 消费者组id为test properties.put("group.id", "test"); //自动提交offset properties.put("enable.auto.commit", "true"); //自动提交偏移量的时间间隔 properties.put("auto.commit.interval.ms", "1000"); //从哪儿消费 properties.put("auto.offset.reset", "earliest"); //earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 //latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 //none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常//会话超时时间 properties.put("session.timeout.ms", "30000"); //反序列化器 properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //实例一个Consumer KafkaConsumer, String> kafkaConsumer = new KafkaConsumer<>(properties); //订阅 teacher主题 kafkaConsumer.subscribe(Arrays.asList("teacher")); while (true) { // jdk queue(Java的JDK 中) .offer()插入、.poll()获取元素 ConsumerRecords, String> records = kafkaConsumer.poll(100); for (ConsumerRecord, String> record : records) { //输出获得的消息的偏移量和值(offset 和 value) System.out.printf("offset = %d, value = https://www.it610.com/article/%s", record.offset(), record.value()); System.out.println(); } }} }

需要注意的是:properties.put("auto.offset.reset", "earliest");
  • earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

    推荐阅读