python|Kafka

a distributed streaming platform.
1. Kafka简介
??Kafka是一种消息队列,主要用来处理大量数据状态下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka也就拥有消息队列的相应的特性了。
消息队列的好处:
?? 解藕
?? 异步处理
?? 削峰平谷
2、Kafka的消费模式
??Kafka的消费模式主要有两种:一种是一对一的消费,也即点对点的通信,即一个发送一个接收。第二种为一对多的消费,即一个消息发送到消息队列,消费者根据消息队列的订阅拉取消息消费。
**一对一:**消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费。消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费
一对多:这种模式也称为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此Topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据之后,数据不会被清除,Kafka会默认保留一段时间,然后再删除。
3、 Kafka的基础架构
??Kafka像其他Mq一样,也有自己的基础架构,主要存在生产者Producer、Kafka集群Broker、消费者Consumer、注册消息Zookeeper。
?? Producer:消息生产者,向Kafka中发布消息的角色。
?? Consumer:消息消费者,即从Kafka中拉取消息消费的客户端。
?? Consumer Group:消费者组,消费者组则是一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费
?? Broker:经纪人,一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic。
?? Topic:主题,可以理解为一个队列,生产者和消费者都是面向一个Topic
?? Partition:分区,为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)
?? Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常的工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower
?? Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
?? Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。
??上述一个Topic会产生多个分区Partition,分区中分为Leader和Follower,消息一般发送到Leader,Follower通过数据的同步与Leader保持同步,消费的话也是在Leader中发生消费,如果多个消费者,则分别消费Leader和各个Follower中的消息,当Leader发生故障的时候,某个Follower会成为主节点,此时会对齐消息的偏移量。
python|Kafka
文章图片

4、利用Docker容器快速构建Kafka环境
kafka需要zookeeper管理,所以需要先安装zookeeper
?? 安装zookeeper和kafka
docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka

?? 启动zookeeper
# 端口映射到本地的2181 docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

?? 启动kafka
# 本机IP使用ifconfig命令查看 docker run -d --name kafka \ -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=本机ip:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://本机ip:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka# 启动示例 docker run -d --name kafka \ -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=10.31.154.242:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.31.154.242:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

?? 消息测试
kafka自带了终端工具,在终端测试
# 进入容器 docker exec -it {container-id} bash# 进入kafka安装目录 cd cd opt/kafka_2.13-2.8.1/bin/# 创建一个叫test的topic ./kafka-console-producer.sh --broker-list localhost:9092 --topic "test"# 输入一个消息后确认,然后运行consumer查看是否有消息 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "test" --from-beginning# 终端输出刚才输入的内容即为构建成功

5、在Python中使用kafka
参考文档:Kafka-Python Api
?? 构建producer
【python|Kafka】每一秒向Topic发一条消息
# 由于Kafka的生产者和消费者都需要Topic,所以这边写了一个config # 内容为:SERVER = '127.0.0.1:9092'TOPIC = 'test'# producer.pyimport json import time import datetime import config from kafka import KafkaProducerproducer = KafkaProducer( bootstrap_servers=config.SERVER, value_serializer=lambda m: json.dumps(m).encode() )for i in range(100): data = https://www.it610.com/article/{'num': i + 1, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "msg": "成功"} producer.send(config.TOPIC, data) time.sleep(1)

?? 构建consumer
# consumer.pyimport jsonfrom kafka import KafkaConsumer import config# 第一个参数为topic的名称 # bootstrap_servers: 指定kafka服务器 # group_id : 指定此消费者实例属于的组名,可以不指定 # auto.offset.reset关乎kafka数据的读取,是一个非常重要的设置。常用的二个值是latest和earliest,默认是latest。 # - earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # - latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 consumer = KafkaConsumer( config.TOPIC, bootstrap_servers=config.SERVER, api_version=(0, 11, 5), group_id="ichpan", auto_offset_reset='earliest' )for msg in consumer: print(json.loads(msg.value))

?? 运行之后producer.py,Script会执行你的任务(发送消息),运行consumer.py可以获取到消息!!!(如图)
python|Kafka
文章图片

6、总结
??以上就是一个完整运行Kafka消息队列的全过程,原理和redis发布者订阅者模式差不多,可以理解就是基于Channel的,在使用中我们可以查看容器的log来排除错误信息,也是非常方便的。

    推荐阅读