1. 概述

认证是设计Kafka这类消息系统的核心环节。实现认证的方式包括用户凭证、SSL证书或基于令牌的机制等。

本教程将介绍如何在Kafka服务中实现简单认证和安全层(SASL)认证机制,并使用Spring Kafka实现客户端认证。

2. Kafka认证机制简介

Kafka支持多种认证授权机制保障网络安全,包括SSL、SASL或委托令牌。认证场景涵盖客户端到Broker、Broker到Zookeeper以及Broker间通信。

可根据系统需求和基础设施选择合适方案。SSL认证使用X.509证书验证客户端和Broker,支持单向或双向认证。

SASL是支持多种认证机制的安全框架

  • SASL/GSSAPI:基于通用安全服务API,可轻松集成现有Kerberos服务。通过密钥分发中心(KDC)提供网络认证,常用于Active Directory或Kerberos环境
  • SASL/PLAIN:使用用户名密码认证,因网络传输不安全,仅适用于非生产环境
  • SASL/SCRAM:通过加盐哈希密码创建挑战响应机制,支持SHA-256/512等算法,安全性高于明文
  • SASL/OAUTHBEARER:使用OAuth 2.0承载令牌认证,适合Keycloak/OKTA等身份提供商场景

SASL可与SSL结合提供传输层加密。Kafka授权支持内置ACL机制、OAuth/OIDC或自定义授权器

本教程聚焦广泛使用的GSSAPI认证实现。

3. 使用SASL/GSSAPI实现Kafka服务

假设需要在Docker环境中构建支持GSSAPI认证的Kafka服务,可利用Kerberos提供票据授予票据(TGT)服务作为认证服务器。

3.1 配置Kerberos

在Docker环境中实现Kerberos服务需自定义配置。首先创建krb5.conf配置域BAELDUNG.COM

[libdefaults]
  default_realm = BAELDUNG.COM
  dns_lookup_realm = false
  dns_lookup_kdc = false
  forwardable = true
  rdns = true

[realms]
  BAELDUNG.COM = {
    kdc = kdc
    admin_server = kdc
  }

域(realm)是Kafka服务的逻辑域名。需编写脚本初始化Kerberos数据库(kdb5_util),创建Kafka/Zookeeper/客户端的主体(principal)和密钥表文件(Kadmin.local),最后启动Krb5kdcKadmind服务。

实现kdc_setup.sh脚本添加主体并生成密钥表:

kadmin.local -q "addprinc -randkey kafka/[email protected]"
kadmin.local -q "addprinc -randkey zookeeper/[email protected]"
kadmin.local -q "addprinc -randkey [email protected]"

kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/kafka.keytab kafka/[email protected]"
kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/zookeeper.keytab zookeeper/[email protected]"
kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/client.keytab [email protected]"

krb5kdc
kadmind -nofork

⚠️ 主体格式通常为<服务名>/<主机名>@域,主机名可选,域名需大写。主体配置错误会导致认证失败**(服务名或FQDN不匹配)。

创建Dockerfile准备Kerberos环境:

FROM debian:bullseye

RUN apt-get update && \
    apt-get install -y krb5-kdc krb5-admin-server krb5-user && \
    rm -rf /var/lib/apt/lists/*
COPY config/krb5.conf /etc/krb5.conf
COPY setup_kdc.sh /setup_kdc.sh

RUN chmod +x /setup_kdc.sh

添加kadm5.acl文件授予管理员主体完全权限:

*/[email protected] *

3.2 Kafka和Zookeeper配置

使用*JAAS*配置Kafka/Zookeeper与Kerberos的KDC认证。为Kafka服务器和Zookeeper分别创建JAAS配置文件。

zookeeper_jaas.conf配置:

Server {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka/keytabs/zookeeper.keytab"
    principal="zookeeper/[email protected]";
};

主体需与Zookeeper的Kerberos主体一致,useKeyTab=true强制使用密钥表认证。

kafka_server_jaas.conf配置Kafka服务器和客户端:

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka/keytabs/kafka.keytab"
    principal="kafka/[email protected]"
    serviceName="kafka";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka/keytabs/client.keytab"
    principal="[email protected]"
    serviceName="kafka";
};

3.3 集成Kafka、Zookeeper和Kerberos

