[B 站公开课] Kafka 的多种认证方式和信道加密

教程 青牛 ⋅ 于 2023-12-19 11:54:15 ⋅ 2125 阅读

公开课回放地址:https://www.bilibili.com/video/BV1yQ4y1g7MW

kafka的权限设置

Kafka 认证机制

自 0.9.0.0 版本开始,Kafka 正式引入了认证机制,用于实现基础的安全用户认证,这是将 Kafka 上云或进行多租户管理的必要步骤。截止到 2.3 版本,Kafka 支持基于 SSL 和基于 SASL 的安全认证机制。

基于 SSL 的认证主要是指 Broker 和客户端的双路认证(2-way authentication)。 通常来说,SSL 加密(Encryption)已经启用了单向认证,即客户端认证 Broker 的证书(Certificate)。如果要做 SSL 认证,那么就要启用双路认证,也就是说 Broker 也要认证客户端的证书。

Kafka 还支持通过 SASL 做客户端认证。SASL 是提供认证和数据安全服务的框架。 Kafka 支持的 SASL 机制有 5 种,它们分别是在不同版本中被引入的。

  • GSSAPI:也就是 Kerberos 使用的安全接口,是在 0.9 版本中被引入的。
  • PLAIN:是使用简单的用户名 / 密码认证的机制,在 0.10 版本中被引入。
  • SCRAM:主要用于解决 PLAIN 机制安全问题的新机制,是在 0.10.2 版本中被引入的。
  • OAUTHBEARER:是基于 OAuth 2 认证框架的新机制,在 2.0 版本中被引进。
  • Delegation Token:补充现有 SASL 机制的轻量级认证机制,是在 1.1.0 版本被引入的。

认证机制的比较

目前来看,使用 SSL 做信道加密的情况更多一些,但使用 SSL 实现认证不如使用 SASL。毕竟,SASL 能够支持选择不同的实现机制,如 GSSAPI、SCRAM、PLAIN 等。因此,建议使用 SSL 来做通信加密,使用 SASL 来做 Kafka 的认证实现。

SASL/GSSAPI 主要是给 Kerberos 使用的。GSSAPI 适用于本身已经做了 Kerberos 认证的场景,这样的话,SASL/GSSAPI 可以实现无缝集成。

SASL/PLAIN 是一个简单的用户名 / 密码认证机制,通常与 SSL 加密搭配使用。对于一些小公司而言,搭建公司级的 Kerberos 可能并没有什么必要,他们的用户系统也不复杂,特别是访问 Kafka 集群的用户可能不是很多。对于 SASL/PLAIN 而言,这就是一个非常合适的应用场景。总体来说,SASL/PLAIN 的配置和运维成本相对较小,适合于小型公司中的 Kafka 集群。

SASL/PLAIN 有这样一个弊端:它不能动态地增减认证用户,必须重启 Kafka 集群才能令变更生效。因为所有认证用户信息全部保存在静态文件中,所以只能重启 Broker,才能重新加载变更后的静态文件。

SASL/SCRAM 通过将认证用户信息保存在 ZooKeeper 的方式,避免了动态修改需要重启 Broker 的弊端。在实际使用过程中,可以使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群。因此,如果打算使用 SASL/PLAIN,不妨改用 SASL/SCRAM 试试。不过要注意的是,后者是 0.10.2 版本引入的。

SASL/OAUTHBEARER 是 2.0 版本引入的新认证机制,主要是为了实现与 OAuth 2 框架的集成。 Kafka 不提倡单纯使用 OAUTHBEARER,因为它生成的不安全的 JSON Web Token,必须配以 SSL 加密才能用在生产环境中。

Delegation Token 是在 1.1 版本引入的,它是一种轻量级的认证机制,主要目的是补充现有的 SASL 或 SSL 认证。 如果要使用 Delegation Token,需要先配置好 SASL 认证,然后再利用 Kafka 提供的 API 去获取对应的 Delegation Token。这样,Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证)或传输 Keystore 文件(SSL 认证)。

file

本案例以SASL中的PLAIN方式作为讲解内容

环境准备

file

首先选择海牛云平台的大数据组件中的kafka,加入实验环境,启动镜像

软件版本

jdk1.8.0_144 kafka_2.13-2.6.0 zookeeper-3.4.8

启动后开始本次 实验的配置

# 因为在实验启动的时候里面的zookeeper和kafka的组件已经启动,这个时候需要重新配置,那么首先先关闭集群
# 关闭命令,每个节点都要执行
kafka-server-stop.sh

file

集群配置

首先选择kafka的配置文件,增加如下配置

