1. 简介
Apache Kafka 是一个分布式消息平台,可以实现多个系统之间的高吞吐数据交换。
Spring Cloud Stream 是一个用于构建消息驱动微服务的框架。✅ 它大大简化了 Kafka 的集成工作,让开发者更专注于业务逻辑而非底层通信细节。
在实际开发中,Kafka 通常结合 Avro 消息格式和 Schema Registry 使用。本文将以 Confluent Schema Registry 为例,演示如何使用 Spring Cloud Stream 与 Kafka 集成,同时支持 Avro 格式的消息序列化与反序列化。我们还会对比 Spring 自带的实现与 Confluent 原生库的差异。
2. Confluent Schema Registry
Kafka 本质上只处理字节流(byte stream),所以通常需要一个外部的 Schema 来定义消息格式,并通过序列化/反序列化实现对象与字节之间的转换。
为了避免每次消息都携带完整的 Schema,Schema Registry 的作用就是集中管理 Schema,并通过 Schema ID 来标识每个 Schema。生产者在发送消息前先注册 Schema,消费者根据消息头中的 Schema ID 获取对应的 Schema 进行反序列化。
Confluent Schema Registry 提供了一套 REST API 来管理 Schema,支持 Schema 版本控制和兼容性检查。
- Schema 存储方式:按“主题(Subject)”分类存储
- 兼容性检查:上传新 Schema 时,默认会检查与旧版本是否兼容(如向后兼容)
- Schema ID 的作用:用于标识消息所使用的 Schema,避免重复传输
生产者发送消息时,会从 Schema Registry 获取 Schema ID;消费者消费消息时,也会根据 Schema ID 获取对应的 Schema,从而实现结构化数据的解析。
3. Apache Avro 简介
Apache Avro 是一个高效的数据序列化系统。它使用 JSON 格式定义 Schema,支持多种语言生成数据模型类(如 Java POJO),便于序列化和反序列化。
Avro 的优势在于:
✅ 支持 Schema 的演化(Schema Evolution)
✅ 支持跨版本兼容(如新增字段不影响旧消费者)
✅ 二进制编码效率高,比 JSON 更紧凑
通过 Avro 提供的工具,我们可以根据 Schema 文件(如 .avsc
)自动生成 Java 类,从而简化开发流程。
4. 项目依赖配置
使用 Spring Cloud Stream + Kafka + Avro + Schema Registry,需要引入以下依赖:
Maven 依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>4.0.0</version>
</dependency>
仓库配置
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
Avro 代码生成插件
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
启动本地 Kafka 与 Schema Registry
建议使用 Docker 快速搭建测试环境,例如:
docker-compose up -d
5. Spring Cloud Stream 实现消息生产与消费
5.1. 定义 Avro Schema
在 src/main/resources
下创建 employee-schema.avsc
文件:
{
"type": "record",
"name": "Employee",
"namespace": "com.baeldung.schema",
"fields": [
{"name": "id", "type": "int"},
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"}
]
}
构建项目后,Maven 插件会自动生成 com.baeldung.schema.Employee
类。
5.2. 消息生产者
使用 Spring Cloud Stream 的 Processor
接口发送消息:
@Autowired
private Processor processor;
public void produceEmployeeDetails(int empId, String firstName, String lastName) {
Employee employee = new Employee(empId, firstName, lastName);
Message<Employee> message = MessageBuilder.withPayload(employee).build();
processor.output().send(message);
}
5.3. 消息消费者
监听 Kafka 消息并打印日志:
@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
logger.info("处理员工信息: {}", employeeDetails);
}
5.4. Kafka 绑定配置
application.yml
中配置 Kafka 绑定:
spring:
cloud:
stream:
bindings:
input:
destination: employee-details
content-type: application/*+avro
output:
destination: employee-details
content-type: application/*+avro
⚠️ 注意:这里的 destination
表示 Kafka Topic 名称。
5.5. 启动类配置
启用 Schema Registry 支持:
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
public class AvroKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(AvroKafkaApplication.class, args);
}
}
5.6. Schema Registry Client Bean
@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;
@Bean
public SchemaRegistryClient schemaRegistryClient() {
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endPoint);
return client;
}
5.7. 测试接口
添加一个 REST 接口触发消息发送:
@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName, @PathVariable String lastName) {
avroProducer.produceEmployeeDetails(id, firstName, lastName);
return "消息已发送至消费者";
}
5.8. 消息处理流程
- 生产者构建
Employee
对象 - 注册 Schema 到 Schema Registry,获取 Schema ID
- 使用 Avro 序列化对象为字节流
- Schema ID 放入消息 Header
- 发送消息到 Kafka
- 消费者从 Header 中读取 Schema ID
- 通过 Schema Registry 获取 Schema
- 反序列化字节流为本地对象
6. 使用 Confluent 原生序列化/反序列化
Spring Boot 默认使用 AvroSchemaMessageConverter
,但在某些场景下(如跨语言、分区、兼容性)推荐使用 Confluent 原生的 KafkaAvroSerializer
和 KafkaAvroDeserializer
。
更新 application.yml
配置:
spring:
cloud:
stream:
default:
producer:
useNativeEncoding: true
consumer:
useNativeEncoding: true
kafka:
binder:
producer-properties:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:8081
consumer-properties:
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
specific.avro.reader: true
✅ useNativeEncoding
会启用原生序列化,提升性能并增强与 Confluent 生态的互操作性。
7. 消费组与分区机制
7.1. 消费组(Consumer Group)
消费组是一组逻辑相同的消费者,它们共享同一个 Group ID。Kafka 会将 Topic 分区均匀分配给组内消费者。
spring:
cloud:
stream:
bindings:
input:
destination: employee-details
group: group-1
✅ 同一消费组内,消费者数量不能超过分区数,否则多余的消费者不会被分配分区。
7.2. 分区键(Partition Key)
为了控制消息的顺序和分布,可以指定分区键。相同键的消息会被发送到同一个分区,保证顺序处理。
定义 EmployeeKey
Schema:
{
"type": "record",
"name": "EmployeeKey",
"namespace": "com.baeldung.schema",
"fields": [
{"name": "id", "type": "int"},
{"name": "departmentName", "type": "string"}
]
}
生产者设置分区键:
Message<Employee> message = MessageBuilder.withPayload(employee)
.setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
.build();
7.3. 消费并发(Concurrency)
控制消费者并发数量:
spring:
cloud:
stream:
bindings:
input:
concurrency: 3
✅ Spring 会启动 3 个线程并发消费消息,提升处理效率。
8. 总结
本文介绍了如何使用 Spring Cloud Stream + Kafka + Avro + Confluent Schema Registry 构建一个完整的消息系统:
- ✅ 使用 Avro 定义 Schema 并自动生成 POJO
- ✅ Spring Cloud Stream 快速集成 Kafka 生产与消费
- ✅ 配置 Confluent Schema Registry 实现 Schema 管理
- ✅ 使用 Confluent 原生序列化器提升兼容性
- ✅ 合理配置消费组、分区键与并发,优化消息处理效率
该方案适用于微服务架构中跨服务的数据通信场景,尤其适合需要 Schema 演化与兼容性控制的业务场景。