本文概述
- 创建Java生产者
- 创建生产者属性
- 创建生产者记录
- 发送数据
- 卡夫卡依赖
- 记录依存关系,即SLF4J Logger。
步骤1:构建工具Maven包含一个“ pom.xml”文件。 “ pom.xml”是默认XML文件,其中包含有关GroupID,ArtifactID以及Version值的所有信息。用户需要在“ pom.xml”文件中定义所有必要的项目依赖项。转到“ pom.xml”文件。
步骤2:首先,我们需要定义Kafka依赖关系。创建一个'< dependencies> … < / dependencies> ’ 块,在其中定义所需的依赖项。
步骤3:现在,打开网络浏览器并搜索“ Kafka Maven”,如下所示:
文章图片
单击突出显示的链接,然后选择“ Apache Kafka,Kafka-Clients”存储库。下面的快照中显示了一个示例:
文章图片
步骤4:根据系统上下载的Kafka版本选择存储库版本。例如,在本教程中,我们使用的是“ Apache Kafka 2.3.0”。因此,我们需要存储库版本2.3.0(突出显示的版本)。
文章图片
步骤5:点击存储库版本后,将打开一个新窗口。从那里复制依赖关系代码。
文章图片
由于我们正在使用Maven,因此请复制Maven代码。如果用户使用Gradle,请复制Gradle编写的代码。
步骤6:将复制的代码粘贴到'< dependencies> … < / dependencies> ’ 块中,如下所示:
文章图片
如果版本号显示为红色,则表示用户错过了启用“自动导入”选项的权限。如果是这样,请转到“视图”> “工具Windows”> “ Maven”。 Maven项目窗口将出现在屏幕的右侧。单击出现在此处的“刷新”按钮。这将启用丢失的自动导入Maven项目。如果颜色变为黑色,则表示已下载缺少的依赖项。用户可以继续下一步。
步骤7:现在,打开Web浏览器并搜索“ SL4J Simple”,然后打开以下快照中所示的突出显示的链接:
文章图片
一堆存储库将出现。单击适当的存储库。
文章图片
【在java中创建kafka生产者(producer)】要了解适当的存储库,请查看Maven项目窗口,并在“依赖关系”下查看slf4j版本。
文章图片
单击适当的版本并复制代码,然后将其粘贴在Kafka依赖项下方的“ pom.xml”文件中,如下所示:
文章图片
注意:在代码中添加注释或删除 test 标记行。因为此作用域标签为该依赖项定义了一个有限的作用域,并且我们对所有代码都需要此依赖项,所以不应限制该作用域。 现在,我们已经设置了所有必需的依赖项。让我们尝试“简单的Hello World”示例。
首先,创建一个名为“ com.firstgroupapp.aktutorial”的Java包,并在其下方创建一个Java类。创建Java软件包时,请遵循软件包命名约定。最后,创建“ hello world”程序。
文章图片
执行’ producer1.java’ 文件后,输出成功显示为’ Hello World’ 。这说明IntelliJ IDEA的成功工作。
创建Java生产者基本上,创建Java生产者有四个步骤,如前所述:
- 创建生产者属性
- 创建生产者
- 创建生产者记录
- 发送数据。
在那里,用户可以了解Apache Kafka提供的所有生产者属性。在这里,我们将讨论所需的属性,例如:
- bootstrap.servers:这是端口对的列表,用于建立与Kafka集群的初始连接。用户只能将引导服务器用于建立初始连接。该服务器以host:port,host:port等形式存在。
- key.serializer:它是密钥的一种Serializer类,用于实现“ org.apache.kafka.common.serialization.Serializer”接口。
- value.serializer:这是一种Serializer类,它实现“ org.apache.kafka.common.serialization.Serializer”接口。
文章图片
当我们创建属性时,它将“ java.util.Properties”导入到代码中。
这样,完成了创建生产者属性的第一步。
创建生产者
要创建一个Kafka生产者,我们只需要创建一个KafkaProducer对象。
KafkaProducer的对象可以创建为:
KafkaProducer<
String, String> first_producer = new KafkaProducer<
String, String>(properties);
在这里,“ first_producer”是我们选择的生产者的名称。用户可以据此进行选择。
让我们在下面的快照中看到:
文章图片
创建生产者记录为了将数据发送到Kafka,用户需要创建一个ProducerRecord。这是因为所有生产者都位于生产者记录内。生产者在此处指定主题名称以及要传递给Kafka的消息。
可以通过以下方式创建ProducerRecord:
ProducerRecord<
String, String> record=new ProducerRecord<
String, String>("my_first", "Hye Kafka");
在这里,“记录”是用于创建生产者记录的名称,“ my_first”是主题名称,“ Hye Kafka”是消息。用户可以据此进行选择。
让我们在下面的快照中看到:
文章图片
发送数据现在,用户已准备好将数据发送到Kafka。生产者只需要按以下方式调用ProducerRecord的对象:
first_producer.send(record);
让我们在下面的快照中看到:
文章图片
要了解上述代码的输出,请使用以下命令在CLI上打开“ kafka-console-consumer”:
‘ kafka-console-consumer -bootstrap-server 127.0.0.1:9092 -topic my_first -group first_app’
生产者产生的数据是异步的。因此,需要两个附加函数,即flush()和close()(如上图所示)。 flush()将强制生成所有数据,而close()将停止生产器。如果不执行这些功能,则数据将永远不会发送到Kafka,并且消费者将无法读取它。
下面将使用者控制台上的代码输出显示为:
文章图片
文章图片
在终端上,用户可以看到各种日志文件。航站楼的最后一行说,卡夫卡生产商已经关闭。因此,该消息将异步显示在使用者控制台上。
推荐阅读
- kafka生产者回调(callback)
- 在intellij idea中安装kafka
- kafka控制台消费者(consumer)
- kafka消费者组cli
- kafka和java编程
- 发送数据到kafka主题(topics)
- 创建kafka主题(topics)
- 在linux上安装kafka
- 在macos上安装apache kafka