1. 概述
本文将指导你在 Kafka 服务中实现 SASL/PLAIN 认证机制,并利用 Spring Kafka 提供的支持实现客户端认证。
Kafka 支持多种认证方案,包括 SASL、SSL 和委托令牌认证,以增强安全性和兼容性。
SASL(简单认证与安全层)是一个认证框架,允许轻松集成其他认证机制,如 GSSAPI、OAuthBearer、SCRAM 和 PLAIN。
⚠️ SASL/PLAIN 认证并不安全! 因为用户凭据会以明文形式在网络中传输。不过,由于配置简单,它仍适用于本地开发环境。
注意:除非与 SSL/TLS 结合使用,否则生产环境不应采用 SASL/PLAIN 认证。当 SSL 与 SASL/PLAIN 认证结合使用时(Kafka 中称为 SASL-SSL),它会对客户端和服务器间的流量(包括敏感凭据)进行加密。
2. 在 Kafka 中实现 SASL/PLAIN 认证
假设我们需要在 Docker 环境中构建一个支持 SASL/PLAIN 认证的 Kafka 服务。我们将利用 JAAS 配置添加 SASL/PLAIN 所需的用户凭据。
2.1. 配置 Kafka 凭据
在 Kafka 中配置用户凭据时,我们将使用 PlainLoginModule 安全实现。
创建 kafka_server_jaas.conf 文件配置 admin 和 user1 凭据:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_user1="user1-secret";
};
上述代码定义了 admin 和 user1 用户,分别用于 Kafka 内部代理通信和外部客户端认证。user1 遵循 user_<username>
格式定义。
2.2. 配置 Zookeeper 凭据
由于 Kafka 服务已包含客户端用户凭据,我们还需要用 SASL/PLAIN 认证保护 Zookeeper 服务。这是良好的安全实践。
创建 zookeeper_jaas.conf 文件配置 zookeeper 用户凭据:
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zookeeper"
password="zookeeper-secret"
user_zookeeper="zookeeper-secret";
};
这里使用 Zookeeper 专用的 DigestLoginModule 安全实现(而非 Kafka 的 PlainLoginModule)以提升兼容性。
此外,在之前创建的 kafka_server_jaas.conf 文件中添加 zookeeper 凭据:
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="zookeeper"
password="zookeeper-secret";
};
上述 Client 凭据供 Kafka 服务向 Zookeeper 服务认证时使用。
2.3. 搭建 Kafka 与 Zookeeper 服务
使用 Docker Compose 文件搭建 Kafka 和 Zookeeper 服务。
首先实现 Zookeeper 服务并引入 zookeeper_jaas.conf 文件:
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.6
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
volumes:
- ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
ports:
- 2181
接着实现支持 SASL/PLAIN 认证的 Kafka 服务:
kafka:
image: confluentinc/cp-kafka:7.6.6
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
ports:
- "9092:9092"
上述配置引入了之前创建的 kafka_server_jaas.conf 文件设置 SASL/PLAIN 用户。
✅ 关键点:KAFKA_ADVERTISED_LISTENERS 属性是 Kafka 客户端发送消息和监听的接口地址。
最后使用 docker compose
命令启动整个环境:
docker compose up --build
Docker 控制台将输出类似日志:
kafka-1 | [2025-06-19 14:32:00,441] INFO Session establishment complete on server zookeeper/172.18.0.2:2181, session id = 0x10000004c150001, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
kafka-1 | [2025-06-19 14:32:00,445] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
zookeeper-1 | [2025-06-19 14:32:00,461] INFO Successfully authenticated client: authenticationID=zookeeper; authorizationID=zookeeper. (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
确认 Kafka 和 Zookeeper 服务已无错误集成。
3. 使用 Spring 实现 Kafka 客户端
我们将使用 Spring Kafka 实现生产者和消费者服务。
3.1. Maven 依赖
首先添加 Spring Kafka 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
3.2. Kafka 生产者
使用 KafkaTemplate 类实现生产者服务:
public void sendMessage(String message, String topic) {
LOGGER.info("Producing message: {}", message);
kafkaTemplate.send(topic, "key", message)
.whenComplete((result, ex) -> {
if (ex == null) {
LOGGER.info("Message sent to topic: {}", message);
} else {
LOGGER.error("Failed to send message", ex);
}
});
}
上述代码通过 KafkaTemplate 的 send 方法发送消息。
3.3. Kafka 消费者
使用 Spring Kafka 的 @KafkaListener 和 ConsumerRecord 实现消费者服务:
@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, String> consumerRecord) {
LOGGER.info("Received payload: '{}'", consumerRecord.toString());
messages.add(consumerRecord.value());
}
上述代码接收消息并将其添加到 messages 列表。
3.4. 配置 Spring 应用连接 Kafka
创建 application.yml 文件并添加 Spring Kafka 相关配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
启动应用验证配置:
kafka-1 | [2025-06-19 14:38:33,188] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /192.168.65.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
如预期,客户端应用无法通过 Kafka 服务器认证。
3.5. 配置客户端 JAAS 认证
通过 spring.kafka.properties 配置解决上述错误,添加 SASL/PLAIN 设置:
spring:
kafka:
bootstrap-servers: localhost:9092
properties:
sasl.mechanism: PLAIN
sasl.jaas.config: >
org.apache.kafka.common.security.plain.PlainLoginModule required
username="user1"
password="user1-secret";
security:
protocol: "SASL_PLAINTEXT"
上述配置在 sasl.jaas.config 中添加了匹配的 username 和 password。
❌ 踩坑提醒:SASL 配置错误会导致常见问题:
- 若 sasl.mechanism 误设为 PLAINTEXT(而非 PLAIN):
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator ... 25 common frames omitted Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism PLAINTEXT
- 若 sasl.mechanism 误写为 security.mechanism:
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
验证完整配置的 Kafka 应用。
4. 测试
使用 Testcontainers 框架测试 Kafka 客户端应用。
首先基于 docker-compose.yml 创建 DockerComposeContainer 对象:
@Container
public DockerComposeContainer<?> container =
new DockerComposeContainer<>("src/test/resources/sasl-plaintext/docker-compose.yml")
.withExposedService("kafka", "9092", Wait.forListeningPort());
实现测试方法验证消费者:
@Test
void givenSaslIsConfigured_whenProducerSendsMessageOverSasl_thenConsumerReceivesOverSasl() {
String message = UUID.randomUUID().toString();
kafkaProducer.sendMessage(message, "test-topic");
await().atMost(Duration.ofMinutes(2))
.untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
}
运行测试用例验证输出:
16:56:44.525 [kafka-producer-network-thread | producer-1] INFO c.b.saslplaintext.KafkaProducer - Message sent to topic: 82e8a804-0269-40a2-b8ed-c509e6951011
16:56:48.566 INFO c.b.saslplaintext.KafkaConsumer - Received payload: ConsumerRecord(topic = test-topic, ... key = key, value = 82e8a804-0269-40a2-b8ed-c509e6951011
日志显示消费者服务成功接收消息。
5. 总结
本教程介绍了在 Docker 环境中通过 JAAS 配置为 Kafka 服务设置 SASL/PLAIN 认证的方法。
我们还实现了生产者/消费者服务,并通过类似的 JAAS 配置进行认证。最后使用 Docker TestContainer 发送/接收消息测试了整个环境。
示例代码可在 GitHub 获取。