kafka介绍 kafka是高效的数据流处理平台。可以理解为数据的写入和读取的“中转站”。
相关概念
- Broker
一个broker对应一个kafka实例,可以分别在多台服务器上各启动一个broker, 也可在一个服务器上启动多个broker。
- 【python|【python】Kafka介绍及confluent-kafka的使用】Topic
消息的主题,一个broker可有多个topic。
- Partition
每个Topic中的信息可以分配在多个Partition中,有利于高效消费和后续的管理和扩展
- Producer
生产者,即数据来源
- Consumer
数据消费者
- Consumer Group
不同的Consumer可以分在相同的group中,在同一个group中,不同的consumer消费同一个Topic的不同partition的信息。这样能保证统一topic的信息不会被重复消费。
因此consumer数量如果能正好等于partition数量,能高效读取数据,但若consumer数量大于partition数量,会有部分consumer没有被利用起来,因为同一个partition不能被多个consumer消费。
- Leader和Replication
对于partition而言,每个partition在不同的broker上都有存储,但会选择某个broker的partition作为Leader, 其余的作为Replication。
producer写入数据时仅会写入leader中,replication会主动从leader中pull数据进行备份,同理consumer读取数据时也是只从leader中读取。当leader所在broker宕机时,kafka会从replication中选取broker作为新的leader,由此保证数据不丢失和高效读取。
- Segment
在一个Parition中会有多个segment,每个segment一般包含这三样东西:index、timeIndex、log。其中index和
timeIndex用于索引,log中存放数据信息。每segment中有会有一条或多条信息。
运用分段和索引的方法检索信息能提高数据查询效率。
- Offset
用于确定每条信息在partition中的位置。
confluent-kafka使用文档
Demo示例和部分代码和解释如下:
import conflunet_kafka as kfkc = kfk.Consumer({
'bootstrap.servers': 'Ip1,Ip2', ## kafka所在ip地址,多个地址用逗号隔开。
'group.id': 'test',
'enable.auto.commit': True, ## 是否自动提交offset
'default.topic.config': {
'auto.offset.reset': 'smallest'
})
c.subscribe(['Topic']) ## 为该consumer分配分区
对其中的一些参数进行解释:
enable.auto.commit:是否自动提交offset,设为True时,每隔一段时间就会提交一次offset。
auto.offset.reset:有smallest和lateset可选, 每次从最小的offset读取或从最新一条数据读取,当该partition中没有记录offset生效,否则会直接读取记录的offset。
因此若想测试时使用该参数,可结合’enable.auto.commit’为False使用,这样的话partition就没有被记录的offset,每次都可从第一条/最后一条读取信息。
while True:
msg = c.poll(1)
if msg is None:
continue
else:
if not msg.error() is None:
print msg.error()
else:
message = msg.value()
print msg.partition(), msg.offset()
若想为该topic的各个分区指定offset,可初始化一个TopicPartition实例,通过commit把该offset提交上去。
topicPartitionList = [
kfk.TopicPartition('Topic', partition=0, offset=100),
kfk.TopicPartition('Topic', partition=1, offset=200),
kfk.TopicPartition('Topic', partition=2, offset=300),
] ## 对该topic的三个分区只配offsetc.commit(offsets=topicPartitionList) ##提交分区offset
c.commit(message=msg) ## 也可通过msg提交分区offsetc.commited(topicPartitionList) ## 查看分区偏移量
推荐阅读
- Python|Python使用turtle库+jieba库完成简易中文词频统计,附代码
- shell curl 与 python requests的一次对比
- gem5|WSL 安装 gem5
- jupyter|Jupyter Notebook the sql module is not an ipython extension
- 测试|自动化测试selenium基础篇——webdriverAPI
- 聊聊 Jmeter 如何并发执行 Python 脚本
- spinning|强化学习入门项目spinning up(1)安装
- python|【Python数据科学快速入门系列 | 01】Numpy初窥——基础概念
- 语义分割|【语义分割项目实战】制作语义分割数据集,并使用U-Net进行实战检测