Kafka入门学习笔记

本文是对Kafka的一个学习总结,共包括如下内容:

  • 概述
  • 基本结构
  • 重要概念
  • 快速起步
  • Java API
参考资料:
1、Kafka的运行依赖Zookeeper,要了解Zookeeper的基础知识,可参考文档《Zookeeper学习笔记》。
2、Kafka是用Scala语言编写的,如果需要查看源码或使用其提供的Scala Api,需要先熟悉下Scala语言的基础知识(本文不涉及)。
一、概述 Kafka 起初是由 LinkedIn 公司开发的一个分布式的消息系统,后成为 Apache 的一部分。Kafka当前已经把自己定位成一个分布式的流计算平台,可以支持实时的流计算。它使用 Scala语言编写,以可水平扩展和高吞吐率而被广泛使用。
Kafka在2011年初开源,2012年10月成为了Apache软件基金会的项目。其版本经历了0.x系列,到1.x系列,到最新的2.x系列的发展,一些主要的版本发布时间如下:
2015年11月发布的0.9.0.0
2016年5月发布的0.10.0.0
2017年6月发布的0.11.0.0
2017年11月 发布 1.0.0 版本 (从此版本号由4位变3位)
2018年3月发布1.1.0
2018年7月发布1.1.1
2018年7月发布2.0.0
2018年11月发布2.1.0 (当前最新版本)
kafka是由Scala语言写成,Scala 运行在Java虚拟机上,并兼容现有的Java程序,因此部署kakfa的时候,需要先部署jdk环境。
Kafka的官方网站地址是http://kafka.apache.org/,网站上提供了二进制安装版本的下载。kafka每个版本的二进制包,都有两个版本提供,一个是基于scala2.11版本编译的,一个是基于scala2.12版本编译的。正常情况下,应该使用基于scala2.12版本编译的kafka版本。二进制下载文件名称如下(以kafka 2.1.0版本为例):
kafka_2.11-2.1.0.tgz
kafka_2.12-2.1.0.tgz
中划线-前面的版本2.11/2.12是代表使用的scala的版本号,-后面2.1.0代表kafak的版本。
本文介绍的Kafka,使用的是最新的2.1版本。
二、基本结构 下图是Kafka的基本结构图:
Kafka入门学习笔记
文章图片
从上图可以看出,Kafka有四种核心的API:
1、Producer API:允许应用程序发布记录流到一个或多个Kafka主题。
2、Consumer API:允许应用订阅一个或多个主题,并通过拉取操作获得发布到主题上的记录。
3、Connectors:允许其它系统的数据(如数据库中数据)和Kafka集群中的大量数据快速的相互复制。
4、Streams API:允许应用程序充当流处理器,从一个或多个主题获取输入流,并产生一个输出流至一个或多个输出的主题。
这四种核心Api也代表了Kafka的几大核心功能。
Kafka主要有如下的应用模式:
1、长期以来,Kafka是大数据场景下做高吞吐的发布订阅消息队列的首选。
2、其推出的Kafka Streams功能,也使其成为一个分布式的流计算平台(distributed streaming platform),对于一些简单的流计算应用场景,使用Kafka还是可以的,但对于一些复杂场景,还是使用如Spark,Flink,Storm等更专业的流计算平台更适合。
3、Kafka还是一个非常好的存储系统。写入Kafka的数据将写入磁盘并进行复制以实现容错。Kafka可以高效的进行磁盘数据的处理,无论服务器上有50 KB还是50 TB的持久数据,Kafka都会执行相同的操作。并且可以允许客户端控制其读取位置,所以可以将Kafka视为一种专用于高性能,低延迟提交日志存储,复制和传播的专用分布式文件系统。
三、重要概念 (一)代理

Kafka集群(Cluster)由一个或多个Kafka实例构成,我们将每一个Kafka实例称为代理(Broker),通常也称代理为Kafka服务器(KafkaServer)。在生产环境下,Kafka集群一般包括多台服务器设备,我们可以在一台服务器设备上配置一个或多个代理。每一个代理都有一个唯一的标识ID(非负整数,称为BrokerId),以用来唯一标识一个代理。如下图所示: Kafka入门学习笔记
文章图片
(二)记录****和主题 记录是Kafka通信的基本单位,一个记录都包含一个键,一个值和一个时间戳。我们在程序中发送记录时,记录的值需要给出,记录的键可以给也可以不给,时间戳不需要给,由系统自动生成。
在老版本中,记录被称为消息。
主题是Kafka中非常核心的一个概念,它是记录的分类,也是订阅的对象。生产者是将记录发布到特定的主题,消费者是按主题来订阅。
(三)分区