vim /opt/kafka_2.13-2.6.0/config/server.properties
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

每个机器都要增加这些配置,这个时候已经启动了kafka的认证配置

但是认证配置中需要增加用户名和密码

这个时候增加kafka的权限配置信息

vim /opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf
# 增加如下配置
KafkaServer {
     org.apache.kafka.common.security.plain.PlainLoginModule required
     username="admin"
     password="123456"
     user_admin="123456"
     user_user2="123456";
 };
 #分发文件到不同的机器节点
scp /opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf root@kafka2-24160:/opt/kafka_2.13-2.6.0/config/
scp /opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf root@kafka3-24160:/opt/kafka_2.13-2.6.0/config/

此配置定义了两个用户(admin和user2)。KafkaServer中的username、password配置的用户和密码,是用来broker和broker连接认证。在本例中,admin是代理broker间通信的用户。user_userName配置为连接到broker的所有用户定义密码,broker使用这些验证所有客户端连接,包括来自其他broker的连接。

请注意,username是服务端连接的账号,password为密码,其中下面的user_admin就是为了集群间通信时候使用的

将JAAS配置文件位置作为JVM参数传递给每个Kafka broker

在kafka-run-class.sh中增加如下配置:219行

file

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.13-2.6.0/config/kafka_server_jaas.conf"
# 增加配置后进行分发
scp /opt/kafka_2.13-2.6.0/bin/kafka-run-class.sh root@kafka2-24160:/opt/kafka_2.13-2.6.0/bin/
scp /opt/kafka_2.13-2.6.0/bin/kafka-run-class.sh root@kafka3-24160:/opt/kafka_2.13-2.6.0/bin/

启动kafka进行启动集群

kafka-server-start.sh -daemon /opt/kafka_2.13-2.6.0/config/server.properties 

file

这个时候集群启动完毕已经带有权限

客户端连接

file

集群已经启动权限,这个时候操作集群的命令是完全不生效的,我们需要配置连接的用户进行验证

使用kafka-topics.sh做topic的管理

首先配置权限认证信息 auth.conf

vim /root/auth.conf
# 输入如下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user2" password="123456";

其中的用户名必须是admin 123456 或者是 user2 123456

kafka-topics.sh --bootstrap-server kafka1-24160:9092 --create --topic hainiu --partitions 3 --replication-factor 2 --command-config auth.conf

file

会存在警告信息,但是不会影响使用

消费这个生产信息

kafka-console-producer.sh --bootstrap-server kafka1-24160:9092 --topic hainiu --producer.config auth.conf
kafka-console-consumer.sh --bootstrap-server kafka1-24160:9092 --topic hainiu --consumer.config auth.conf

file

file

可以通过制定用户信息进行生产和消费数据

同时也可以修改配置文件信息直接使用生产者和消费者

vim producer.properties
vim consumer.properties
# 增加如下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user2" password="123456";

权限配置

# 修改server.properties 增加如下配置
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin

使用权限配置并且超级用户是admin,这个用户和我们在上面配置的列表中的admin用户相互映射

每个机器都进行配置

重启所有的节点,这个时候已经开启了权限

# 配置用户认证文件信息
vim /root/admin.conf
# 输入如下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";

这个时候使用kafka的命令进行操作

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_hainiu --partitions 3 --replication-factor 2

file

在没增加认证配置信息的时候没有任何反应

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_hainiu --partitions 3 --replication-factor 2 --command-config auth.conf

file

发现用户没有权限,这个时候加入admin.conf进行认证

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_hainiu1 --partitions 3 --replication-factor 2 --command-config admin.conf 

file

因为admin中的是超级用户

这个时候可以增加用户权限

#命令为 
kafka-acls.sh

参数如下:

  • --add 增加权限 --remove删除
  • --allow-host 允许操作的主机 --deny-host 拒绝的主机
  • --allow-principal 允许操作的用户 --deny-principal 拒绝用户
  • --bootstrap-server 集群地址
  • --command-config 认证文件信息
  • --group 可以消费topic的消费者组
  • --list 查询所有权限
  • --operation 增加操作权限
    • Describe
    • DescribeConfigs
    • Alter
    • Read
    • Delete
    • Create
    • All
    • Write
    • AlterConfigs
  • --topic 指定topic

查询topic的所有权限

/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server nn1:9092 --list --command-config admin.conf

删除所有权限

 /usr/local/kafka/bin/kafka-acls.sh --bootstrap-server nn1:9092 --topic topic_hainiu --remove --command-config admin.conf   

file

当前用户没有权限,现在给海牛用户赋值权限

kafka-acls.sh --bootstrap-server nn1:9092 --add --allow-principal User:hainiu --topic topic_hainiu --operation all --command-config admin.conf

