搭建 环境
kafka-2.6.2_2.13
zookeeper-3.5.9
jdk-8u201 ( jdk1.8.0_201 )
centos7
Zookeeper 开启SASL认证 vim conf/zoo.cfg
,增加如下安全配置
1 2 3 4 5 6 ... authProvider.1 =org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme =sasl jaasLoginRenew =3600000
创建JASS配置 vim conf/zookeeper_sasl_jaas.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Server { org.apache.zookeeper.server.auth.DigestLoginModule required user_super ="super!@#" # 创建超级管理员,用户名:super,密码:super!@# user_kafka ="kafka!@#" # 创建kafka服务专用账户:用户名:kafka,密码:kafka!@# user_public ="public!@#"; # 创建一个专门的对外的开放账户用于接入zk服务,防止破坏kafka数据 }; Client { org.apache.zookeeper.server.auth.DigestLoginModule required username ="super" password ="super!@#"; };
注意:jass配置中,没个模块和用户信息后以 “;” 结束
将JASS配置注入环境变量 vim bin/zkEnv.sh
1 2 3 4 5 6 7 8 9 10 export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/workspace/zookeeper/latest/conf/zookeeper_sasl_jaas.conf $SERVER_JVMFLAGS" export CLIENT_JVMFLAGS="-Djava.security.auth.login.config=/workspace/zookeeper/latest/conf/zookeeper_sasl_jaas.conf $CLIENT_JVMFLAGS" 如上步骤中的配置应用到每个节点后,可以启动zk级群 ```shell bin/zkServer.sh start
Kafka 开启sasl和acl vim config/server.properties
, 增加sasl配置和acl配置,同时增加2个超级用户:
admin:用于kafka之间互连、内部运维
public:用于外部平台服务接入,必要时可以将用户从zk中删除或修改其密码
A:开启sasl和acl配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 zookeeper.connect =127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181 ... listeners =SASL_PLAINTEXT://127.0.0.1:9092 super.users =User:admin;User:public security.inter.broker.protocol =SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol =SCRAM-SHA-512 sasl.enabled.mechanisms =SCRAM-SHA-512 authorizer.class.name =kafka.security.authorizer.AclAuthorizer zookeeper.set.acl =true allow.everyone.if.no.acl.found =false
B:当启用kafka内外网分流时,应该这样配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 zookeeper.connect =127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181 ... listener.security.protocol.map =INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT listeners =INTERNAL://127.0.0.1:9092,EXTERNAL://公网IP:9093 advertised.listeners =INTERNAL://127.0.0.1:9092,EXTERNAL://xxxx inter.broker.listener.name =INTERNAL super.users =User:admin;User:public sasl.mechanism.inter.broker.protocol =SCRAM-SHA-512 sasl.enabled.mechanisms =SCRAM-SHA-512 authorizer.class.name =kafka.security.authorizer.AclAuthorizer zookeeper.set.acl =true allow.everyone.if.no.acl.found =false
创建JASS配置 这里,将3个必要的jass配置模块写到一起,方便使用:
KafkaServer
此模块为broker内部互连的认证配置
client
此模块为broker连接zk的客户端认证配置
KafkaClient
此模块为kafka客户端连接kafka服务的客户端认证配置
vim config/kafka_sasl_jaas.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 # broker内部通信认证配置,这里配置的用户必须为超级用户,见config/server.properties中的super.users配置 KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret!@#"; }; # kafka连接zk的客户端认证配置,注意:这里的用户要与zk中已配置的用户对应,见zk的jass配置:conf/zookeeper_sasl_jaas.conf Client { org.apache.zookeeper.server.auth.DigestLoginModule required username="kafka" password="kafka!@#"; }; # kafka客户端连接broker的客户端认证配置,用于bin/*.sh下的各种kafkaTool连接kafka使用 KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret!@#"; };
将JASS配置注入环境变量 注入到 kafka-run-class.sh 中即可,各个kafka tool都会调用这个shellvim bin/kafka-run-class.sh
1 export KAFKA_OPTS="-Djava.security.auth.login.config=/workspace/kafka/latest/config/kafka_sasl_jaas.conf ${KAFKA_OPTS}"
创建kafka用户 将以上步骤中的配置应用到所有kafka broker节点 至此,kafka jass.config中配置的broker内部通信用户还未创建起来,直接启动将失败,所以需要先创建kafka用户
创建broker内部通信超级用户 (用户名密码与config/kafka_sasl_jaas.conf文件中一致)
1 bin/kafka-configs.sh --zookeeper zk:2181 --alter --add-config 'SCRAM-SHA-512=[password=admin-secret!@#]' --entity-type users --entity-name admin
创建对外开放超级用户(超级用户在config/server.properties中的super.users配置指定)
1 bin/kafka-configs.sh --zookeeper zk:2181 --alter --add-config 'SCRAM-SHA-512=[password=public-secret!@#]' --entity-type users --entity-name public
1 bin/kafka-configs.sh --zookeeper zk:2181 --alter --add-config 'SCRAM-SHA-512=[password=test-kafka]' --entity-type users --entity-name test-kafka
启动kafka集群 1 source /etc/profile && cd /workspace/kafka/latest && nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
验证broker创建的zk节点信息 查看zk上创建的kafka相关目录acl权限,若kafka通过sasl+acl连接zk集群,则创建的zk节点归kafka用户所有,其它非超级用户仅可读,如下:
1 2 3 4 5 6 [zk: localhost:2181(CONNECTED) 0] getAcl /brokers/topics 'sasl,'kafka : cdrwa 'world,'anyone : r [zk: localhost:2181(CONNECTED) 1]
若权限不正确,请检查kefka配置是否启用了zk acl机制,即配置:zookeeper.set.acl=true
测试 读写数据 创建topic (一)zookeeper方式创建
1 2 3 4 # 1、由于上述步骤中,在kafka-run-class.sh中已经配置kafka_sasl_jaas.conf文件(Client信息),因此该步骤可忽略,如未配置,需在kafka-topics.sh中添加Client信息 # 2、创建topic bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 3 --topic test
(二)bootstrap-server方式创建
1 2 3 4 5 6 7 8 # 1、config/conf.properties 文件内容 security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-512 # 2、由于上述步骤中,在kafka-run-class.sh中已经配置kafka_sasl_jaas.conf文件(KafkaClient信息),因此该步骤可忽略,如未配置,需在kafka-topics.sh中添加KafkaClient信息 # 3、创建topic bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --command-config ./config/conf.properties --replication-factor 2 --partitions 3 --topic test
写数据
请先确保对应用户是否已经创建,本例中使用test-kafka用户
创建config/producer.properties 1 2 3 security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test-kafka" password="test-kafka";
1 bin/kafka-console-producer.sh --topic test --bootstrap-server 127.0.0.1:9092 --producer.config config/producer.properties
数据数据后会报以下错误, 原因是还未添加 test-kafka 该用户对test的写权限
1 2 3 4 [2021-11-11 15:07:28,160] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient) [2021-11-11 15:07:28,164] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test] (org.apache.kafka.clients.Metadata) [2021-11-11 15:07:28,166] ERROR Error when sending message to topic test with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
1 2 3 4 5 6 7 # 第一种:连接zookeeper创建 bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:test-kafka --operation Write --topic test 或 # 第二种:使用KafkaAdmin创建(该方式会加载kafka_sasl_jaas.conf文件中KafkaClient信息) bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config ./conf.properties --add --allow-principal User:test-kafka --operation Write --topic test
1 2 3 4 5 Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: (principal=User:test-kafka, host=*, operation=WRITE, permissionType=ALLOW) Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: (principal=User:test-kafka, host=*, operation=WRITE, permissionType=ALLOW)
消费数据 添加读权限 1 2 3 4 5 6 7 # 第一种:连接zookeeper创建 bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:test-kafka --operation Read --topic test 或 # 第二种:使用KafkaAdmin创建(该方式会加载kafka_sasl_jaas.conf文件中KafkaClient信息) bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config ./conf.properties --add --allow-principal User:test-kafka --operation Read --topic test
1 2 3 4 5 6 Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: (principal=User:test-kafka, host=*, operation=READ, permissionType=ALLOW) Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: (principal=User:test-kafka, host=*, operation=WRITE, permissionType=ALLOW) (principal=User:test-kafka, host=*, operation=READ, permissionType=ALLOW)
添加消费组权限 1 2 3 4 5 6 7 # 第一种:连接zookeeper创建 bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:test-kafka --operation Read --topic test --group test 或 # 第二种:使用KafkaAdmin创建(该方式会加载kafka_sasl_jaas.conf文件中KafkaClient信息) bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config ./conf.properties --add --allow-principal User:test-kafka --operation Read --topic test --group test
1 2 3 4 5 6 7 8 9 10 11 12 Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: (principal=User:test-kafka, host=*, operation=READ, permissionType=ALLOW) Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=test, patternType=LITERAL)`: (principal=User:test-kafka, host=*, operation=READ, permissionType=ALLOW) Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: (principal=User:test-kafka, host=*, operation=READ, permissionType=ALLOW) (principal=User:test-kafka, host=*, operation=WRITE, permissionType=ALLOW) Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test, patternType=LITERAL)`: (principal=User:test-kafka, host=*, operation=READ, permissionType=ALLOW)
消费 1 bin/kafka-console-consumer.sh --topic test --bootstrap-server 127.0.0.1:9092 --group test --consumer.config producer.conf
动态配置用户 添加用户 SCRAM证书
注:动态添加用户SCRAM证书,只能使用–zookeeper创建,–bootstrap-server参数只支持创建Quota配置
1 bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=simba],SCRAM-SHA-512=[password=simba]' --entity-type users --entity-name simba
查看用户 SCRAM证书
1 bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --describe --entity-type users --entity-name simba
删除用户 SCRAM证书
1 bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name simba
1 bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name simba
java 客户端配置 生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Test public void producer () throws Exception { Properties props = new Properties(); String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";" ; String jaasCfg = String.format(jaasTemplate, "test-kafka" , "test-kafka" ); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_SEFVER); props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer" ); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put("security.protocol" , "SASL_PLAINTEXT" ); props.put("sasl.mechanism" , "SCRAM-SHA-512" ); props.put("sasl.jaas.config" , jaasCfg); KafkaProducer<String, String> producer = new KafkaProducer<>(props); try { for (int i = 0 ; i < 10 ; i++) { Future<RecordMetadata> test = producer.send(new ProducerRecord<>("test" , i + "" )); RecordMetadata metadata = test.get(); System.out.println(metadata.offset()); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Test public void consumer () { String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";" ; String jaasCfg = String.format(jaasTemplate, "test-kafka" , "test-kafka" ); Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_SEFVER); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test" ); props.put("security.protocol" , "SASL_PLAINTEXT" ); props.put("sasl.mechanism" , "SCRAM-SHA-512" ); props.put("sasl.jaas.config" , jaasCfg); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singleton("test" )); ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1 )); Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); while (iterator.hasNext()) { ConsumerRecord<String, String> next = iterator.next(); System.out.println(next.value()); } }