Kafka中的每个主题被分为一个或多个分区(Partition),每个分区都是一个有序的,不可变的记录序列。如下图所示: Kafka入门学习笔记
文章图片
分区中的记录每个都被分配了一个称为偏移的顺序ID号,它唯一地标识分区中的每个记录。单个分区内的记录是有序的,单个分区就相当于一个先进先出的队列。


消费者在消费时,会保存已消费的记录对应的偏移量,以保证消息被顺序消费,这个偏移量信息会保存在Kafka集群中。注意,这里保存的偏移量只是指消费者当前消费的记录的位置,不改变记录本身的偏移量。当然消费者可以重置到较旧的偏移量来重新处理过去的数据。消费者对应记录的偏移量如下图所示: Kafka入门学习笔记
文章图片
上图显示两个消费者消费偏移量的不同位置。
在server.properties配置文件中通过num.partitions属性可以指定一个全局的分区数设置,这是对每个主题下的分区数的默认设置,默认设置的值是1。
利用命令行创建主题的时候(在后面会介绍),也可以使用--partitions选项来明确指定要创建主题的分区数量,如果不指定,则会使用server.properties中的设置。
(四)物理存储 Kafka集群持久保存所有已发布的记录,即使记录已经被消费,也不会立即被删除,保存的策略可以配置。例如,如果保留策略设置为两天,则在发布记录后的两天内,它可供使用,之后将被丢弃以释放空间。Kafka的性能在数据大小方面实际上是恒定的,因此长时间存储数据不是问题。
分区在物理上对应一个目录,目录名的命名规则是“主题名-分区编号”,分区编号从0开始。发布到该分区的记录被存储到该目录下后缀名为.log的数据文件(称为日志文件)中,会根据数据量的多少,分为多个物理上的文件。
(五)副本 为了提高可用性,每个分区可以配置一个或多个副本(Replica),分区的副本分布在集群的不同代理上。由于副本的存在,就需要保证一个分区的多个副本的数据的一致性。
对于每个分区,Kafka会选择分区的一个副本作为Leader副本,而该分区的其它副本作为Follower副本。只有Leader副本才负责处理客户端的读/写请求,Follower副本从Leader副本同步数据。客户端只需要与Leader副本进行交互,这样数据的一致性就有了保证。
【Kafka入门学习笔记】副本的Leader角色和Follower角色并不是固定不变的,如果Leader副本失效了,Kafka会通过相应的算法从其它的Follower副本中选出新的Leader副本。
(六)生产者 生产者(Producer)将记录发布到它们选择的主题。因为主题下是分区,记录是存储在分区中的,所以事实上生产者发送的记录是到分区中的,其策略是:
1、如果在发消息的时候指定了分区,则消息投递到指定的分区。
2、如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区。
3、如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区。
(七)消费者和消费者组 消费者(Consumer)以拉取(pull)的方式从Kafka集群中拉取数据。每个消费者都必须属于一个特定的消费组,在创建消费者时,必须标注自己属于哪个消费者组(通过属性group.id来标识),消费者组不需要单独创建,它只是一个名称,只需要指定即可。
如果有多个消费者属于同一个消费组,当它们同时从某个主题中poll记录时,这个主题中的记录只能被一个消费者获取到,至于是哪个,由kafka来自动确定。也就是对于一个主题下的记录,只能被同一个消费组下的一个消费者消费,但不同消费组的消费者可以同时消费该记录。
消费组是Kafka实现对一个主题中的记录进行广播和单播的手段,如果要想广播主题下的记录(即让所有消费者都能取到同样的记录),则只需让这些消费者属于不同的消费群组;如果想要单播(即只让所有消费者中的一个能取到同样的记录),则只需让这些消费者属于同一个消费群组。