file

kafka-topics.sh --bootstrap-server nn1:9092 --list --command-config auth.conf

file

回收权限

 kafka-acls.sh --bootstrap-server nn1:9092 --remove --allow-principal User:hainiu --topic topic_hainiu --operation all --command-config admin.conf 

file

kafka-acls.sh --bootstrap-server nn1:9092 --list --topic topic_hainiu  --command-config admin.conf 

这个时候没有他的访问权限

file

增加权限并且增加ip限制

 kafka-acls.sh --bootstrap-server nn1:9092 --add --deny-principal User:hainiu --topic topic_hainiu --operation all --deny-host 11.237.80.2 --command-config admin.conf

file

发现不可以访问

kafka-acls.sh --bootstrap-server nn1:9092 --remove --deny-principal User:hainiu --topic topic_hainiu --operation all --deny-host 11.237.80.2 --command-config admin.conf  

file

现在可以访问了

生产者和消费者的权限设置

# 删除所有权限信息
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server nn1:9092 --topic topic_hainiu --remove --command-config admin.conf
#生产者没有权限
kafka-console-producer.sh --bootstrap-server nn1:9092 --topic topic_hainiu --producer.config auth.conf 
# 增加权限
kafka-acls.sh --bootstrap-server nn1:9092 --add --allow-principal User:hainiu --operation Write --topic topic_hainiu --command-config admin.conf

file

生产者可以生产数据

现在进行消费数据

kafka-console-consumer.sh --bootstrap-server nn1:9092 --topic topic_hainiu --from-beginning --consumer.config auth.conf   

file

现在进行权限分配

kafka-acls.sh --bootstrap-server nn1:9092 --add --allow-principal User:hainiu --operation Read --topic topic_hainiu --command-config admin.conf

file

file

发现增加完毕权限还是不可以读取数据,因为消费者组没有增加权限

kafka-acls.sh --bootstrap-server nn1:9092 --add --allow-principal User:hainiu --operation Read --topic topic_hainiu --group hainiu --command-config admin.conf 

file

kafka-console-consumer.sh --bootstrap-server nn1:9092 --topic topic_hainiu --from-beginning --consumer.config auth.conf  --group hainiu

file

已经可以消费数据

代码客户端认证

首先增加resources中配置文件

kafka.jaas

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="hainiu"
    password="123456";
};

代码如下:

package com.hainiu.spark.kafka01

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

/**
 * producer[interceptor serializer partitioner]
 * bootstrap-server
 * linger.ms
 * batch-size
 * topic
 * 生产者放入数据只能针对于同一个topic
 */
object TestProducer01 {
  def main(args: Array[String]): Unit = {
    System.setProperty("java.security.auth.login.config","G:\\workprogram\\spark52\\src\\main\\resources\\kafka.jaas")
    val pro = new Properties()
    pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092")
    pro.put(ProducerConfig.LINGER_MS_CONFIG,"100")
    pro.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")
    pro.put(ProducerConfig.ACKS_CONFIG,"all")
    pro.put(ProducerConfig.RETRIES_CONFIG,"3")
    pro.put("security.protocol", "SASL_PLAINTEXT");               //2.协议需要配置
    pro.put("sasl.mechanism", "PLAIN");
    //新的版本中 0.11 3.3.2 幂等性
    pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
    pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
  pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](pro)
    for(i<- 1 to 10){
      val record = new ProducerRecord[String,String]("topic_hainiu",s"world_${i}")
      producer.send(record)
    }
    producer.close()
  }
}

kafka的信道加密

首先选择集群并且修改每个机器的域名为

因为在认证中需要域名信息

vim /etc/hosts
nn1.hainiu.com
nn2.hainiu.com
s1.hainiu.com

选择nn1节点

mkdir -p /root/config/certificates

用于生成证书

创建KeyStore密钥库
Keytool是一个Java数据证书的管理工具 ,Keytool将密钥(key)和证书(certificates)存在一个称为keystore的文件中,Keystore可以简单理解为一个存放应用签名的文件。我们可以称它为秘钥库。在keystore里,包含两种数据:

密钥实体(Key entity)——密钥(secret key)又或者是私钥和配对公钥(采用非对称加密)
可信任的证书实体(trusted certificate entries)——只包含公钥

keytool -keystore /root/config/certificates/kafka.keystore -alias kafka -validity 365 -genkey -keyalg RSA -storepass 123456 -keypass 123456 -dname "CN=*.hainiu.com, OU=kafka, O=kafka, L=shenzhen, ST=guangdong, C=CN"

