1. 简介

Apache Kafka 是一个分布式消息平台,可以实现多个系统之间的高吞吐数据交换。

Spring Cloud Stream 是一个用于构建消息驱动微服务的框架。✅ 它大大简化了 Kafka 的集成工作,让开发者更专注于业务逻辑而非底层通信细节。

在实际开发中,Kafka 通常结合 Avro 消息格式和 Schema Registry 使用。本文将以 Confluent Schema Registry 为例,演示如何使用 Spring Cloud Stream 与 Kafka 集成,同时支持 Avro 格式的消息序列化与反序列化。我们还会对比 Spring 自带的实现与 Confluent 原生库的差异。


2. Confluent Schema Registry

Kafka 本质上只处理字节流(byte stream),所以通常需要一个外部的 Schema 来定义消息格式,并通过序列化/反序列化实现对象与字节之间的转换。

为了避免每次消息都携带完整的 Schema,Schema Registry 的作用就是集中管理 Schema,并通过 Schema ID 来标识每个 Schema。生产者在发送消息前先注册 Schema,消费者根据消息头中的 Schema ID 获取对应的 Schema 进行反序列化。

Confluent Schema Registry 提供了一套 REST API 来管理 Schema,支持 Schema 版本控制和兼容性检查。

  • Schema 存储方式:按“主题(Subject)”分类存储
  • 兼容性检查:上传新 Schema 时,默认会检查与旧版本是否兼容(如向后兼容)
  • Schema ID 的作用:用于标识消息所使用的 Schema,避免重复传输

生产者发送消息时,会从 Schema Registry 获取 Schema ID;消费者消费消息时,也会根据 Schema ID 获取对应的 Schema,从而实现结构化数据的解析。


3. Apache Avro 简介

Apache Avro 是一个高效的数据序列化系统。它使用 JSON 格式定义 Schema,支持多种语言生成数据模型类(如 Java POJO),便于序列化和反序列化。

Avro 的优势在于:

✅ 支持 Schema 的演化(Schema Evolution)
✅ 支持跨版本兼容(如新增字段不影响旧消费者)
✅ 二进制编码效率高,比 JSON 更紧凑

通过 Avro 提供的工具,我们可以根据 Schema 文件(如 .avsc)自动生成 Java 类,从而简化开发流程。


4. 项目依赖配置

使用 Spring Cloud Stream + Kafka + Avro + Schema Registry,需要引入以下依赖:

Maven 依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
</dependency>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>

仓库配置

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Avro 代码生成插件

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

启动本地 Kafka 与 Schema Registry

建议使用 Docker 快速搭建测试环境,例如:

docker-compose up -d

5. Spring Cloud Stream 实现消息生产与消费

5.1. 定义 Avro Schema

src/main/resources 下创建 employee-schema.avsc 文件:

{
    "type": "record",
    "name": "Employee",
    "namespace": "com.baeldung.schema",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "firstName", "type": "string"},
        {"name": "lastName", "type": "string"}
    ]
}

构建项目后,Maven 插件会自动生成 com.baeldung.schema.Employee 类。

5.2. 消息生产者

使用 Spring Cloud Stream 的 Processor 接口发送消息:

@Autowired
private Processor processor;

public void produceEmployeeDetails(int empId, String firstName, String lastName) {
    Employee employee = new Employee(empId, firstName, lastName);
    Message<Employee> message = MessageBuilder.withPayload(employee).build();
    processor.output().send(message);
}

5.3. 消息消费者

监听 Kafka 消息并打印日志:

@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
    logger.info("处理员工信息: {}", employeeDetails);
}

5.4. Kafka 绑定配置

application.yml 中配置 Kafka 绑定:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro

⚠️ 注意:这里的 destination 表示 Kafka Topic 名称。

5.5. 启动类配置

启用 Schema Registry 支持:

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
public class AvroKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(AvroKafkaApplication.class, args);
    }
}

5.6. Schema Registry Client Bean

@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;

@Bean
public SchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endPoint);
    return client;
}

5.7. 测试接口

添加一个 REST 接口触发消息发送:

@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName, @PathVariable String lastName) {
    avroProducer.produceEmployeeDetails(id, firstName, lastName);
    return "消息已发送至消费者";
}

5.8. 消息处理流程

  1. 生产者构建 Employee 对象
  2. 注册 Schema 到 Schema Registry,获取 Schema ID
  3. 使用 Avro 序列化对象为字节流
  4. Schema ID 放入消息 Header
  5. 发送消息到 Kafka
  6. 消费者从 Header 中读取 Schema ID
  7. 通过 Schema Registry 获取 Schema
  8. 反序列化字节流为本地对象

6. 使用 Confluent 原生序列化/反序列化

Spring Boot 默认使用 AvroSchemaMessageConverter,但在某些场景下(如跨语言、分区、兼容性)推荐使用 Confluent 原生的 KafkaAvroSerializerKafkaAvroDeserializer

更新 application.yml 配置:

spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      kafka:
        binder:
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:8081
          consumer-properties:
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: http://localhost:8081
            specific.avro.reader: true

useNativeEncoding 会启用原生序列化,提升性能并增强与 Confluent 生态的互操作性。


7. 消费组与分区机制

7.1. 消费组(Consumer Group)

消费组是一组逻辑相同的消费者,它们共享同一个 Group ID。Kafka 会将 Topic 分区均匀分配给组内消费者。

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: employee-details
          group: group-1

✅ 同一消费组内,消费者数量不能超过分区数,否则多余的消费者不会被分配分区。

7.2. 分区键(Partition Key)

为了控制消息的顺序和分布,可以指定分区键。相同键的消息会被发送到同一个分区,保证顺序处理。

定义 EmployeeKey Schema:

{
    "type": "record",
    "name": "EmployeeKey",
    "namespace": "com.baeldung.schema",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "departmentName", "type": "string"}
    ]
}

生产者设置分区键:

Message<Employee> message = MessageBuilder.withPayload(employee)
    .setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
    .build();

7.3. 消费并发(Concurrency)

控制消费者并发数量:

spring:
  cloud:
    stream:
      bindings:
        input:
          concurrency: 3

✅ Spring 会启动 3 个线程并发消费消息,提升处理效率。


8. 总结

本文介绍了如何使用 Spring Cloud Stream + Kafka + Avro + Confluent Schema Registry 构建一个完整的消息系统:

  • ✅ 使用 Avro 定义 Schema 并自动生成 POJO
  • ✅ Spring Cloud Stream 快速集成 Kafka 生产与消费
  • ✅ 配置 Confluent Schema Registry 实现 Schema 管理
  • ✅ 使用 Confluent 原生序列化器提升兼容性
  • ✅ 合理配置消费组、分区键与并发,优化消息处理效率

该方案适用于微服务架构中跨服务的数据通信场景,尤其适合需要 Schema 演化与兼容性控制的业务场景。


原始标题:Guide to Spring Cloud Stream with Kafka, Apache Avro and Confluent Schema Registry | Baeldung