下图是一个例子,说明了上面的描述: Kafka入门学习笔记
文章图片
上图的Kafka集群有两个代理(Broker),P0~P3是某个主题的4个分区,被分布在这两个代理上。有6个消费者,分别属于两个消费者组。
(八)Zookeeper Zookeeper是一个开源的分布式协调服务组件,它为分布式应用提供了高效且可靠的分布式协调服务。Kafka的运行依赖Zookeeper。Kafka利用Zookeeper保存相应的元数据信息,如代理节点信息、集群信息、主题信息、分区信息、动态配置信息等。在老的版本中消费者信息及消费偏移量信息也保存在Zookeeper中,在新的版本中,这些信息保存在Kafka代理中。
Kafka在启动或运行过程中会在Zookeeper上创建相应的节点来保存元数据信息,Kafka通过Zookeeper的监听机制在这些节点注册相应的监听器来监听节点元数据的变化,从而做出相应的处理。
四、快速起步 下面我们介绍如何在单机环境下安装、运行及使用Kafka,对于学习目的,使用单机环境是最简单方便的。
1、下载kafka,本文下载的压缩文件是kafka_2.12-2.1.0.tgz,将其
解压到某个目录下。
2、修改配置文件
按照最简化模式,我们需要修改的配置文件是:
1)config目录下的server.properties文件,把其中的 log.dirs的值改成适合的目录,尤其是在windows下运行,一定要改,因为默认值是Linux格式的目录。注意,即使是windows下,目录间要用 / 分隔,而不能用 \。该目录用于存储发送到Kafka中的数据。
2)因为kafka的运行依赖zookeeper,如果我们使用kafka自带的zookeeper程序,需要修改config目录下的zookeeper.properties文件,将dataDir属性设置为适合的目录。
3、启动服务
我们需要启动zookeeper服务和kafka服务。相关的启动脚本都是在安装目录的bin目录(对于linux下的脚本)和bin\windows目录下(对于windows下的脚本)。
1)启动zookeeper服务,在真实的环境下,一般我们使用单独的zookeeper集群服务。在测试情况下,我们可以使用kafka自带的zookeeper服务,启动脚本是zookeeper-server-start.sh(linux下)、zookeeper-server-start.bat(windows下)。如下面方式:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

2)启动kafka服务,启动脚本是kafka-server-start.sh(linux下)、kafka-server-start.bat(windows下)。如下面方式:
bin\windows\kafka-server-start.bat config\server.properties

4、测试
kafka提供了多个脚本,位于bin目录下,window系统下的脚本位于bin\windows下。利用这些脚本可以以命令行的方式进行相关的kafka操作。
下面举几个基本的例子:
1)利用脚本创建一个主题
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

上面脚本创建一个叫test的topic,该主题的副本数为1,分区数也为1。
下面脚本可以显示kafka下的所有已创建的topic信息
kafka-topics.bat --list --zookeeper localhost:2181

2)启动一个生产者,往上面创建的test主题上发送记录
kafka-console-producer.bat --broker-list localhost:9092 --topic test

上面脚本运行会出现一个交互式界面,输入信息回车就是发送一条记录到test主题。
3)启动一个消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test--from-beginning

上面脚本的功能是启动一个消费者,从test主题中拉取数据。运行后也会出现一个交互式界面,会立即显示上面生产者命令行程序前面发送的记录。这时我们再在上面的生产者命令行程序发送新的记录,这里也会立即收到。
如果一切正常,说明kafka单机运行环境正常。
五、Java API 除了使用Kafka提供的命令行操作外,更多情况下我们是通过编写程序来使用Kafka。Kafka提供了包含Java语言在内的多种编程语言的API。下面,我们以java api为例来简单介绍下如何编写客户端程序。
(一)生产者 下面看一个生产者的代码例子。先建立Maven工程,需要引入如下依赖:
org.apache.kafka kafka-clients 2.1.0

先给出例子代码,如下:
package com.kafkademo; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class ProducerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { String key = Integer.toString(i); producer.send(new ProducerRecord("test", key, "value" + key)); } producer.close(); } }

