本文概述
- 无Key生产者
- 带Key生产者
为了执行回调,用户需要实现回调函数。实现此功能是为了异步处理请求完成。这就是为什么它的返回类型将为空。此功能将在生产者将数据发送到Kafka的块中实现。无需更改其他代码块。
生产者使用的回调函数是onCompletion()。基本上,此方法需要两个参数:
记录的元数据:记录的元数据意味着获取有关分区及其偏移量的信息。如果不为null,则将引发错误。
【kafka生产者回调(callback)】异常:处理过程中可能引发以下异常:
1)可重试的异常:此异常表示消息可能已发送。
2)不可检索的异常:此异常引发错误,消息将永远不会发送。
让我们在下面的快照中查看Producer回调的实现:
first_producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
Logger logger=LoggerFactory.getLogger(producer1call.class);
if (e== null) {
logger.info("Successfully received the details as: \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition:" + recordMetadata.partition() + "\n" +
"Offset" + recordMetadata.offset() + "\n" +
"Timestamp" + recordMetadata.timestamp());
}else {
logger.error("Can't produce, getting error", e);
}
}
});
已创建“记录器”对象,该对象允许导入“ slf4j.Logger”和“ slf4j.LoggerFactory”。该记录器对象将记录有关分区,偏移量和时间戳的信息。如果异常值等于null,则记录器将显示该信息,否则将显示错误。执行上述代码后,用户将知道发送消息的主题名称,分区号,时间戳,偏移值。
输出快照如下所示:
文章图片
在上面的输出中,可以看到消息生成为“ my_first”,存储在具有“偏移值9”的“分区0”中。
注意:到目前为止,我们发送的消息没有密钥,因此,没有密钥的消息将存储在随机分区中并异步运行。带Key生产者当用户想要将消息发送到同一分区时,键变得很有用。为了发送数据,用户需要指定一个密钥。该密钥将从其他分区中唯一标识该分区。用户需要将同步消息发送到Kafka。
下面显示了一种实现密钥的方法:
文章图片
文章图片
在上面的快照中,我们指定了主题名称,其值和键。创建ProducerRecord时,将其中三个作为参数传递。如果异常“ e”等于null,则记录器将获取有关密钥的信息。最后,将数据发送到Kafka时,将使用get()函数。此方法同步并强制发送数据。用户可以尝试自己的方式来实现密钥。
注意:使用get(),将显示红色下划线。按alt Enter,它会说“向方法签名添加例外”,然后选择它。如上所示,这将向main()添加两个异常。同样,它将“ java.util.concurrent.ExecutionException”导入代码。执行上述代码后,输出显示为:
文章图片
输出中突出显示的部分告诉你键值,主题名称,分区号,偏移值以及时间戳。消息“ OneTwo”现在将始终转到指定的分区。
因此,通过这种方式,生产者可以使用和不使用密钥将数据发送到Kafka。
推荐阅读
- 在java中创建kafka生产者(producer)
- 在intellij idea中安装kafka
- kafka控制台消费者(consumer)
- kafka消费者组cli
- kafka和java编程
- 发送数据到kafka主题(topics)
- 创建kafka主题(topics)
- 在linux上安装kafka
- 在macos上安装apache kafka