1. 概述
认证是设计Kafka这类消息系统的核心环节。实现认证的方式包括用户凭证、SSL证书或基于令牌的机制等。
本教程将介绍如何在Kafka服务中实现简单认证和安全层(SASL)认证机制,并使用Spring Kafka实现客户端认证。
2. Kafka认证机制简介
Kafka支持多种认证授权机制保障网络安全,包括SSL、SASL或委托令牌。认证场景涵盖客户端到Broker、Broker到Zookeeper以及Broker间通信。
可根据系统需求和基础设施选择合适方案。SSL认证使用X.509证书验证客户端和Broker,支持单向或双向认证。
SASL是支持多种认证机制的安全框架:
- SASL/GSSAPI:基于通用安全服务API,可轻松集成现有Kerberos服务。通过密钥分发中心(KDC)提供网络认证,常用于Active Directory或Kerberos环境
- SASL/PLAIN:使用用户名密码认证,因网络传输不安全,仅适用于非生产环境
- SASL/SCRAM:通过加盐哈希密码创建挑战响应机制,支持SHA-256/512等算法,安全性高于明文
- SASL/OAUTHBEARER:使用OAuth 2.0承载令牌认证,适合Keycloak/OKTA等身份提供商场景
SASL可与SSL结合提供传输层加密。Kafka授权支持内置ACL机制、OAuth/OIDC或自定义授权器。
本教程聚焦广泛使用的GSSAPI认证实现。
3. 使用SASL/GSSAPI实现Kafka服务
假设需要在Docker环境中构建支持GSSAPI认证的Kafka服务,可利用Kerberos提供票据授予票据(TGT)服务作为认证服务器。
3.1 配置Kerberos
在Docker环境中实现Kerberos服务需自定义配置。首先创建krb5.conf配置域BAELDUNG.COM:
[libdefaults]
default_realm = BAELDUNG.COM
dns_lookup_realm = false
dns_lookup_kdc = false
forwardable = true
rdns = true
[realms]
BAELDUNG.COM = {
kdc = kdc
admin_server = kdc
}
域(realm)是Kafka服务的逻辑域名。需编写脚本初始化Kerberos数据库(kdb5_util),创建Kafka/Zookeeper/客户端的主体(principal)和密钥表文件(Kadmin.local),最后启动Krb5kdc和Kadmind服务。
实现kdc_setup.sh脚本添加主体并生成密钥表:
kadmin.local -q "addprinc -randkey kafka/[email protected]"
kadmin.local -q "addprinc -randkey zookeeper/[email protected]"
kadmin.local -q "addprinc -randkey [email protected]"
kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/kafka.keytab kafka/[email protected]"
kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/zookeeper.keytab zookeeper/[email protected]"
kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/client.keytab [email protected]"
krb5kdc
kadmind -nofork
⚠️ 主体格式通常为<服务名>/<主机名>@域,主机名可选,域名需大写。主体配置错误会导致认证失败**(服务名或FQDN不匹配)。
创建Dockerfile准备Kerberos环境:
FROM debian:bullseye
RUN apt-get update && \
apt-get install -y krb5-kdc krb5-admin-server krb5-user && \
rm -rf /var/lib/apt/lists/*
COPY config/krb5.conf /etc/krb5.conf
COPY setup_kdc.sh /setup_kdc.sh
RUN chmod +x /setup_kdc.sh
添加kadm5.acl文件授予管理员主体完全权限:
*/[email protected] *
3.2 Kafka和Zookeeper配置
使用*JAAS*配置Kafka/Zookeeper与Kerberos的KDC认证。为Kafka服务器和Zookeeper分别创建JAAS配置文件。
zookeeper_jaas.conf配置:
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/keytabs/zookeeper.keytab"
principal="zookeeper/[email protected]";
};
主体需与Zookeeper的Kerberos主体一致,useKeyTab=true强制使用密钥表认证。
kafka_server_jaas.conf配置Kafka服务器和客户端:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/keytabs/kafka.keytab"
principal="kafka/[email protected]"
serviceName="kafka";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/keytabs/client.keytab"
principal="[email protected]"
serviceName="kafka";
};
3.3 集成Kafka、Zookeeper和Kerberos
通过Docker服务轻松集成三者。首先实现Kerberos服务:
services:
kdc:
build:
context: .
dockerfile: Dockerfile
volumes:
- ./config:/etc/krb5kdc
- ./keytabs:/etc/krb5kdc/keytabs
- ./config/krb5.conf:/etc/krb5.conf
ports:
- "88:88/udp"
服务通过UDP 88端口暴露。配置Zookeeper服务:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
volumes:
- ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
- ./keytabs:/etc/kafka/keytabs
- ./config/krb5.conf:/etc/krb5.conf
ports:
- "2181:2181"
最后配置Kafka服务:
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
KAFKA_LISTENERS: SASL_PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
- ./keytabs:/etc/kafka/keytabs
- ./config/krb5.conf:/etc/krb5.conf
depends_on:
- zookeeper
- kdc
ports:
- 9092:9092
✅ 关键点:
- KAFKA_ADVERTISED_LISTENERS是客户端监听的接口
- 已启用Broker间和客户端到Broker的GSSAPI认证
- 使用kafka_server_jaas.conf配置主体和密钥表
启动Docker环境:
$ docker compose up --build
验证日志:
kafka | [2025-02-03 18:09:10,147] INFO Successfully authenticated client: authenticationID=kafka/[email protected]; authorizationID=kafka/[email protected]. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
kafka | [2025-02-03 18:09:10,148] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to localhost:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
确认服务集成成功。
4. 使用Spring实现Kafka客户端
基于Spring Kafka实现Kafka监听器应用。
4.1 Maven依赖
添加spring-kafka依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
4.2 实现Kafka监听器
使用Spring Kafka的KafkaListener和ConsumerRecord实现监听:
@KafkaListener(topics = "test-topic")
public void receive(ConsumerRecord<String, String> consumerRecord) {
log.info("Received payload: '{}'", consumerRecord.toString());
messages.add(consumerRecord.value());
}
在application-sasl.yml配置监听参数:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
运行应用时出现认证失败日志:
kafka | [2025-02-01 03:08:01,532] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /172.21.0.1 (channelId=172.21.0.4:9092-172.21.0.1:59840-16) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
❌ 问题:客户端未配置SASL认证导致认证失败。需在Spring应用中添加JAAS配置。
5. 使用JAAS配置Kafka客户端
通过spring.kafka.properties配置SASL/GSSAPI参数。添加主体、密钥表和认证机制配置:
spring:
kafka:
bootstrap-servers: localhost:9092
properties:
sasl.mechanism: GSSAPI
sasl.jaas.config: >
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="./src/test/resources/sasl/keytabs/client.keytab"
principal="[email protected]"
serviceName="kafka";
⚠️ 注意:serviceName必须与Kafka主体的服务名完全匹配。
重新启动应用验证配置。
6. 测试Kafka监听器
使用kafka-console-producer.sh工具快速测试监听器:
$ kafka-console-producer.sh --broker-list localhost:9092 \
--topic test-topic \
--producer-property security.protocol=SASL_PLAINTEXT \
--producer-property sasl.mechanism=GSSAPI \
--producer-property sasl.kerberos.service.name=kafka \
--producer-property sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true keyTab=\"/<path>/client.keytab\"
storeKey=true principal=\"[email protected]\";"
> hello
命令中传递了认证相关配置(安全协议、SASL机制、JAAS配置)。监听器日志显示成功接收消息:
08:52:13.663 INFO c.b.s.KafkaConsumer - Received payload: 'ConsumerRecord(topic = test-topic, .... key = null, value = hello)'
✅ 生产环境建议:需额外配置SSL证书和DNS等安全措施。
7. 总结
本文介绍了在Docker环境中:
- 使用自定义Kerberos搭建Kafka服务
- 启用SASL/GSSAPI认证
- 通过Spring Kafka实现客户端认证
- 使用JAAS配置完成认证集成
通过消息收发测试验证了整个认证流程。生产环境中建议结合SSL证书增强安全性。