Kafka|Kafka Java Api

相关知识
1.Kafka提供了Producer类作为Java producer的api,该类有sync和async两种发送方式。默认是sync方式,即producer的调用类在消息真正发送到队列中去以后才返回。
(1)Kafka提供的java api中的Producer,底层只是维护该topic到每个broker的连接,并不是一个传统意义上的连接池。在使用sync方式时,我们应该自己实现一个连接池,里面包含若干Producer对象,以实现最大化写入效率。
(2)在写入的数据频率不高或要求获得写入结果时,应使用sync方式,否则会因async的等待时间引入额外的延迟。
(3)在写入的数据频率很高时,应使用async方式,以batch的形式写入,获得最大效率。
async方式与sync方式的不同在于,在初始化scala的producer时,会创建一个ProducerSendThread对象。然后,在调用send时,它并不是直接调用eventHandler.handle方法,而是把消息放入一个长度由queue.buffering.max.messages参数定义的队列(默认10000),当队列满足以下两种条件时,会由ProducerSendThread触发eventHandler.handle方法,把队列中的消息作为一个batch发送
①时间超过queue.buffering.max.ms定义的值,默认5000ms
②队列中当前消息个数超过batch.num.messages定义的值,默认200
2.Kafka的Consumer有两种Consumer的高层API、简单API–SimpleConsumer
(1)Consumer的高层API
主要是Consumer和ConsumerConnector,这里的Consumer是ConsumerConnector的静态工厂类
class Consumer {
public static kafka.javaapi.consumer.ConsumerConnector
createJavaConsumerConnector(config: ConsumerConfig);
}
具体的消息的消费都是在ConsumerConnector中
创建一个消息处理的流,包含所有的topic,并根据指定的Decoder
public Map>>createMessageStreams(Map topicCountMap, Decoder keyDecoder, Decoder valueDecoder);
创建一个消息处理的流,包含所有的topic,使用默认的Decoder
public Map>> createMessageStreams(Map topicCountMap);
【Kafka|Kafka Java Api】获取指定消息的topic,并根据指定的Decoder
public List>>createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder keyDecoder, Decoder valueDecoder);
获取指定消息的topic,使用默认的Decoder
public List> createMessageStreamsByFilter(TopicFilter topicFilter);
提交偏移量到这个消费者连接的topic
public void commitOffsets();
关闭消费者
public void shutdown();
高层的API中比较常用的就是public List> createMessageStreamsByFilter(TopicFilter topicFilter); 和public void commitOffsets();
(2)Consumer的简单API–SimpleConsumer
批量获取消息
public FetchResponse fetch(request: kafka.javaapi.FetchRequest);
获取topic的元信息
public kafka.javaapi.TopicMetadataResponse send(request:kafka.javaapi.TopicMetadataRequest);
获取目前可用的偏移量
public kafka.javaapi.OffsetResponse getOffsetsBefore(request: OffsetRequest);
关闭连接
public void close();
对于大部分应用来说,高层API就已经足够使用了,但是若是想做更进一步的控制的话,可以使用简单的API,例如消费者重启的情况下,希望得到最新的offset,就该使用SimpleConsumer。
系统环境
Linux Ubuntu 20.04
OpenJDK-11.0.11
kafka_2.13-2.8.0
zookeeper-3.6.3
IntelliJ IDEA 2021.1 (Ultimate Edition)
任务内容
本实验是使用简单Java API来模拟Kafka的producer和consumer,其中producer是通过一个while循环生成内容,然后将内容传递给Kafka,consumer从Kafka中读取内容,并在Console界面中输出。
任务步骤
1.打开Idea,新建一个Java项目,将hadoop中的配置文件加到resources中。
2.添加maven依赖

org.apache.kafka kafka-clients 1.1.1

3.启动ZooKeeper。切换到/apps/zookeeper/bin目录下,执行ZooKeeper的启动脚本。
cd /apps/zookeeper/bin ./zkServer.sh start

查看ZooKeeper的运行状态。
./zkServer.sh status

Kafka|Kafka Java Api
文章图片

4.切换目录到/apps/kafka目录下,启动kafka的server。
cd /apps/kafka bin/kafka-server-start.sh config/server.properties &

Kafka|Kafka Java Api
文章图片

5.另起一窗口,切换到/apps/kafka下,在kafka中创建topic,命名为dblab01。
cd /apps/kafka bin/kafka-topics.sh \ --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --topic dblab01 \ --partitions 1

查看topic
cd /apps/kafka bin/kafka-topics.sh--list--zookeeperlocalhost:2181

Kafka|Kafka Java Api
文章图片

6.创建kafka的producer,用于生产数据。在包下,创建Class,命名为MyProducer。
package my.study.kafka; import org.apache.kafka.clients.producer.*; import java.util.Properties; public class MyProducer { public void produce(){ Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "localhost:9092"); //ack模式,all是最慢但最安全的 props.put("acks", "all"); //失败重试次数 props.put("retries", 0); //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端 props.put("batch.size", 16384); props.put("linger.ms", 1); //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端 //buffer.memory要大于batch.size,否则会报申请内存不足的错误 props.put("buffer.memory", 33554432); //序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord("dblab01", Integer.toString(i), Integer.toString(i))); producer.close(); }public static void main(String[] args) { new MyProducer().produce(); } }

producer端的代码:首先定义一个topic的名称,然后创建一个properties实例,用来设置produce的参数。接着创建一个producer的实例并将参数配置props作为参数上传进去。在produce方法中定义一个key与data,创建KeyedMessage实例,并将key,data和topic作为参数上传进去,然后把KeyedMessage实例上传给producer。在主函数中直接调用MyProduce的produce()方法,用来实现消息的上传。

    推荐阅读