a distributed streaming platform.1. Kafka简介
??Kafka是一种
消息队列
,主要用来处理大量数据状态
下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka
也就拥有消息队列的相应的特性了。消息队列的好处:
?? 解藕
?? 异步处理
?? 削峰平谷
2、Kafka的消费模式
??Kafka的消费模式主要有两种:一种是一对一的消费,也即点对点的通信,即一个发送一个接收。第二种为一对多的消费,即一个消息发送到消息队列,消费者根据消息队列的订阅拉取消息消费。
**一对一:**消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费。消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即
一条消息只能被一个消费者消费
。一对多:这种模式也称为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此Topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据之后,数据不会被清除,Kafka会默认保留一段时间,然后再删除。
3、 Kafka的基础架构
??Kafka像其他Mq一样,也有自己的基础架构,主要存在生产者Producer、Kafka集群Broker、消费者Consumer、注册消息Zookeeper。
?? Producer:消息生产者,向Kafka中发布消息的角色。
?? Consumer:消息消费者,即从Kafka中拉取消息消费的客户端。
?? Consumer Group:消费者组,消费者组则是一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费
?? Broker:经纪人,一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic。
?? Topic:主题,可以理解为一个队列,生产者和消费者都是面向一个Topic
?? Partition:分区,为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)
?? Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常的工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower
?? Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
?? Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。
??上述一个Topic会产生多个分区Partition,分区中分为Leader和Follower,消息一般发送到Leader,Follower通过数据的同步与Leader保持同步,消费的话也是在Leader中发生消费,如果多个消费者,则分别消费Leader和各个Follower中的消息,当Leader发生故障的时候,某个Follower会成为主节点,此时会对齐消息的偏移量。
文章图片
4、利用Docker容器快速构建Kafka环境
kafka需要zookeeper管理,所以需要先安装zookeeper?? 安装zookeeper和kafka
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
?? 启动zookeeper
# 端口映射到本地的2181
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
?? 启动kafka
# 本机IP使用ifconfig命令查看
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=本机ip:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://本机ip:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka# 启动示例
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=10.31.154.242:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.31.154.242:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
?? 消息测试
kafka自带了终端工具,在终端测试
# 进入容器
docker exec -it {container-id} bash# 进入kafka安装目录
cd cd opt/kafka_2.13-2.8.1/bin/# 创建一个叫test的topic
./kafka-console-producer.sh --broker-list localhost:9092 --topic "test"# 输入一个消息后确认,然后运行consumer查看是否有消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "test" --from-beginning# 终端输出刚才输入的内容即为构建成功
5、在Python中使用kafka
参考文档:Kafka-Python Api?? 构建producer
【python|Kafka】每一秒向Topic发一条消息
# 由于Kafka的生产者和消费者都需要Topic,所以这边写了一个config
# 内容为:SERVER = '127.0.0.1:9092'TOPIC = 'test'# producer.pyimport json
import time
import datetime
import config
from kafka import KafkaProducerproducer = KafkaProducer(
bootstrap_servers=config.SERVER,
value_serializer=lambda m: json.dumps(m).encode()
)for i in range(100):
data = https://www.it610.com/article/{'num': i + 1, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "msg": "成功"}
producer.send(config.TOPIC, data)
time.sleep(1)
?? 构建consumer
# consumer.pyimport jsonfrom kafka import KafkaConsumer
import config# 第一个参数为topic的名称
# bootstrap_servers: 指定kafka服务器
# group_id : 指定此消费者实例属于的组名,可以不指定
# auto.offset.reset关乎kafka数据的读取,是一个非常重要的设置。常用的二个值是latest和earliest,默认是latest。
# - earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# - latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
consumer = KafkaConsumer(
config.TOPIC,
bootstrap_servers=config.SERVER,
api_version=(0, 11, 5),
group_id="ichpan",
auto_offset_reset='earliest'
)for msg in consumer:
print(json.loads(msg.value))
?? 运行之后producer.py,Script会执行你的任务(发送消息),运行consumer.py可以获取到消息!!!(如图)
文章图片
6、总结
??以上就是一个完整运行Kafka消息队列的全过程,原理和redis发布者订阅者模式差不多,可以理解就是基于Channel的,在使用中我们可以查看容器的log来排除错误信息,也是非常方便的。
推荐阅读
- 面试|Rocketmq持久化
- flink|实践数据湖iceberg 第三十二课 DDL语句通过hive catalog持久化方法
- 【面试普通人VS高手系列】Dubbo是如何动态感知服务下线的()
- C语言与C++编程|马斯克(我是 Rust 粉丝,但为了性能会选择 C语言)
- 游戏|这就是传说中的天才程序员吧! | 每日趣闻
- 10个超级实用的Python自动化脚本
- 7个超好用的Python开发工具!
- Python中最常见的10种排序算法!
- python|Python学习笔记(1)---B站黑马程序员