php在kafka写数据 php kafaka

3、Kafka生产者-向Kafka写入数据发送消息的主要步骤
格式php在kafka写数据:每个消息是一个 ProducerRecord 对象 , 必须指定 所属的 Topic和Value ,还可以指定Partition及Key
1:序列化 ProducerRecord
2:分区: 如指定Partition,不做任何事情php在kafka写数据;否则 , Partitioner 根据key得到Partition。生产者向哪个Partition发送
3:消息添加到相应 bach中 ,独立线程将batch 发到Broker上
4:broker收到消息响应。成功回RecordMetaData对象 ,包含了Topic信息、Patition信息、消息在Partition中的Offset信息; 失败返回错误
有序场景:不建议retries0 。可max.in.flight.requests.per.connection1 , 影响生产者吞吐量,但保证有序ps:同partition消息有序
三个 必选 的属性:
(1) bootstrap.servers,broker地址清单
(2) key.serializer: 实现org.apache.kafka.common.serialization.Serializer接口的类,key序列化成字节数组 。注意: 必须被设置 , 即使没指定key
(3)value.serializer , value序列化成字节数组
同步发送消息
异步发送消息
(1)acks: 指定多少partition副本收到消息,生产者才会认为写成功
0 , 不需等待服务器的响应,吞吐量高,如broker没有收到,生产者不知道
1,leader partition收到消息,一个即成功
all , 所有partition都收到 , 才成功,leader和follower共同应答
(2)buffer.memory, 生产者内 缓存区域大小
(3)compression.type ,默认不压缩,设置成snappy、gzip或lz4对发送给broker压缩
(4)retries,重发消息的次数
(5)batch.size,发送同一partition消息会先存储在batch中,该参数指定一个batch内存大小 , 单位byte 。不一定填满才发送
(6)linger.ms , 批次时间,batch被填满或者linger.ms达到上限,就把batch中的消息发送出去
(7)max.in.flight.requests.per.connection,生产者在收到服务器响应之前可以发送的消息个数
创建ProducerRecord时 , 必须 指定序列化器,推荐序列化框架Avro、Thrift、ProtoBuf等
用 Avro 之前,先定义schema(通常用 JSON 写)
(1)创建一个类代表客户 , 作为消息的value
(2)定义schema
(3)生成Avro对象发送到Kafka
ProducerRecord包含Topic、value,key默认null,ey的两个作用:1)附加信息2)被写到Topic的哪个partition
keynull,默认partitioner,RoundRobin均衡分布
key不空,hash进行散列 ,不改变partition数量(永远不加) , key和partition映射不变 。
自定义paritioner 需实现Partitioner接口
Kafka数据存储 Kafka中的消息是存储在磁盘上的,一个分区副本对应一个日志(Log) 。为了防止Log过大,Kafka又引入了 日志分段 (LogSegment)的概念,将Log切分为多个LogSegment ,相当于一个 巨型文件被平均分配为多个相对较小的文件 , 这样也便于消息的维护和清理 。事实上,Log和LogSegnient 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储 , 而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以.txnindex ”为后缀的事务索引文件),下图为topic、partition、副本、log和logSegment之间的关系 。
虽然一个log被拆为多个分段 , 但只有最后一个LogSegment(当前活跃的日志分段)才能执行写入操作 , 在此之前所有的LogSegment都不能写入数据 。当满足以下其中任一条件会创建新的LogSegment 。
在索引文件切分的时候,Kafka 会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件,默认大小为1GB 。当下次索引切分时才会设置为实际大小 。也就是说,之前的segment都是实际大小,活跃segment大小为1G 。
索引的主要目的是提高查找的效率 。
Kafka采用稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项 。而是每当写入一定量(由 broker 端参数 log.index. interval.bytes 指定,默认4KB )的消息时,索引文件会增加一个索引项 。
消息查找过程
间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的 timestamp 必须大于之前追加的索引项的 timestamp ,否则不予追加 。
消息查找过程
Kafka将消息存储在磁盘中 , 为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作 。Kafka提供了两种日志清理策略 。
kafka有专门的任务来周期性删除不符合条件的日志分段文件 , 删除策略主要以下有3种 。
对于有相同key的不同value值 , 只保留最后一个版本 。如果应用只关心key对应的最新value值,则可以开启Kafka的日志压缩功能,Kafka会定期将相同key的消息进行合井 , 只保留最新的value值 。
【大数据技术】kafka简介和底层实现一、Kafka的三大组件:Producer、Server、Consumer
1、Kafka的Producer写入消息
producer采用push(推)模式将消息发布到brokerphp在kafka写数据 , 每条消息php在kafka写数据,都被追加到分区中(顺序写到磁盘php在kafka写数据,比随机写内存效率高) 。
· 分区的作用:方便容量扩展 , 可以多并发读写数据,所以php在kafka写数据我们会指定多个分区进行数据存储 。
· 一般根据 event_key的hash% numPartitions来确定写入哪个分区,如果写入时没有指定key,则轮询写入每个分区;因此导致每个partition中消息是有序的,整体无序 。
每条event数据写入partitionA中,并且只会写入partitionA_leader , 当partitionA_leader写入完成后partitionA_flower节点再去partitionA_leader上异步拉取数据;默认ack为1,表示不会等待partitionA_flowers写入完成;如果设置ack为副本数或ack=-1,则等待副本全部写完,再写入下一条数据 。
2、kafka的broker——保存消息
1、 创建topic,并指定分区和副本数
2、每个分区(partition)有一个leader , 多个follower,pull数据时先寻找leader,只会读leader上的数据,leader和follower不会在一个节点上,leader节点宕机后 , 其中一个follower变成leader
3、 消息数据存在每个分区中,默认配置每条消息保存7天 或 分区达到1GB 后删除数据
3、Kafka的Consumer消费数据:
1、consumer采用pull(拉)模式从broker中读取数据 。
2、如果一个消费者来消费同一个topic下不同分区的数据,会读完一个分区再读下一个分区
生产者(producer)API只有一套;但是消费者(consumer)API有两套(高级API和低级API)
一、高级API:
Zookeeper管理offset(默认从最后一个开始读新数据 , 可以配置从开头读)
kafka server(kafka服务)管理分区、副本
二、低级API:
开发者自己控制offset,想从哪里读就从哪里读
// SimpleConsumer是Kafka用来读数据的类
// 通过send()方法获取元数据找到leader
TopicMetadataResponse metadataResponse = simpleConsumer.send(request);//通过metadataResponse获取topic元数据,在获取topic中每个分区的元数据
【php在kafka写数据 php kafaka】 // fetch 抓取数据
FetchResponse response = simpleConsumer.fetch(fetchRequest);
// 解析抓取到的数据
ByteBufferMessageSet messageAndOffsets = response.messageSet(topic, partition);
二、数据、broker状态 , consumer状态的存储
一、在本地存储原始消息数据:
1、hash取模得分区、kafka中每条消息有一个Key,用来确定 每条数据存储到哪个分区中
2、轮询
3、自定义分区
二、在zookeeper存储kafka的元数据
三、存储consumer的offset数据
每个consumer有一个Key(broker Topic partition)的hash,再取模后 用来确定offset存到哪个系统文件中,Value是partitionMetaData 。
1、使用zookeeper启动,zookeeper来存储offset
消费者 消费消息时,offset(消费到的下标)会保存在consumer本地和zookeeper中(由本地上传到zookeeper中,所以本地会保存offset)
2、使用bootstrap启动,本地存储offset(在本地可以减少两节点交互),zookeeper存储其他数据
三、某Flume对接Kafka案例
thinkphp,kafka,hbase,spark之间的通讯机制怎么来实现Spark 有自己php在kafka写数据的 Kafka connector 用于从Kafka读出读入数据 。
Spark 到 Hbase 很多人就用一个foreach operator来写数据 。
关于php在kafka写数据和php kafaka的介绍到此就结束了 , 不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站 。

    推荐阅读