公开课回放地址:https://www.bilibili.com/video/BV1yQ4y1g7MW
kafka的权限设置
Kafka 认证机制
自 0.9.0.0 版本开始,Kafka 正式引入了认证机制,用于实现基础的安全用户认证,这是将 Kafka 上云或进行多租户管理的必要步骤。截止到 2.3 版本,Kafka 支持基于 SSL 和基于 SASL 的安全认证机制。
基于 SSL 的认证主要是指 Broker 和客户端的双路认证(2-way authentication)。 通常来说,SSL 加密(Encryption)已经启用了单向认证,即客户端认证 Broker 的证书(Certificate)。如果要做 SSL 认证,那么就要启用双路认证,也就是说 Broker 也要认证客户端的证书。
Kafka 还支持通过 SASL 做客户端认证。SASL 是提供认证和数据安全服务的框架。 Kafka 支持的 SASL 机制有 5 种,它们分别是在不同版本中被引入的。
- GSSAPI:也就是 Kerberos 使用的安全接口,是在 0.9 版本中被引入的。
- PLAIN:是使用简单的用户名 / 密码认证的机制,在 0.10 版本中被引入。
- SCRAM:主要用于解决 PLAIN 机制安全问题的新机制,是在 0.10.2 版本中被引入的。
- OAUTHBEARER:是基于 OAuth 2 认证框架的新机制,在 2.0 版本中被引进。
- Delegation Token:补充现有 SASL 机制的轻量级认证机制,是在 1.1.0 版本被引入的。
认证机制的比较
目前来看,使用 SSL 做信道加密的情况更多一些,但使用 SSL 实现认证不如使用 SASL。毕竟,SASL 能够支持选择不同的实现机制,如 GSSAPI、SCRAM、PLAIN 等。因此,建议使用 SSL 来做通信加密,使用 SASL 来做 Kafka 的认证实现。
SASL/GSSAPI 主要是给 Kerberos 使用的。GSSAPI 适用于本身已经做了 Kerberos 认证的场景,这样的话,SASL/GSSAPI 可以实现无缝集成。
SASL/PLAIN 是一个简单的用户名 / 密码认证机制,通常与 SSL 加密搭配使用。对于一些小公司而言,搭建公司级的 Kerberos 可能并没有什么必要,他们的用户系统也不复杂,特别是访问 Kafka 集群的用户可能不是很多。对于 SASL/PLAIN 而言,这就是一个非常合适的应用场景。总体来说,SASL/PLAIN 的配置和运维成本相对较小,适合于小型公司中的 Kafka 集群。
SASL/PLAIN 有这样一个弊端:它不能动态地增减认证用户,必须重启 Kafka 集群才能令变更生效。因为所有认证用户信息全部保存在静态文件中,所以只能重启 Broker,才能重新加载变更后的静态文件。
SASL/SCRAM 通过将认证用户信息保存在 ZooKeeper 的方式,避免了动态修改需要重启 Broker 的弊端。在实际使用过程中,可以使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群。因此,如果打算使用 SASL/PLAIN,不妨改用 SASL/SCRAM 试试。不过要注意的是,后者是 0.10.2 版本引入的。
SASL/OAUTHBEARER 是 2.0 版本引入的新认证机制,主要是为了实现与 OAuth 2 框架的集成。 Kafka 不提倡单纯使用 OAUTHBEARER,因为它生成的不安全的 JSON Web Token,必须配以 SSL 加密才能用在生产环境中。
Delegation Token 是在 1.1 版本引入的,它是一种轻量级的认证机制,主要目的是补充现有的 SASL 或 SSL 认证。 如果要使用 Delegation Token,需要先配置好 SASL 认证,然后再利用 Kafka 提供的 API 去获取对应的 Delegation Token。这样,Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证)或传输 Keystore 文件(SSL 认证)。
本案例以SASL中的PLAIN方式作为讲解内容
环境准备
首先选择海牛云平台的大数据组件中的kafka,加入实验环境,启动镜像
软件版本
jdk1.8.0_144
kafka_2.13-2.6.0
zookeeper-3.4.8
启动后开始本次 实验的配置
# 因为在实验启动的时候里面的zookeeper和kafka的组件已经启动,这个时候需要重新配置,那么首先先关闭集群
# 关闭命令,每个节点都要执行
kafka-server-stop.sh
集群配置
首先选择kafka的配置文件,增加如下配置
vim /opt/kafka_2.13-2.6.0/config/server.properties
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
每个机器都要增加这些配置,这个时候已经启动了kafka的认证配置
但是认证配置中需要增加用户名和密码
这个时候增加kafka的权限配置信息
vim /opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf
# 增加如下配置
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_user2="123456";
};
#分发文件到不同的机器节点
scp /opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf root@kafka2-24160:/opt/kafka_2.13-2.6.0/config/
scp /opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf root@kafka3-24160:/opt/kafka_2.13-2.6.0/config/
此配置定义了两个用户(admin和user2)。KafkaServer中的username、password配置的用户和密码,是用来broker和broker连接认证。在本例中,admin是代理broker间通信的用户。user_userName配置为连接到broker的所有用户定义密码,broker使用这些验证所有客户端连接,包括来自其他broker的连接。
请注意,username是服务端连接的账号,password为密码,其中下面的user_admin就是为了集群间通信时候使用的
将JAAS配置文件位置作为JVM参数传递给每个Kafka broker
在kafka-run-class.sh中增加如下配置:219行
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf"
# 增加配置后进行分发
scp /opt/kafka_2.13-2.6.0/bin/kafka-run-class.sh root@kafka2-24160:/opt/kafka_2.13-2.6.0/bin/
scp /opt/kafka_2.13-2.6.0/bin/kafka-run-class.sh root@kafka3-24160:/opt/kafka_2.13-2.6.0/bin/
启动kafka进行启动集群
kafka-server-start.sh -daemon /opt/kafka_2.13-2.6.0/config/server.properties
这个时候集群启动完毕已经带有权限
客户端连接
集群已经启动权限,这个时候操作集群的命令是完全不生效的,我们需要配置连接的用户进行验证
使用kafka-topics.sh做topic的管理
首先配置权限认证信息 auth.conf
vim /root/auth.conf
# 输入如下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user2" password="123456";
其中的用户名必须是admin 123456
或者是 user2 123456
kafka-topics.sh --bootstrap-server kafka1-24160:9092 --create --topic hainiu --partitions 3 --replication-factor 2 --command-config auth.conf
会存在警告信息,但是不会影响使用
消费这个生产信息
kafka-console-producer.sh --bootstrap-server kafka1-24160:9092 --topic hainiu --producer.config auth.conf
kafka-console-consumer.sh --bootstrap-server kafka1-24160:9092 --topic hainiu --consumer.config auth.conf
可以通过制定用户信息进行生产和消费数据
同时也可以修改配置文件信息直接使用生产者和消费者
vim producer.properties
vim consumer.properties
# 增加如下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user2" password="123456";
权限配置
# 修改server.properties 增加如下配置
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
使用权限配置并且超级用户是admin,这个用户和我们在上面配置的列表中的admin用户相互映射
每个机器都进行配置
重启所有的节点,这个时候已经开启了权限
# 配置用户认证文件信息
vim /root/admin.conf
# 输入如下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
这个时候使用kafka的命令进行操作
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_hainiu --partitions 3 --replication-factor 2
在没增加认证配置信息的时候没有任何反应
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_hainiu --partitions 3 --replication-factor 2 --command-config auth.conf
发现用户没有权限,这个时候加入admin.conf进行认证
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_hainiu1 --partitions 3 --replication-factor 2 --command-config admin.conf
因为admin中的是超级用户
这个时候可以增加用户权限
#命令为
kafka-acls.sh
参数如下:
- --add 增加权限 --remove删除
- --allow-host 允许操作的主机 --deny-host 拒绝的主机
- --allow-principal 允许操作的用户 --deny-principal 拒绝用户
- --bootstrap-server 集群地址
- --command-config 认证文件信息
- --group 可以消费topic的消费者组
- --list 查询所有权限
- --operation 增加操作权限
- Describe
- DescribeConfigs
- Alter
- Read
- Delete
- Create
- All
- Write
- AlterConfigs
- --topic 指定topic
查询topic的所有权限
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server nn1:9092 --list --command-config admin.conf
删除所有权限
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server nn1:9092 --topic topic_hainiu --remove --command-config admin.conf
当前用户没有权限,现在给海牛用户赋值权限
kafka-acls.sh --bootstrap-server nn1:9092 --add --allow-principal User:hainiu --topic topic_hainiu --operation all --command-config admin.conf
kafka-topics.sh --bootstrap-server nn1:9092 --list --command-config auth.conf
回收权限
kafka-acls.sh --bootstrap-server nn1:9092 --remove --allow-principal User:hainiu --topic topic_hainiu --operation all --command-config admin.conf
kafka-acls.sh --bootstrap-server nn1:9092 --list --topic topic_hainiu --command-config admin.conf
这个时候没有他的访问权限
增加权限并且增加ip限制
kafka-acls.sh --bootstrap-server nn1:9092 --add --deny-principal User:hainiu --topic topic_hainiu --operation all --deny-host 11.237.80.2 --command-config admin.conf
发现不可以访问
kafka-acls.sh --bootstrap-server nn1:9092 --remove --deny-principal User:hainiu --topic topic_hainiu --operation all --deny-host 11.237.80.2 --command-config admin.conf
现在可以访问了
生产者和消费者的权限设置
# 删除所有权限信息
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server nn1:9092 --topic topic_hainiu --remove --command-config admin.conf
#生产者没有权限
kafka-console-producer.sh --bootstrap-server nn1:9092 --topic topic_hainiu --producer.config auth.conf
# 增加权限
kafka-acls.sh --bootstrap-server nn1:9092 --add --allow-principal User:hainiu --operation Write --topic topic_hainiu --command-config admin.conf
生产者可以生产数据
现在进行消费数据
kafka-console-consumer.sh --bootstrap-server nn1:9092 --topic topic_hainiu --from-beginning --consumer.config auth.conf
现在进行权限分配
kafka-acls.sh --bootstrap-server nn1:9092 --add --allow-principal User:hainiu --operation Read --topic topic_hainiu --command-config admin.conf
发现增加完毕权限还是不可以读取数据,因为消费者组没有增加权限
kafka-acls.sh --bootstrap-server nn1:9092 --add --allow-principal User:hainiu --operation Read --topic topic_hainiu --group hainiu --command-config admin.conf
kafka-console-consumer.sh --bootstrap-server nn1:9092 --topic topic_hainiu --from-beginning --consumer.config auth.conf --group hainiu
已经可以消费数据
代码客户端认证
首先增加resources中配置文件
kafka.jaas
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="hainiu"
password="123456";
};
代码如下:
package com.hainiu.spark.kafka01
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
/**
* producer[interceptor serializer partitioner]
* bootstrap-server
* linger.ms
* batch-size
* topic
* 生产者放入数据只能针对于同一个topic
*/
object TestProducer01 {
def main(args: Array[String]): Unit = {
System.setProperty("java.security.auth.login.config","G:\\workprogram\\spark52\\src\\main\\resources\\kafka.jaas")
val pro = new Properties()
pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092")
pro.put(ProducerConfig.LINGER_MS_CONFIG,"100")
pro.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")
pro.put(ProducerConfig.ACKS_CONFIG,"all")
pro.put(ProducerConfig.RETRIES_CONFIG,"3")
pro.put("security.protocol", "SASL_PLAINTEXT"); //2.协议需要配置
pro.put("sasl.mechanism", "PLAIN");
//新的版本中 0.11 3.3.2 幂等性
pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](pro)
for(i<- 1 to 10){
val record = new ProducerRecord[String,String]("topic_hainiu",s"world_${i}")
producer.send(record)
}
producer.close()
}
}
kafka的信道加密
首先选择集群并且修改每个机器的域名为
因为在认证中需要域名信息
vim /etc/hosts
nn1.hainiu.com
nn2.hainiu.com
s1.hainiu.com
选择nn1节点
mkdir -p /root/config/certificates
用于生成证书
创建KeyStore密钥库
Keytool是一个Java数据证书的管理工具 ,Keytool将密钥(key)和证书(certificates)存在一个称为keystore的文件中,Keystore可以简单理解为一个存放应用签名的文件。我们可以称它为秘钥库。在keystore里,包含两种数据:
密钥实体(Key entity)——密钥(secret key)又或者是私钥和配对公钥(采用非对称加密)
可信任的证书实体(trusted certificate entries)——只包含公钥
keytool -keystore /root/config/certificates/kafka.keystore -alias kafka -validity 365 -genkey -keyalg RSA -storepass 123456 -keypass 123456 -dname "CN=*.hainiu.com, OU=kafka, O=kafka, L=shenzhen, ST=guangdong, C=CN"
CN
域名OU
部门O
公司名L
城市ST
省份C
国家
查看KeyStore内容
keytool -list -v -keystore /root/config/certificates/kafka.keystore -storepass 123456
创建CA(Certificate Authority:认证机构)
CA是负责签发证书、认证证书、管理已颁发证书的机构,为了保证整个证书的安全性,所以需要使用CA进行证书的签名保证。
openssl req -new -x509 -keyout /root/config/certificates/ca-key -out /root/config/certificates/ca-cert -days 365 -passin pass:123456 -passout pass:123456 -subj "/C=CN/ST=guangdong/L=shenzhen/O=kafka/CN=*.hainiu.com"
将CA导入到TrustStore中
keystore 与 truststore 区别
keystore是存储密钥(公钥、私钥)的容器。
keystore和truststore其本质都是keystore。只不过二者存放的密钥所有者不同而已。本质都是相同的文件,只不过约定通过文件名称区分类型以及用途
对于keystore一般存储自己的私钥和公钥,而truststore则用来存储自己信任的对象的公钥
keytool -keystore "/root/config/certificates/kafka.truststore" -alias CARoot -import -file "/root/config/certificates/ca-cert" -storepass "123456" -keypass "123456" -noprompt
导出证书
keytool -keystore /root/config/certificates/kafka.keystore -alias kafka -certreq -file /root/config/certificates/kafka-cert -storepass "123456" -keypass "123456"
给证书签名
openssl x509 -req -CA /root/config/certificates/ca-cert -CAkey /root/config/certificates/ca-key -in /root/config/certificates/kafka-cert -out /root/config/certificates/kafka-cert-signed -days 365 -CAcreateserial -passin pass:"123456"
导入CA到KeyStore
keytool -keystore /root/config/certificates/kafka.keystore -alias CARoot -import -file /root/config/certificates/ca-cert -storepass "123456" -keypass "12345" -noprompt
导入证书到KeyStore
keytool -keystore /root/config/certificates/kafka.keystore -alias kafka -import -file /root/config/certificates/kafka-cert-signed -storepass "123456" -keypass "123456"
复制配置文件
cp server.properties ssl.properties
vim ssl.properties 增加配置信息
listeners=SSL://nn1.hainiu.com:9092
advertised.listeners=SSL://nn1.hainiu.com:9092
ssl.keystore.location=/root/config/certificates/kafka.keystore
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/root/config/certificates/kafka.truststore
ssl.truststore.password=123456
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.inter.broker.protocol=SSL
分发配置文件
scp /usr/local/kafka/config/ssl.properties root@nn2:/usr/local/kafka/config/
scp -r /root/config root@s1:/root/
客户端访问信息,需要增加如下配置
security.protocol=SSL
ssl.truststore.location=/root/config/certificates/kafka.truststore
ssl.truststore.password=123456
ssl.keystore.password=123456
ssl.keystore.location=/root/config/certificates/kafka.keystore
命令如下:
kafka-topics.sh --bootstrap-server nn1.hainiu.com:9092 --list --command-config client.conf
代码链接
kafka.keystore
kafka.truststore复制到自己的本地
object TestProducer01 {
def main(args: Array[String]): Unit = {
val pro = new Properties()
pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1.hainiu.com:9092")
pro.put(ProducerConfig.LINGER_MS_CONFIG,"100")
pro.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")
pro.put(ProducerConfig.ACKS_CONFIG,"all")
pro.put(ProducerConfig.RETRIES_CONFIG,"3")
pro.put("security.protocol","SSL")
pro.put("ssl.truststore.location","G:\\workprogram\\spark52\\src\\main\\resources\\kafka.truststore")
pro.put(" ssl.truststore.password","123456")
pro.put("ssl.keystore.password","123456")
pro.put("ssl.keystore.location","G:\\workprogram\\spark52\\src\\main\\resources\\kafka.keystore")
pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
//默认存在分区器 DefaultPartitioner
val producer = new KafkaProducer[String, String](pro)
for(i<- 1 to 10){
val record = new ProducerRecord[String,String]("topic_hainiu",s"world_${i}")
producer.send(record,new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
// 元数据对象,发送到哪了 没有错误代表成功
//这里和sender线程没关系,主线程发送数据完毕会接受到ack,但是仅仅是接受
if(exception == null){
println(metadata.topic(),metadata.partition(),metadata.offset())
println("成功了!!!")
}else{
println("失败了!!!")
}
}
})
}
producer.close()
}
}