通过Docker服务轻松集成三者。首先实现Kerberos服务:

services:
  kdc:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - ./config:/etc/krb5kdc
      - ./keytabs:/etc/krb5kdc/keytabs
      - ./config/krb5.conf:/etc/krb5.conf
    ports:
      - "88:88/udp"

服务通过UDP 88端口暴露。配置Zookeeper服务:

zookeeper:
  image: confluentinc/cp-zookeeper:latest
  container_name: zookeeper
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
    KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
  volumes:
    - ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
    - ./keytabs:/etc/kafka/keytabs
    - ./config/krb5.conf:/etc/krb5.conf
  ports:
    - "2181:2181"

最后配置Kafka服务:

kafka:
  image: confluentinc/cp-kafka:latest
  container_name: kafka
  environment:
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
    KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
    KAFKA_LISTENERS: SASL_PLAINTEXT://:9092
    KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
    KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
    KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  volumes:
    - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
    - ./keytabs:/etc/kafka/keytabs
    - ./config/krb5.conf:/etc/krb5.conf
  depends_on:
    - zookeeper
    - kdc
  ports:
    - 9092:9092

关键点

  • KAFKA_ADVERTISED_LISTENERS是客户端监听的接口
  • 已启用Broker间和客户端到Broker的GSSAPI认证
  • 使用kafka_server_jaas.conf配置主体和密钥表

启动Docker环境:

$ docker compose up --build

验证日志:

kafka      | [2025-02-03 18:09:10,147] INFO Successfully authenticated client: authenticationID=kafka/[email protected]; authorizationID=kafka/[email protected]. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
kafka      | [2025-02-03 18:09:10,148] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to localhost:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)

确认服务集成成功。

4. 使用Spring实现Kafka客户端

基于Spring Kafka实现Kafka监听器应用。

4.1 Maven依赖

添加spring-kafka依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

4.2 实现Kafka监听器

使用Spring Kafka的KafkaListenerConsumerRecord实现监听:

@KafkaListener(topics = "test-topic")
public void receive(ConsumerRecord<String, String> consumerRecord) {
    log.info("Received payload: '{}'", consumerRecord.toString());
    messages.add(consumerRecord.value());
}

application-sasl.yml配置监听参数:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

运行应用时出现认证失败日志:

kafka | [2025-02-01 03:08:01,532] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /172.21.0.1 (channelId=172.21.0.4:9092-172.21.0.1:59840-16) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

问题:客户端未配置SASL认证导致认证失败。需在Spring应用中添加JAAS配置。

5. 使用JAAS配置Kafka客户端

通过spring.kafka.properties配置SASL/GSSAPI参数。添加主体、密钥表和认证机制配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      sasl.mechanism: GSSAPI
      sasl.jaas.config: >
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="./src/test/resources/sasl/keytabs/client.keytab"
        principal="[email protected]"
        serviceName="kafka";

⚠️ 注意serviceName必须与Kafka主体的服务名完全匹配。

重新启动应用验证配置。

6. 测试Kafka监听器

使用kafka-console-producer.sh工具快速测试监听器:

$ kafka-console-producer.sh --broker-list localhost:9092 \
  --topic test-topic \
  --producer-property security.protocol=SASL_PLAINTEXT \
  --producer-property sasl.mechanism=GSSAPI \
  --producer-property sasl.kerberos.service.name=kafka \
  --producer-property sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required 
    useKeyTab=true keyTab=\"/<path>/client.keytab\" 
    storeKey=true principal=\"[email protected]\";"
> hello

命令中传递了认证相关配置(安全协议、SASL机制、JAAS配置)。监听器日志显示成功接收消息:

08:52:13.663 INFO  c.b.s.KafkaConsumer - Received payload: 'ConsumerRecord(topic = test-topic, .... key = null, value = hello)'

生产环境建议:需额外配置SSL证书和DNS等安全措施。

7. 总结

本文介绍了在Docker环境中:

  1. 使用自定义Kerberos搭建Kafka服务
  2. 启用SASL/GSSAPI认证
  3. 通过Spring Kafka实现客户端认证
  4. 使用JAAS配置完成认证集成

通过消息收发测试验证了整个认证流程。生产环境中建议结合SSL证书增强安全性。


原始标题:Implement SASL Authentication in Kafka With JAAS Config | Baeldung