环境 操作系统:CentOS 7.3
Kafka Version:2.12
Zookeeper Version:3.6.1
一、Zookeeper集群配置SASL zookeeper所有节点都是对等的,只是各个节点角色可能不相同。以下步骤所有的节点配置相同。
1、zoo.cfg文件配置
为zookeeper添加SASL支持,在配置文件zoo.cfg添加
????
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProviderrequireClientAuthScheme=sasljaasLoginRenew=3600000
2.新建zoo_jaas.conf文件,为Zookeeper添加账号认证信息
这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/home路径下。zk_server_jaas.conf文件的内容如下
Server {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafkapwd"user_kafka="kafkapwd";
};
username和paasword是zk集群之间的认证密码。
user_kafka="kafkaUser65#.com"定义了一个用户"kafka",密码是"kafkaUser65#.com"
3.将Kafka相关jar包导入到Zookeeper
Zookeeper的认证机制是使用插件,“org.apache.kafka.common.security.plain.PlainLoginModule”,所以需要导入Kafka相关jar包,kafka-clients相关jar包,在kafka服务下的lib目录中可以找到,根据kafka不同版本,相关jar包版本会有所变化。
所需要jar包如下,在zookeeper下创建目录zk_sasl_lib将jar包放入(目录名与位置可以随便,后续引用指定即可):
kafka-clients-1.1.1.jarlz4-java-1.4.1.jarslf4j-api-1.7.25.jarslf4j-log4j12-1.7.25.jarsnappy-java-1.1.7.1.jar
4.修改zkEnv.sh,主要目的就是将这几个jar包使zookeeper读取到
在$KAFKA_HOME/bin目录下找到zkEnv.sh文件,添加如下代码
注意引用的目录下jar包,与之前创建的zoo_jaas.conf文件
for i in /opt/zookeeper-3.4.14/zk_sasl_lib/*.jar;
doCLASSPATH="$i:$CLASSPATH"doneSERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper-3.4.14/conf/zoo_jaas.conf"
5.重启Zookeeper服务
zkServer.sh restart
查看状态
zkServer.sh status
二、Kafka集群配置SASL
- 注:所有节点操作相同
内容如下(这里的Client与Zookeeper相对应,KafkaServer与后期调用时读取的KafkaClient相对应,是消费生产的账号密码,不要弄混了):
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="leo"password="leopwd"user_leo="leopwd" ;
};
Client{org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafkapwd";
};
先解释KafkaServer,使用user_
Client配置节则容易理解得多,主要是broker链接到zookeeper,从上文的Zookeeper JAAS文件中选择一个用户,填写用户名和密码即可。
【kafka|SpringBoot 支持Kafka安全认证 SASL/PLAINTEXT,账号密码认证】2.在Kafka Server.properties添加、修改如下信息
listeners=SASL_PLAINTEXT://172.xx.xx.xx:9092security.inter.broker.protocol=SASL_PLAINTEXTsasl.mechanism.inter.broker.protocol=PLAINsasl.enabled.mechanisms=PLAINallow.everyone.if.no.acl.found=trueauthorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
3.Kafka启动脚本中加入配置,读取第一步创建的文件,kafka_server_jaas.conf
修改kafka的kafka-server-start.sh文件,
在如下代码
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
添加
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka_2.11-1.1.1/config/kafka_server_jaas.conf"
4.启动Kafka服务,查看日志是否正常
kafka-server-start.sh -daemon /opt/kafka_2.11-1.1.1/config/server.properties
三、Java客户端调用认证 这里只对配置进行举例,其他操作不变
##############################kafka配置#############################
spring.kafka.bootstrap-servers=172.xx.xx.xx:9092
spring.kafka.listener.missing-topics-fatal=false
# 消费者基础配置
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=50
spring.kafka.consumer.properties.max.poll.interval.ms=1800000
# 消费者授权配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.sasl.mechanism=PLAIN
spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="leo" password="leopwd";
# 生产者授权配置
spring.kafka.producer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.properties.sasl.mechanism=PLAIN
spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="leo" password="leopwd";
##############################kafka配置#############################
ACL用户权限控制具体配置请查看:https://datamining.blog.csdn.net/article/details/90291813
推荐阅读
- 关于kafka数据丢失场景的一次激烈讨论....
- Kafka的生产集群部署
- 聊聊 Kafka(如何避免消费组的 Rebalance)
- 深入解析Kafka的offset管理
- SpringBoot|spring boot中使用kafka详解(踩完坑又爬了出来)
- java|Log4j2异步将log发送到kafka (kafka及其依赖环境的docker配置和使用)
- kafka的优缺点都有那些
- kafka|Kafka的数据是如何存储的
- Kafka|Kafka VS RocketMQ VS RabbitMQ
- #|Zero-Copy