本文概述
- 创建记录器
- 创建消费者属性
- 创造消费者
- 订阅消费者
- 轮询新数据
- 读取消费者组中的数据
要创建使用者,需要执行以下步骤:
- 创建记录器
- 创建消费者属性。
- 创建消费者。
- 为消费者订阅特定主题。
- 轮询一些新数据
创建记录器记录器被实现为在程序执行期间写入日志消息。用户需要创建一个Logger对象,该对象将需要导入“ org.slf4j类”。下面的快照显示了Logger的实现:
文章图片
创建消费者属性与生产者属性类似,Apache Kafka还提供了用于创建消费者的各种不同属性。要了解每个消费者的财产,请访问Apache Kafa的官方网站> 文档> 配置> 消费者配置。在这里,我们将列出使用者的必需属性,例如:
key.deserializer:这是密钥的反序列化器类,用于实现“ org.apache.kafka.common.serialization.Deserializer”接口。
value.deserializer:实现值的反序列化器类,它实现“ org.apache.kafka.common.serialization.Desrializer”接口。
bootstrap.servers:这是主机/端口对的列表,用于建立与Kafka集群的初始连接。它不包含客户端所需的全套服务器。仅需要引导所需的服务器。
group.id:这是一个唯一字符串,用于标识使用者组的使用者。当消费者通过订阅主题使用基于Kafka的偏移管理策略或组管理功能时,需要此属性。
auto.offset.reset:当不存在初始偏移量或服务器上不再存在当前偏移量时,此属性是必需的。有以下值可用于重置偏移值:
最早:此偏移量变量会自动将值重置为其最早的偏移量。
最新:此偏移量变量将偏移值重置为其最新偏移量。
none:如果未找到上一组的先前偏移量,则会向使用方抛出异常。
【在java中创建kafka消费者(consumer)】其他:向消费者抛出异常。
注意:在我们的代码中,我们使用了“最早的”变量将值重置为最早的值。这些是实现使用者所必需的一些基本属性。让我们使用IntelliJ IDEA实施。
步骤1)定义一个新的java类为“ consumer1.java”。
步骤2)在类中描述使用者属性,如以下快照所示:
文章图片
在快照中,描述了所有必要的属性。
创造消费者创建一个KafkaConsumer对象以创建使用者,如下所示:
文章图片
在创建使用者时会传递上述属性。
订阅消费者要从主题中读取消息,我们需要将使用者连接到指定的主题。消费者可以通过各种订阅API进行订阅。在这里,我们使用Arrays.asList()是因为用户可能想订阅一个或多个主题。因此,Arrays.asList()允许订阅方订阅多个主题。
下面的代码显示了使用者订阅的实现:
文章图片
用户需要直接或通过字符串变量指定主题名称以阅读消息。可以有多个主题,也用逗号分隔。
轮询新数据消费者通过轮询方法从Kafka读取数据。
文章图片
poll方法返回从当前分区的偏移量获取的数据。指定持续时间,直到它等待数据,否则将空的ConsumerRecord返回给使用者。同样,记录器将获取记录键,分区,记录偏移量及其值。
下面列出了影响Java使用者的完整代码:
package com.firstgroupapp.aktutorial;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
public class consumer1 {
public static void main(String[] args) {
Logger logger= LoggerFactory.getLogger(consumer1.class.getName());
String bootstrapServers="127.0.0.1:9092";
String grp_id="third_app";
String topic="my_first";
//Creating consumer properties
Properties properties=new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, grp_id);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//creating consumer
KafkaConsumer<
String, String> consumer= new KafkaConsumer<
String, String>(properties);
//Subscribing
consumer.subscribe(Arrays.asList(topic));
//polling
while(true){
ConsumerRecords<
String, String> records=consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<
String, String> record: records){
logger.info("Key: "+ record.key() + ", Value:" +record.value());
logger.info("Partition:" + record.partition()+", Offset:"+record.offset());
}}
}
}
这样,消费者可以通过依次执行每个步骤来阅读消息。
使用者实现的输出可以在下面的快照中看到:
文章图片
键值为空。这是因为我们之前没有指定任何密钥。由于“最早”,从头开始显示所有消息。
读取消费者组中的数据用户可以让一个以上的消费者总共读取数据。这可以通过消费者组来完成。在消费者组中,一个或多个消费者将能够从Kafka中读取数据。如果用户想从头开始阅读消息,请重设group_id或更改group_id。这将重置用户的应用程序,并从头开始显示消息。
推荐阅读
- kafka监控和管理
- kafka生产者回调(callback)
- 在java中创建kafka生产者(producer)
- 在intellij idea中安装kafka
- kafka控制台消费者(consumer)
- kafka消费者组cli
- kafka和java编程
- 发送数据到kafka主题(topics)
- 创建kafka主题(topics)