CN域名
OU部门
O公司名
L城市
ST省份
C国家

查看KeyStore内容

keytool -list -v -keystore /root/config/certificates/kafka.keystore -storepass 123456

创建CA(Certificate Authority:认证机构)

CA是负责签发证书、认证证书、管理已颁发证书的机构,为了保证整个证书的安全性,所以需要使用CA进行证书的签名保证。

openssl req -new -x509 -keyout /root/config/certificates/ca-key -out /root/config/certificates/ca-cert -days 365 -passin pass:123456 -passout pass:123456 -subj "/C=CN/ST=guangdong/L=shenzhen/O=kafka/CN=*.hainiu.com"

将CA导入到TrustStore中
keystore 与 truststore 区别

keystore是存储密钥(公钥、私钥)的容器。
keystore和truststore其本质都是keystore。只不过二者存放的密钥所有者不同而已。本质都是相同的文件,只不过约定通过文件名称区分类型以及用途
对于keystore一般存储自己的私钥和公钥,而truststore则用来存储自己信任的对象的公钥

keytool -keystore "/root/config/certificates/kafka.truststore" -alias CARoot -import -file "/root/config/certificates/ca-cert" -storepass "123456" -keypass "123456" -noprompt

导出证书

keytool -keystore /root/config/certificates/kafka.keystore -alias kafka -certreq -file /root/config/certificates/kafka-cert -storepass "123456" -keypass "123456"

给证书签名

openssl x509 -req -CA /root/config/certificates/ca-cert -CAkey /root/config/certificates/ca-key -in /root/config/certificates/kafka-cert -out /root/config/certificates/kafka-cert-signed -days 365 -CAcreateserial -passin pass:"123456"

导入CA到KeyStore

keytool -keystore /root/config/certificates/kafka.keystore -alias CARoot -import -file /root/config/certificates/ca-cert -storepass "123456" -keypass "12345" -noprompt

导入证书到KeyStore

keytool -keystore /root/config/certificates/kafka.keystore -alias kafka -import -file /root/config/certificates/kafka-cert-signed -storepass "123456" -keypass "123456"

复制配置文件

cp server.properties ssl.properties

vim ssl.properties 增加配置信息

listeners=SSL://nn1.hainiu.com:9092
advertised.listeners=SSL://nn1.hainiu.com:9092
ssl.keystore.location=/root/config/certificates/kafka.keystore
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/root/config/certificates/kafka.truststore
ssl.truststore.password=123456

ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.inter.broker.protocol=SSL

分发配置文件

scp /usr/local/kafka/config/ssl.properties root@nn2:/usr/local/kafka/config/
scp -r /root/config root@s1:/root/

客户端访问信息,需要增加如下配置

security.protocol=SSL
ssl.truststore.location=/root/config/certificates/kafka.truststore
ssl.truststore.password=123456
ssl.keystore.password=123456
ssl.keystore.location=/root/config/certificates/kafka.keystore

命令如下:

kafka-topics.sh --bootstrap-server nn1.hainiu.com:9092 --list --command-config client.conf 

代码链接

kafka.keystore
kafka.truststore复制到自己的本地

object TestProducer01 {
  def main(args: Array[String]): Unit = {
    val pro = new Properties()
    pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1.hainiu.com:9092")
    pro.put(ProducerConfig.LINGER_MS_CONFIG,"100")
    pro.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")
    pro.put(ProducerConfig.ACKS_CONFIG,"all")
    pro.put(ProducerConfig.RETRIES_CONFIG,"3")
    pro.put("security.protocol","SSL")
    pro.put("ssl.truststore.location","G:\\workprogram\\spark52\\src\\main\\resources\\kafka.truststore")
    pro.put(" ssl.truststore.password","123456")
    pro.put("ssl.keystore.password","123456")
    pro.put("ssl.keystore.location","G:\\workprogram\\spark52\\src\\main\\resources\\kafka.keystore")
    pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
    pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
    pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
    //默认存在分区器  DefaultPartitioner
        val producer = new KafkaProducer[String, String](pro)
for(i<- 1 to 10){
  val record = new ProducerRecord[String,String]("topic_hainiu",s"world_${i}")
  producer.send(record,new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      //                      元数据对象,发送到哪了         没有错误代表成功
      //这里和sender线程没关系,主线程发送数据完毕会接受到ack,但是仅仅是接受
      if(exception == null){
        println(metadata.topic(),metadata.partition(),metadata.offset())
        println("成功了!!!")
      }else{
        println("失败了!!!")
      }
    }
  })
}
producer.close()
      }
}
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-青牛,http://hainiubl.com/topics/76428
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter