Java连接Kafka|Java连接Kafka Kerberos
Java连接Kafka Kerberos
【Java连接Kafka|Java连接Kafka Kerberos】平台:1.配置Ambari hdp 2.6.2.0
开启Kerberos
kafka_client_jaas.conf
- 注意 keyTab 和 principal 两个配置项
// JAAS login configuration for a Kerberos-secured Kafka deployment (HDP).
// Loaded by the JVM via -Djava.security.auth.login.config; the JDK
// login-configuration parser accepts "//" comments.

// Broker-side login context, used by the Kafka server process itself.
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
principal="kafka/yamb2@EXAMPLE.COM";
};
// Client-side login context picked up by Kafka producers/consumers.
// NOTE(review): this reuses the broker's service keytab and principal —
// fine for a demo, but production clients should have their own principal.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
principal="kafka/yamb2@EXAMPLE.COM";
};
// ZooKeeper client login context (name "Client" is ZooKeeper's default).
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/yamb2@EXAMPLE.COM";
};
2.kafka Producer Java Demo
- 在 kafka 中创建一个 topic:cw_test2019042301
- kafka_client_jaas.conf 为上一步配置的文件
- krb5.conf 为集群上的配置文件,默认路径为 /etc/krb5.conf
package com.caiw.nuwapi;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @Author: caiwei
* @Description:
* @Date: create in 2019/4/12 14:54
*/
public class TestProducer {

    /**
     * Sends 10000 UUID-tagged test messages, one per second, to a
     * Kerberos-secured Kafka topic on an HDP cluster.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while sleeping between sends
     */
    public static void main(String... args) throws InterruptedException {
        String topic = "cw_test2019042301";
        // Point the JVM at the JAAS login config and the Kerberos realm config
        // BEFORE any Kafka client is constructed.
        System.setProperty("java.security.auth.login.config", "D:\\tmp\\161hdp\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\tmp\\161hdp\\krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "yamb2:6667,yamb3:6667,yamb4:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // HDP-specific alias for SASL over plaintext; on vanilla Kafka this
        // value is spelled "SASL_PLAINTEXT".
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("sasl.kerberos.service.name", "kafka");

        // try-with-resources closes the producer (flushing any buffered
        // records) even when the loop throws — the original leaked it.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10000; i++) {
                String s = UUID.randomUUID().toString() + " " + i + " Test Date: " + new Date();
                System.out.println(s);
                producer.send(new ProducerRecord<>(topic, s));
                Thread.sleep(1000);
            }
        }
    }
}
3.kafka Consumer Java Demo
- 在 kafka 中创建一个 topic:cw_test2019042301
- kafka_client_jaas.conf 为上一步配置的文件
- krb5.conf 为集群上的配置文件,默认路径为 /etc/krb5.conf
package com.caiw.nuwapi;
import org.apache.commons.collections.map.HashedMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* @Author: caiwei
* @Description:
* @Date: create in 2019/4/12 14:54
*/
public class TestConsumer {
    // Next-offset-to-read per partition; kept for the (commented-out)
    // manual commitSync path below. Stdlib HashMap replaces the
    // commons-collections HashedMap the original used for no reason.
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new java.util.HashMap<>();
    int count = 0; // unused in this demo; retained so existing references (if any) still compile

    /**
     * Continuously polls a Kerberos-secured Kafka topic and prints every
     * record. Runs until the process is killed or poll() throws.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // JAAS login config and Kerberos realm config must be set before the
        // consumer is constructed.
        System.setProperty("java.security.auth.login.config", "D:\\tmp\\161hdp\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\tmp\\161hdp\\krb5.conf");

        Properties props = new Properties();
        props.put("group.id", "test_2019042301");
        // Auto-commit offsets. NOTE(review): the original Chinese comment said
        // "disable auto commit" while the code sets "true" — the code wins.
        props.put("enable.auto.commit", "true");
        // Start from the earliest offset when this group has no committed offset.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", "192.168.23.162:6667,192.168.23.163:6667,192.168.23.164:6667");
        // HDP-specific alias for SASL_PLAINTEXT.
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("sasl.kerberos.service.name", "kafka");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("cw_test2019042301"));
        try {
            // Poll forever; the original's `while (flag)` loop never cleared
            // the flag, so this is behaviorally the same infinite loop.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Track the offset of the NEXT record for a manual commit.
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    // Original format string had a scraped article URL pasted
                    // into it ("value = https://..."), corrupting the output.
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                    // consumer.commitSync(currentOffsets); // manual offset commit
                }
            }
        } finally {
            // The original placed close() after the loop where it was
            // unreachable; the finally block guarantees cleanup on exception.
            consumer.close();
        }
    }
}
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 事件代理
- Java|Java OpenCV图像处理之SIFT角点检测详解
- java中如何实现重建二叉树
- 数组常用方法一
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- Java|Java基础——数组
- RxJava|RxJava 在Android项目中的使用(一)
- java之static、static|java之static、static final、final的区别与应用
- Java基础-高级特性-枚举实现状态机