Kafka消费者生产者实例
为了更为直观展示Kafka的消息生产消费的过程,我会从基于Console和基于Application两个方面介绍使用实例。Kafka是一个分布式流处理平台,具体来说有三层含义:
- 它允许发布和订阅记录流,类似于消息队列或企业消息传递系统。
- 它可以容错的方式存储记录流。
- 它可以处理记录发生时的流。
安装Kafka 从官网下载
kafka_2.11-0.11.0.0.tgz
,解压后安装到指定目录:cd kafka_2.11-0.11.0.0
tar -zxvf kafka_2.11-0.11.0.0.tgz -C pathToInstall
启动Kafka:
bin/kafka-server-start.sh config/server.properties
基于Console 创建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Producer发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
在控制台输入要发送的消息:
This is a message
This is another message
Consumer接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
输入命令后可以看到控制台输出了刚才的消息:
This is a message
This is another message
基于Application 单个consumer
生产者:
public class SimpleKafkaProducer {public static void main(String[] args) {Properties props = new Properties();
//broker地址
props.put("bootstrap.servers", "localhost:9092");
//请求时候需要验证
props.put("acks", "all");
//请求失败时候需要重试
props.put("retries", 0);
//内存缓存区大小
props.put("buffer.memory", 33554432);
//指定消息key序列化方式
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//指定消息本身的序列化方式
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);
for (int i = 0;
i < 10;
i++)
producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
System.out.println("Message sent successfully");
producer.close();
}
}
消费者:
public class SimpleKafkaConsumer {public static void main(String[] args) {Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//每个消费者分配独立的组号
props.put("group.id", "test");
//如果value合法,则自动提交偏移量
props.put("enable.auto.commit", "true");
//设置多久一次更新被消费消息的偏移量
props.put("auto.commit.interval.ms", "1000");
//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
System.out.println("Subscribed to topic " + "test");
int i = 0;
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = https://www.it610.com/article/%s/n",
record.offset(), record.key(), record.value());
}
}
}
先启动生产者,发送消息到broker,这里简单发送了10条从0-9的消息,再启动消费者,控制台输出如下:
集群消费
以上的程序只是单生产者单消费者的场景,所谓集群消费就是同一个topic的消费可能有多个消费者消费,也称广播消费。集群消费只一种多线程或者多机器的消费方式。
要实现集群消费只需要为每个消费者指定不同的
group.id
就可以。由于代码比较简单就不贴了。【Kafka消费者生产者实例】测试发现,当为了两个consumer(这里是两个进程)指定不同的
group.id
后,producer发送的消息两个consumer都能接受到,这很显然,集群消费嘛。为设置两个consumer的group.id
为同一个的时候,只有一个消费者能消费者到。也就是说,kafka的消息只能由组中的单个用户读取。推荐阅读
- 深入浅出谈一下有关分布式消息技术(Kafka)
- 15.Kafka
- Springboot整合kafka的示例代码
- 搭建大数据三节点(Hadoop、Hbase、Zookeeper、Kafka、Hive)环境(含配置文件参考)
- 曼昆经济学第二十一章|曼昆经济学第二十一章 消费者选择理论
- kafka集群维护
- 十年开发大佬整理的(六大Redis+Nginx+kafka+MySQL+JVM实战文档)
- 用 logstash 从 kafka 读取数据写入 Elasticsearch(qbit)
- 关于kafka数据丢失场景的一次激烈讨论....
- Pulsar|Pulsar vs Kafka(一文掌握高性能消息组件Pulsar基础知识)