生产者代码中最核心的是KafkaProducer类。上面代码首先创建了一个KafkaProducer的实例,构造函数传入一个属性对象,这里只设置了一些最基本的信息,其它采用默认值。然后循环调用其send方法发送一条记录,记录由ProducerRecord类创建,有三个参数,第1个参数是指定发送到哪个主题上,第2个参数是记录的key值,第3个参数是记录的value值。
可直接运行上面程序代码,上述代码的功能是往Kafka集群(这里是本地单机部署)中发送记录,指定的主题名为test。执行程序后,我们运行kafka提供的消费者命令程序就可以获取到这些信息。
(二)消费者 下面看一个消费者的代码例子。先建立Maven工程,需要引入如下依赖:
org.apache.kafka kafka-clients 2.1.0

先给出例子代码,如下:
package com.kafkademo; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class ConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "hello"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { Duration timeout = Duration.ofSeconds(1); ConsumerRecords records = consumer.poll(timeout); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = https://www.it610.com/article/%s%n", record.offset(), record.key(), record.value()); } } }

消费者代码中最核心的是KafkaConsumer类。上面代码首先创建了一个KafkaConsumer的实例,构造函数传入一个属性对象,这里只设置了一些最基本的信息,其它采用默认值。然后调用subscribe方法订阅主题,可以一次订阅多个主题的记录。
在一个死循环调用其poll方法获取记录,poll方法的参数是代表延期时间。如果kafka中有指定主题的记录,则会立即返回数据;如果没有则延迟时间过后返回空数据。poll方法的返回值是一个记录集,每次返回最大多少条记录跟配置有关。
需要注意的是,Kafka的消费者启动后,默认情况下只获取到其启动后新产生的记录,对于启动前产生的记录不处理。如果希望获取启动前的记录,则可以进行如下的设置
props.put("auto.offset.reset", "earliest");
上面设置会让消费者获取到启动前产生的记录(且这些记录没有被归属相同组的消费者消费过)。
如果要获取某个主题中的所有历史记录,可以设置auto.offset.reset属性值为earliest,且设置消费者归属的群组为一个新的群组,代码中通过设置group.id的属性来设置群组名,比如上面消费者的归属群组为hello。
(三)流计算 Kafka Streams是一个用于构建流计算程序的客户端库,可以将Kafka集群中主题中的记录处理后,输出到另外一个主题中(也可输出到其它系统)。下面我们看一个Kafka的流计算例子。使用Kafka Streams库,其Maven依赖如下:
org.apache.kafka kafka-streams 2.1.0

需要说明的是,该依赖包含了前面kafka-clients依赖导入的库,所以对于前面的消费者、生产者例子代码,只导入这个依赖也可以。
先看一个最简单例子,将Kafka中一个主题的数据不做处理的实时输出到另一个主题。代码如下:
package com.kafkademo; import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; public class StreamDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("application.id", "my-stream-processing-application"); props.put("bootstrap.servers", "localhost:9092"); props.put("default.key.serde", Serdes.String().getClass()); props.put("default.value.serde", Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream source = builder.stream("my-input-topic"); source.to("my-output-topic"); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } }

Kafka Streams库的有几个核心的类:
1、KafkaStreams 类,用于启动流计算,其构造函数有2个参数,第1个参数是处理器拓谱(即指定如何处理),第2个参数是配置属性。
2、KStream本质是对一个数据流的抽象,提供各种DSL方法(高阶函数,类似集合框架的filter,map,reduce操作)来完成对数据的操作。
上面程序可直接运行。运行前,先创建两个主题 my-input-topic 和my-output-topic。运行程序后,我们可以运行Kafka提供的命令行脚本,一个用于往my-input-topic主题中发送记录,一个用于从my-output-topic中消费记录。当我们往my-input-topic主题中发送记录时,会发现my-output-topic中实时出现了相应的记录。这说明上面的程序成功实现了将一个主题中的记录输出到另一个主题中。
在上面例子基础上,我们将my-input-topic主题中的记录中的value值计算其字符串长度,然后将计算得到的长度值输出到my-output-topic主题中。我们只需将上面程序中的一句代码
source.to("my-output-topic")

改为如下的代码接口
source.mapValues(value->String.*valueOf*(value.length())).to("my-output-topic")

可以看出,这里是调用source对象的高阶函数mapValues将每条记录中值转换为值的长度,该函数返回的还是KStream类型的对象。然后再调用to方法将转换后的数据输出到另一个主题中。
KStream类提供了很多个高阶函数可以完成各种的数据计算,这里不再一一介绍。

    推荐阅读