1. 概述

本文将指导你在 Kafka 服务中实现 SASL/PLAIN 认证机制,并利用 Spring Kafka 提供的支持实现客户端认证。

Kafka 支持多种认证方案,包括 SASL、SSL 和委托令牌认证,以增强安全性和兼容性。

SASL(简单认证与安全层)是一个认证框架,允许轻松集成其他认证机制,如 GSSAPI、OAuthBearer、SCRAM 和 PLAIN。

⚠️ SASL/PLAIN 认证并不安全! 因为用户凭据会以明文形式在网络中传输。不过,由于配置简单,它仍适用于本地开发环境。

注意:除非与 SSL/TLS 结合使用,否则生产环境不应采用 SASL/PLAIN 认证当 SSL 与 SASL/PLAIN 认证结合使用时(Kafka 中称为 SASL-SSL),它会对客户端和服务器间的流量(包括敏感凭据)进行加密

2. 在 Kafka 中实现 SASL/PLAIN 认证

假设我们需要在 Docker 环境中构建一个支持 SASL/PLAIN 认证的 Kafka 服务。我们将利用 JAAS 配置添加 SASL/PLAIN 所需的用户凭据。

2.1. 配置 Kafka 凭据

在 Kafka 中配置用户凭据时,我们将使用 PlainLoginModule 安全实现。

创建 kafka_server_jaas.conf 文件配置 adminuser1 凭据:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_user1="user1-secret";
};

上述代码定义了 adminuser1 用户,分别用于 Kafka 内部代理通信和外部客户端认证。user1 遵循 user_<username> 格式定义。

2.2. 配置 Zookeeper 凭据

由于 Kafka 服务已包含客户端用户凭据,我们还需要用 SASL/PLAIN 认证保护 Zookeeper 服务。这是良好的安全实践。

创建 zookeeper_jaas.conf 文件配置 zookeeper 用户凭据:

Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zookeeper"
    password="zookeeper-secret"
    user_zookeeper="zookeeper-secret";
};

这里使用 Zookeeper 专用的 DigestLoginModule 安全实现(而非 Kafka 的 PlainLoginModule)以提升兼容性。

此外,在之前创建的 kafka_server_jaas.conf 文件中添加 zookeeper 凭据:

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="zookeeper"
    password="zookeeper-secret";
};

上述 Client 凭据供 Kafka 服务向 Zookeeper 服务认证时使用。

2.3. 搭建 Kafka 与 Zookeeper 服务

使用 Docker Compose 文件搭建 Kafka 和 Zookeeper 服务。

首先实现 Zookeeper 服务并引入 zookeeper_jaas.conf 文件:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.6
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
    volumes:
      - ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
    ports:
      - 2181

接着实现支持 SASL/PLAIN 认证的 Kafka 服务:

kafka:
  image: confluentinc/cp-kafka:7.6.6
  depends_on:
    - zookeeper
  environment:
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
    KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
    KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
    KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
    KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
    KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  volumes:
    - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
  ports:
    - "9092:9092"

上述配置引入了之前创建的 kafka_server_jaas.conf 文件设置 SASL/PLAIN 用户。

关键点KAFKA_ADVERTISED_LISTENERS 属性是 Kafka 客户端发送消息和监听的接口地址。

最后使用 docker compose 命令启动整个环境:

docker compose up --build

Docker 控制台将输出类似日志:

kafka-1      | [2025-06-19 14:32:00,441] INFO Session establishment complete on server zookeeper/172.18.0.2:2181, session id = 0x10000004c150001, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
kafka-1      | [2025-06-19 14:32:00,445] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
zookeeper-1  | [2025-06-19 14:32:00,461] INFO Successfully authenticated client: authenticationID=zookeeper;  authorizationID=zookeeper. (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)

确认 Kafka 和 Zookeeper 服务已无错误集成。

3. 使用 Spring 实现 Kafka 客户端

我们将使用 Spring Kafka 实现生产者和消费者服务。

3.1. Maven 依赖

首先添加 Spring Kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

3.2. Kafka 生产者

使用 KafkaTemplate 类实现生产者服务:

public void sendMessage(String message, String topic) {
    LOGGER.info("Producing message: {}", message);
    kafkaTemplate.send(topic, "key", message)
        .whenComplete((result, ex) -> {
            if (ex == null) {
                LOGGER.info("Message sent to topic: {}", message);
            } else {
                LOGGER.error("Failed to send message", ex);
            }
        });
}

上述代码通过 KafkaTemplatesend 方法发送消息。

3.3. Kafka 消费者

使用 Spring Kafka 的 @KafkaListenerConsumerRecord 实现消费者服务:

@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, String> consumerRecord) {
    LOGGER.info("Received payload: '{}'", consumerRecord.toString());
    messages.add(consumerRecord.value());
}

上述代码接收消息并将其添加到 messages 列表。

3.4. 配置 Spring 应用连接 Kafka

创建 application.yml 文件并添加 Spring Kafka 相关配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

启动应用验证配置:

kafka-1 | [2025-06-19 14:38:33,188] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /192.168.65.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

如预期,客户端应用无法通过 Kafka 服务器认证。

3.5. 配置客户端 JAAS 认证

通过 spring.kafka.properties 配置解决上述错误,添加 SASL/PLAIN 设置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      sasl.mechanism: PLAIN
      sasl.jaas.config: >
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="user1"
        password="user1-secret";
    security:
      protocol: "SASL_PLAINTEXT"

上述配置在 sasl.jaas.config 中添加了匹配的 usernamepassword

踩坑提醒:SASL 配置错误会导致常见问题:

  • sasl.mechanism 误设为 PLAINTEXT(而非 PLAIN):
    Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
        ... 25 common frames omitted
    Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism PLAINTEXT
    
  • sasl.mechanism 误写为 security.mechanism
    Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    

验证完整配置的 Kafka 应用。

4. 测试

使用 Testcontainers 框架测试 Kafka 客户端应用。

首先基于 docker-compose.yml 创建 DockerComposeContainer 对象:

@Container
public DockerComposeContainer<?> container =
  new DockerComposeContainer<>("src/test/resources/sasl-plaintext/docker-compose.yml")
    .withExposedService("kafka", "9092", Wait.forListeningPort());

实现测试方法验证消费者:

@Test
void givenSaslIsConfigured_whenProducerSendsMessageOverSasl_thenConsumerReceivesOverSasl() {
    String message = UUID.randomUUID().toString();
    kafkaProducer.sendMessage(message, "test-topic");

    await().atMost(Duration.ofMinutes(2))
      .untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
}

运行测试用例验证输出:

16:56:44.525 [kafka-producer-network-thread | producer-1] INFO c.b.saslplaintext.KafkaProducer - Message sent to topic: 82e8a804-0269-40a2-b8ed-c509e6951011
16:56:48.566 INFO  c.b.saslplaintext.KafkaConsumer - Received payload: ConsumerRecord(topic = test-topic, ... key = key, value = 82e8a804-0269-40a2-b8ed-c509e6951011

日志显示消费者服务成功接收消息。

5. 总结

本教程介绍了在 Docker 环境中通过 JAAS 配置为 Kafka 服务设置 SASL/PLAIN 认证的方法。

我们还实现了生产者/消费者服务,并通过类似的 JAAS 配置进行认证。最后使用 Docker TestContainer 发送/接收消息测试了整个环境。

示例代码可在 GitHub 获取。


原始标题:Secure Kafka With SASL/PLAIN Authentication | Baeldung