kafka2.0系列之(6) Kafka客户端API实现数据生产消费

前言 发布和订阅是kafka的重要功能之一,而实现这一功能最好的方式可能就是使用kafka提供的客户端进行编码开发,下面使用kafka官方文档进行示例程序实现数据生成消费,Kafka服务使用之间搭建的环境。
客户端依赖

  1. 打开文档页面,API文档
  2. 将生产者API,消费者API依赖加入maven项目pom.xml文件
org.apache.kafka kafka-clients 2.2.0

Producer API Consumer API AdminClient API 这个三个API封装在依赖kafka-clients
Producer程序示例
  1. 生产者属性配置
Properties props = new Properties(); props.put("bootstrap.servers", "10.231.129.20:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

其中,bootstrap.servers指定kafka服务地址,acks指定认为请求完成的标准的策略为最强,涉及到数据备份的可用性,key.serializer指定消息key的序列化类,value.serializer指定消息value的序列化类
  1. 实例化生产者对象
Producer producer = new KafkaProducer<>(props);

使用上面的配置信息进行生产者实例化
  1. 发送数据
for (int i = 0; i < 100; i++) producer.send(new ProducerRecord("foo", Integer.toString(i), Integer.toString(i))); producer.close();

将要发送的消息和要发送的主题foo消息发送对象ProducerRecord中,然后使用producer.send()方法发送,循环100次之后关闭生产者
Consumer程序示例
  1. 消费者属性配置
Properties props = new Properties(); props.setProperty("bootstrap.servers", "10.231.129.20:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

其中,bootstrap.servers指定kafka服务地址,group.id指定指定消费者组ID,enable.auto.commit指定消费者自动提交偏移量,auto.commit.interval.ms指定自动提交偏移量的周期,key.serializer指定消息key的序列化类,value.serializer指定消息value的序列化类
  1. 实例化消费者对象
Consumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar"));

【kafka2.0系列之(6) Kafka客户端API实现数据生产消费】使用上面的配置信息进行消费者实例化,然后订阅生产者主题foo
  1. 接收数据
while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = https://www.it610.com/article/%s%n", record.offset(), record.key(), record.value()); }

使用consumer.poll(Duration.ofMillis(100))方法获取主题为foo以及bar的消息接收对象CconsumerRecord,方法内指定拉取周期。
测试 分别正常执行生产者,消费者程序,然后看到消费者控制台打印如下:
offset = 3, key = 0, value = https://www.it610.com/article/0 offset = 4, key = 1, value = 1 offset = 5, key = 2, value = 2 offset = 6, key = 3, value = 4 ... offset = 99, key = 96, value = 96 offset = 100, key = 97, value = 97 offset = 101, key = 98, value = 98 offset = 102, key = 99, value = 99

后记 这是来源于官方文档的一个示例,是一个基本程序,后续所有东西会基于该程序。通过客户端的API开发,实现生产者,消费者之间的消息数据流通。kafka还是很牛逼的,像我们就经常用于 商业数据实时大屏,ETL中数据传输等等。
最后:
  • 本文示例完整代码Github,代码地址:todo
  • 本文录制有操作视频,可以在今日头条上进行观看,视频地址:todo
  • 欢迎关注头条号:行走的IT
kafka2.0系列之(6) Kafka客户端API实现数据生产消费
文章图片
参考文章
  • 生产者 java文档
  • 消费者 java文档
  • Kafka API文档

    推荐阅读