创建SCRAM证书
Kafka支持SCRAM-SHA-256和SCRAM-SHA-512,可与TLS一起使用执行安全认证。 用户名用作配置ACL等认证的Principal。Kafka中的默认SCRAM实现是在Zookeeper中存储SCRAM的证书,适用于Zookeeper在私有网络上的Kafka安装。
为用户car_record_user创建SCRAM凭证(密码是123456)
1
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name car_record_user
为用户car_record_user创建SCRAM凭证(密码是123456)
1
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name admin
可以使用–describe列出现有的证书
1
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name car_record_user
可以使用–delete为一个或多个SCRAM机制删除证书
1
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name car_record_user
配置kafka broker
在每个Kafka broker的config目录下添加一个类似于下面的JAAS文件,我们姑且将其称为kafka_server_jaas.conf:
1
2
3
4
5
6
7
8[root@cloudsb config]# pwd
/usr/local/kafka_2.12-2.1.1/config
[root@cloudsb config]# vi kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="123456";
};其中,broker使用KafkaServer中的用户名和密码来和其他broker进行连接。 在这个例子中,admin是broker之间通信的用户。
1
2
3
4
5
6[root@cloudsb bin]# pwd
/usr/local/kafka_2.12-2.1.1/bin
[root@cloudsb bin]# vi kafka-server-start.sh
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.1.1/config/kafka_server_jaas.conf"
fi在server.properties中配置SASL端口和SASL机制。
1
2
3
4
5
6
7
8
9
10
11
12[root@managementa config]# pwd
/usr/local/kafka_2.12-2.1.1/config
[root@managementa config]# cp server.properties server_scram.properties
[root@managementa config]# vi server_scram.properties
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256## (or SCRAM-SHA-512)
sasl.enabled.mechanisms=SCRAM-SHA-256## (or SCRAM-SHA-512)
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
配置kafka客户端
为每个客户端配置JAAS配置(在producer.properteis或consumer.properteis)。登录模块展示了客户端(如生产者和消费者)如何连接到broker的。下面是配置了SCRAM机制的客户端的例子。1
2
3
4
5
6[root@managementa config]# vi kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="car_record_user"
password="123456";
};
客户端使用username和password来配置客户端连接的用户。在这个例子中,客户端使用用户car_record_user连接broker。在JVM中不同的客户端连接不同的用户(通过在sasl.jaas.config中指定不同的用户名和密码)。
客户端的JAAS配置通过指定作为JVM的参数。使用名为KafkaCLient的客户端登录。此选择仅允许来自JVM的所有客户端连接中的一个用户。
在 producer.properties 或 consumer.properties 中配置以下参数:1
2security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256 ##(or SCRAM-SHA-512)
启动生产者消费者时先指定环境变量1
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.1.1/config/kafka_client_jaas.conf"
或者这样1
2
3
4
5
6
7
8
9
10
11
12[root@cloudsa kafka_2.12-2.1.1]# pwd
/usr/local/kafka_2.12-2.1.1
[root@cloudsa kafka_2.12-2.1.1]# cp bin/kafka-console-consumer.sh bin/kafka-console-consumer-scram.sh
[root@cloudsa kafka_2.12-2.1.1]# cp bin/kafka-console-producer.sh bin/kafka-console-producer-scram.sh
[root@cloudsa kafka_2.12-2.1.1]# vi bin/kafka-console-consumer-scram.sh
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.1.1/config/kafka_client_jaas.conf"
fi
[root@cloudsa kafka_2.12-2.1.1]# vi bin/kafka-console-producer-scram.sh
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.1.1/config/kafka_client_jaas.conf"
fi
再指定参数1
2--consumer.config config/p_c.properties
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config config/p_c.properties
启动与示例
1 | [root@managementa kafka_2.12-2.1.1]# bin/kafka-server-start.sh -daemon config/server_scram.properties |
另外一种客户端配置方式
1 | [root@cloudsa kafka_2.12-2.1.1]# cat config/p_c.properties |
acl配置示例
1 | [root@cloudsa kafka_2.12-2.1.1]# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:car_record_user --operation Read --topic hch-v_tb_car_record --group car-group |
查看用户
1 | [root@cloudsb kafka_2.12-2.1.1]# bin/zookeeper-shell.sh localhost:2181 |