世事洞明皆学问,人情练达即文章。这篇文章主要讲述通俗易懂,一篇文章带你认识Kafka相关的知识,希望能为你提供帮助。
本文章转自:乐字节
文章主要讲解:Kafka
获取更多Java相关资料可以关注公众号《乐字节》 发送:999
?
异步通信原理观察者模式
生产者消费者模式
缓冲区
数据单元
消息系统原理一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。
点对点消息传递
发布订阅消息传递
Kafka简介
设计目标
Kafka的优点
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
Kafka系统架构
Broker
Topic
Partition
1.controller在ZooKeeper的/brokers/topics节点上注册watcher,当topic被创建,则controller会通过watch得到该topic的partition/replica分配。
2.controller从/brokers/ids读取当前所有可用的broker列表,对于set_p中的每一个partition:
2.1从分配给该partition的所有replica(称为AR)中任选一个可用的broker作为新的leader,并将AR设置为新的ISR
2.2将新的leader和ISR写入/brokers/topics/[topic]/partitions/[partition]/state
3.controller通过RPC向相关的broker发送LeaderAndISRRequest。
1.controller在zooKeeper的/brokers/topics节点上注册watcher,当topic被删除,则controller会通过watch得到该topic的partition/replica分配。
2.若delete.topic.enable=false,结束;否则controller注册在/admin/delete_topics上的watch被fire,controller通过回调向对应的broker发送StopReplicaRequest。
Leader
## Partation数据路由规则
1. 指定了 patition,则直接使用;
2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
3. patition 和 key 都未指定,使用轮询选出一个 patition。
Follower
1. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
2. producer 将消息发送给该 leader
3. leader 将消息写入本地 log
4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
replication
producer
## Kafka 分配 Replica 的算法如下
1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上
consumer
Consumer Group
1. The high-level Consumer API
2. The SimpleConsumer API
?
offset偏移量
Zookeeper
?
?
?
Kafka环境搭建
?
## vim server.properties
20 broker.id=0
25 port=9092
58 log.dirs=/var/bdp/kafka-logs
118 zookeeper.connect=node01:2181,node02:2181,node03:2181
## vim /etc/profile
export KAFKA_HOME=/opt/lzj/kafka_2.11-0.8.2.1
export PATH=$KAFKA_HOME/bin:$PATH
## 配置文件生效
# source /etc/profile
[1]scp -r kafka_2.11-0.8.2.1 root@node02:`pwd`
[1]scp -r kafka_2.11-0.8.2.1 root@node03:`pwd`
[1]scp /etc/profile root@node02:/etc/profile
[1]scp /etc/profile root@node03:/etc/profile
[123] source /etc/profile
## vim server.properties
[2]broker.id=1
[3]broker.id=2
//创建主题
kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic userlog
kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 2 --partitions 6 --topic studentlog
kafka-topics.sh --zookeeper node01:2181 --delete --replication-factor 2 --partitions 6 --topic baidu
//查看所有主题
kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
//查看主题
kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic userlog
//创建生产者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic userlog
//创建消费者
kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic userlog
Kafka数据检索机制?
数据的安全性producer delivery guarantee
第一个segment
00000000000000000000.index
00000000000000000000.log
第二个segment,文件命名以第一个segment的最后一条消息的offset组成
00000000000000170410.index
00000000000000170410.log
第三个segment,文件命名以上一个segment的最后一条消息的offset组成
00000000000000239430.index
00000000000000239430.log
ISR机制0. At least one 消息绝不会丢,但可能会重复传输
1. At most once 消息可能会丢,但绝不会重复传输
2. Exactly once 每条消息肯定会被传输一次且仅传输一次
Broker数据存储机制
consumer delivery guarantee
1. 基于时间:log.retention.hours=168
2. 基于大小:log.retention.bytes=1073741824
数据的消费
JavaAPI生产者
消费者System.out.println("Hello01Producer.run--开始发送数据");
//迭代發送消息
while (count <
100000) {
String key = String.valueOf(++count);
String value = https://www.songbingjia.com/android/Thread.currentThread().getName() +"--" + count;
//封装消息对象
KeyedMessage<
String, String>
message = new KeyedMessage<
>
("userlog", key, value);
//发送消息到服务器
producer.send(message);
//打印消息
System.out.println("Producer.run--" + key + "--" + value);
//每个1秒发送1条
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {Hello01Producer producer = new Hello01Producer("上海尚学堂");
producer.start();
}
}
//创建消费者对象
private ConsumerConnector consumer;
/**
* 创建构造器
*/
public Hello01Consumer(String cname) {
super.setName(cname);
//读取配置文件
Properties properties = new Properties();
//ZK地址
properties.put("zookeeper.connect", "192.168.58.161:2181,192.168.58.162:2181,192.168.58.163:2181");
//消费者所在组的名称
properties.put("group.id", "shsxt-bigdata");
//ZK超时时间
properties.put("zookeeper.session.timeout.ms", "400");
//当消费者第一次消费时,从最低的偏移量开始消费
properties.put("auto.offset.reset", "smallest");
//自动提交偏移量
properties.put("auto.commit.enable", "true");
//消费者自动提交偏移量的时间间隔
properties.put("auto.commit.interval.ms", "1000");
//创建消费者对象
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
@Override
public void run() {
// 描述读取哪个topic,需要几个线程读
Map<
String, Integer>
topicCountMap = new HashMap<
String, Integer>
();
topicCountMap.put("userlog", 1);
//消费者给句配置信息开始读取消息流
Map<
String, List<
KafkaStream<
byte[], byte[]>
>
>
consumerMap = consumer.createMessageStreams(topicCountMap);
// 每个线程对应于一个KafkaStream
List<
KafkaStream<
byte[], byte[]>
>
list = consumerMap.get("userlog");
// 获取kafkastream流
KafkaStream stream0 = list.get(0);
ConsumerIterator<
byte[], byte[]>
it = stream0.iterator();
//开始迭代并获取数据
while (it.hasNext()) {
// 获取一条消息
MessageAndMetadata<
byte[], byte[]>
value = https://www.songbingjia.com/android/it.next();
int partition = value.partition();
long offset = value.offset();
String data = https://www.songbingjia.com/android/new String(value.message());
System.out.println("开始" + data + " partition:" + partition + " offset:" + offset);
}
}
public static void main(String[] args) {
Hello01Consumer consumer01 = new Hello01Consumer("李毅");
consumer01.start();
}
重复消费和数据的丢失
Kafka优化Partition 数目
//消费者自动提交偏移量的时间间隔props.put("auto.commit.interval.ms", "1010");
提交间隔》单条执行时间(重复)
提交间隔《单条执行时间(丢失)
Replication factor
批量写入
推荐阅读
- 配置IPv6地址跳变——网络测试仪实操
- #yyds干货盘点#看动画学算法之:二叉搜索树BST
- #yyds干货盘点# 自写dede/织梦的function方法,来满足调用问题
- redis | 十一redis之Bitmaps
- TCP的慢启动拥塞避免重传快恢复乱七八糟总是记不清(11个连环问让你一次性打通任督二脉)
- #yyds干货盘点#CSS实现随机不规则圆角头像
- 在WordPress主题的Javascript函数中添加图片的问题
- 上一页和下一页排除显示的页面数量
- 阻止/阻塞直接访问”谢谢”页面