kafka2.0系列之(6) Kafka客户端API实现数据生产消费
前言 发布和订阅是kafka的重要功能之一,而实现这一功能最好的方式可能就是使用kafka提供的客户端进行编码开发,下面使用kafka官方文档进行示例程序实现数据生成消费,Kafka服务使用之间搭建的环境。
客户端依赖
- 打开文档页面,API文档
- 将生产者API,消费者API依赖加入maven项目
pom.xml
文件
org.apache.kafka
kafka-clients
2.2.0
Producer API Consumer API AdminClient API 这个三个API封装在依赖Producer程序示例kafka-clients
中
- 生产者属性配置
Properties props = new Properties();
props.put("bootstrap.servers", "10.231.129.20:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
其中,
bootstrap.servers
指定kafka服务地址,acks
指定认为请求完成的标准的策略为最强,涉及到数据备份的可用性,key.serializer
指定消息key的序列化类,value.serializer
指定消息value的序列化类- 实例化生产者对象
Producer producer = new KafkaProducer<>(props);
使用上面的配置信息进行生产者实例化
- 发送数据
for (int i = 0;
i < 100;
i++)
producer.send(new ProducerRecord("foo", Integer.toString(i), Integer.toString(i)));
producer.close();
将要发送的消息和要发送的主题
foo
消息发送对象ProducerRecord
中,然后使用producer.send()
方法发送,循环100次之后关闭生产者Consumer程序示例
- 消费者属性配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "10.231.129.20:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
其中,
bootstrap.servers
指定kafka服务地址,group.id
指定指定消费者组ID,enable.auto.commit
指定消费者自动提交偏移量,auto.commit.interval.ms
指定自动提交偏移量的周期,key.serializer
指定消息key的序列化类,value.serializer
指定消息value的序列化类- 实例化消费者对象
Consumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
【kafka2.0系列之(6) Kafka客户端API实现数据生产消费】使用上面的配置信息进行消费者实例化,然后订阅生产者主题
foo
- 接收数据
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records)
System.out.printf("offset = %d, key = %s, value = https://www.it610.com/article/%s%n", record.offset(), record.key(), record.value());
}
使用
consumer.poll(Duration.ofMillis(100))
方法获取主题为foo
以及bar
的消息接收对象CconsumerRecord
,方法内指定拉取周期。测试 分别正常执行生产者,消费者程序,然后看到消费者控制台打印如下:
offset = 3, key = 0, value = https://www.it610.com/article/0
offset = 4, key = 1, value = 1
offset = 5, key = 2, value = 2
offset = 6, key = 3, value = 4
...
offset = 99, key = 96, value = 96
offset = 100, key = 97, value = 97
offset = 101, key = 98, value = 98
offset = 102, key = 99, value = 99
后记 这是来源于官方文档的一个示例,是一个基本程序,后续所有东西会基于该程序。通过客户端的API开发,实现生产者,消费者之间的消息数据流通。kafka还是很牛逼的,像我们就经常用于 商业数据实时大屏,ETL中数据传输等等。
最后:
- 本文示例完整代码Github,代码地址:todo
- 本文录制有操作视频,可以在今日头条上进行观看,视频地址:todo
- 欢迎关注头条号:行走的IT
文章图片
参考文章
- 生产者 java文档
- 消费者 java文档
- Kafka API文档
推荐阅读
- PMSJ寻平面设计师之现代(Hyundai)
- 太平之莲
- 闲杂“细雨”
- 七年之痒之后
- 深入理解Go之generate
- 由浅入深理解AOP
- 期刊|期刊 | 国内核心期刊之(北大核心)
- 生活随笔|好天气下的意外之喜
- 感恩之旅第75天
- python学习之|python学习之 实现QQ自动发送消息