在java中创建kafka消费者(consumer)

本文概述

  • 创建记录器
  • 创建消费者属性
  • 创造消费者
  • 订阅消费者
  • 轮询新数据
  • 读取消费者组中的数据
在上一节中,我们学习了用Java创建生产者。在本节中,我们将学习在Java中实现Kafka使用者。
要创建使用者,需要执行以下步骤:
  1. 创建记录器
  2. 创建消费者属性。
  3. 创建消费者。
  4. 为消费者订阅特定主题。
  5. 轮询一些新数据
让我们讨论学习Java使用者实现的每个步骤。
创建记录器记录器被实现为在程序执行期间写入日志消息。用户需要创建一个Logger对象,该对象将需要导入“ org.slf4j类”。下面的快照显示了Logger的实现:
在java中创建kafka消费者(consumer)

文章图片
创建消费者属性与生产者属性类似,Apache Kafka还提供了用于创建消费者的各种不同属性。要了解每个消费者的财产,请访问Apache Kafa的官方网站> 文档> 配置> 消费者配置。在这里,我们将列出使用者的必需属性,例如:
key.deserializer:这是密钥的反序列化器类,用于实现“ org.apache.kafka.common.serialization.Deserializer”接口。
value.deserializer:实现值的反序列化器类,它实现“ org.apache.kafka.common.serialization.Desrializer”接口。
bootstrap.servers:这是主机/端口对的列表,用于建立与Kafka集群的初始连接。它不包含客户端所需的全套服务器。仅需要引导所需的服务器。
group.id:这是一个唯一字符串,用于标识使用者组的使用者。当消费者通过订阅主题使用基于Kafka的偏移管理策略或组管理功能时,需要此属性。
auto.offset.reset:当不存在初始偏移量或服务器上不再存在当前偏移量时,此属性是必需的。有以下值可用于重置偏移值:
最早:此偏移量变量会自动将值重置为其最早的偏移量。
最新:此偏移量变量将偏移值重置为其最新偏移量。
none:如果未找到上一组的先前偏移量,则会向使用方抛出异常。
【在java中创建kafka消费者(consumer)】其他:向消费者抛出异常。
注意:在我们的代码中,我们使用了“最早的”变量将值重置为最早的值。这些是实现使用者所必需的一些基本属性。让我们使用IntelliJ IDEA实施。
步骤1)定义一个新的java类为“ consumer1.java”。
步骤2)在类中描述使用者属性,如以下快照所示:
在java中创建kafka消费者(consumer)

文章图片
在快照中,描述了所有必要的属性。
创造消费者创建一个KafkaConsumer对象以创建使用者,如下所示:
在java中创建kafka消费者(consumer)

文章图片
在创建使用者时会传递上述属性。
订阅消费者要从主题中读取消息,我们需要将使用者连接到指定的主题。消费者可以通过各种订阅API进行订阅。在这里,我们使用Arrays.asList()是因为用户可能想订阅一个或多个主题。因此,Arrays.asList()允许订阅方订阅多个主题。
下面的代码显示了使用者订阅的实现:
在java中创建kafka消费者(consumer)

文章图片
用户需要直接或通过字符串变量指定主题名称以阅读消息。可以有多个主题,也用逗号分隔。
轮询新数据消费者通过轮询方法从Kafka读取数据。
在java中创建kafka消费者(consumer)

文章图片
poll方法返回从当前分区的偏移量获取的数据。指定持续时间,直到它等待数据,否则将空的ConsumerRecord返回给使用者。同样,记录器将获取记录键,分区,记录偏移量及其值。
下面列出了影响Java使用者的完整代码:
package com.firstgroupapp.aktutorial; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.Properties; public class consumer1 { public static void main(String[] args) { Logger logger= LoggerFactory.getLogger(consumer1.class.getName()); String bootstrapServers="127.0.0.1:9092"; String grp_id="third_app"; String topic="my_first"; //Creating consumer properties Properties properties=new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, grp_id); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //creating consumer KafkaConsumer< String, String> consumer= new KafkaConsumer< String, String>(properties); //Subscribing consumer.subscribe(Arrays.asList(topic)); //polling while(true){ ConsumerRecords< String, String> records=consumer.poll(Duration.ofMillis(100)); for(ConsumerRecord< String, String> record: records){ logger.info("Key: "+ record.key() + ", Value:" +record.value()); logger.info("Partition:" + record.partition()+", Offset:"+record.offset()); }} } }

这样,消费者可以通过依次执行每个步骤来阅读消息。
使用者实现的输出可以在下面的快照中看到:
在java中创建kafka消费者(consumer)

文章图片
键值为空。这是因为我们之前没有指定任何密钥。由于“最早”,从头开始显示所有消息。
读取消费者组中的数据用户可以让一个以上的消费者总共读取数据。这可以通过消费者组来完成。在消费者组中,一个或多个消费者将能够从Kafka中读取数据。如果用户想从头开始阅读消息,请重设group_id或更改group_id。这将重置用户的应用程序,并从头开始显示消息。

    推荐阅读