1. 引言

本教程将介绍如何通过 SSL 认证将 Spring Boot 客户端连接到 Apache Kafka 代理的基础配置。

历史背景:SSL(安全套接层)实际上自 2015 年起已被 TLS(传输层安全)取代。但出于历史原因,Kafka(和 Java)仍沿用“SSL”术语,本文也将遵循此惯例。

2. SSL 核心机制

默认情况下,Apache Kafka 以明文传输数据且无任何认证机制。

关键点

  • 单向认证:客户端通过公钥加密验证服务器证书
  • 双向认证(mTLS):服务器和客户端互相验证证书
  • 核心组件
    • 密钥库(Keystore):存储私钥和公钥证书
    • 信任库(Truststore):存储受信任的证书

认证流程

  1. 每个代理需独立的密钥库(含私钥和公钥证书)
  2. 客户端通过信任库验证代理证书
  3. 每个客户端需独立的密钥库(含私钥和公钥证书)
  4. 代理通过信任库验证客户端证书

简化方案:信任库可包含 CA(证书颁发机构),这样只需信任 CA 签名的所有证书,无需频繁更新信任库。

3. 依赖与环境搭建

3.1 项目依赖

pom.xml 添加 Spring Kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.7</version>
</dependency>

3.2 Docker 环境初始化

使用 Docker Compose 启动基础 Kafka 环境(无 SSL):

version: '3'
services:
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

启动命令:

docker-compose up -d

4. Kafka 代理配置

4.1 独立代理配置(参考)

server.properties 中添加核心配置:

# 启用 SSL 监听端口
listeners=SSL://:9093
advertised.listeners=SSL://localhost:9093

# 密钥库配置
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=server-password
ssl.key.password=server-password

# 信任库配置
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=server-password

# 强制客户端认证
ssl.client.auth=required

4.2 Docker Compose 配置

更新 docker-compose.yml 启用 SSL:

services:
  kafka:
    # ...其他配置...
    ports:
      - "9093:9093"
    volumes:
      - ./certs:/etc/kafka/certs
    environment:
      # ...基础配置...
      KAFKA_LISTENERS: SSL://:9093
      KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9093
      KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/certs/kafka.server.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: server-password
      KAFKA_SSL_KEY_PASSWORD: server-password
      KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/certs/kafka.server.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: server-password
      KAFKA_SSL_CLIENT_AUTH: required

重启后验证日志:

docker logs kafka | grep SSL

应看到类似输出:

[SocketServer brokerId=1] Created SSL processor with SSL configs...

5. Spring Boot 客户端实现

5.1 生产者配置

使用 KafkaTemplate 发送消息:

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = 
            kafkaTemplate.send(topic, message);
        
        future.addCallback(
            result -> System.out.println("消息发送成功: " + result.getRecordMetadata()),
            ex -> System.err.println("发送失败: " + ex.getMessage())
        );
    }
}

5.2 消费者配置

使用 @KafkaListener 消费消息:

@Service
public class KafkaConsumer {

    private final List<String> messages = new ArrayList<>();

    @KafkaListener(topics = "ssl-topic", groupId = "ssl-group")
    public void listen(String message) {
        messages.add(message);
        System.out.println("接收到消息: " + message);
    }

    public List<String> getMessages() {
        return messages;
    }
}

5.3 SSL 配置(application.yml)

spring:
  kafka:
    bootstrap-servers: localhost:9093
    properties:
      security.protocol: SSL
      ssl.truststore.location: classpath:kafka.client.truststore.jks
      ssl.truststore.password: client-password
      ssl.keystore.location: classpath:kafka.client.keystore.jks
      ssl.keystore.password: client-password
      ssl.key.password: client-password

关键配置说明

  • ssl.truststore.location:验证 Kafka 代理的证书(含 CA 签名)
  • ssl.keystore.location:客户端证书(需被代理信任库中的 CA 签名)

5.4 集成测试

使用 Testcontainers 验证端到端通信:

@SpringBootTest
@Testcontainers
public class KafkaSslIntegrationTest {

    @Container
    public static DockerComposeContainer<?> kafka = 
        new DockerComposeContainer<>(new File("docker-compose.yml"))
            .withExposedService("kafka", 9093);

    @Autowired
    private KafkaProducer producer;
    
    @Autowired
    private KafkaConsumer consumer;

    @Test
    public void testSslConnection() throws Exception {
        producer.sendMessage("ssl-topic", "测试消息");
        
        Awaitility.await()
            .atMost(10, TimeUnit.SECONDS)
            .until(() -> !consumer.getMessages().isEmpty());
        
        assertEquals("测试消息", consumer.getMessages().get(0));
    }
}

6. 总结

本文完整实现了 Kafka 与 Spring Boot 的双向 SSL 认证:

  1. 代理端:通过密钥库/信任库强制客户端认证
  2. 客户端:配置 SSL 参数建立安全连接
  3. 验证:集成测试确保双向认证生效

踩坑提醒:证书链配置错误是常见问题,务必确保:

  • 客户端证书被代理信任库中的 CA 签名
  • 代理证书被客户端信任库中的 CA 签名

完整代码见 GitHub 仓库


原始标题:Configuring Kafka SSL Using Spring Boot