1. 概述
Apache Kafka 已成为最流行的消息传递和事件流平台之一。但搭建和维护 Kafka 集群是个复杂过程,通常需要大型组织中的专业团队来保障高可用性、可靠性、负载均衡和扩展能力。
AutoMQ 是一款云原生的 Kafka 替代方案,专注于降低成本和提升效率。它采用共享存储架构,将数据存储在 Amazon S3 中,并通过 Amazon EBS 保证数据持久性。
本教程将探讨如何在 Spring Boot 应用中集成 AutoMQ,包括搭建本地 AutoMQ 集群,并实现基础的生产者-消费者模式。
2. 使用 Testcontainers 搭建 AutoMQ
为简化本地开发和测试,我们将使用 Testcontainers 搭建 AutoMQ 集群。前提条件是已安装 Docker 和 Docker Compose。
AutoMQ 提供了本地部署的 Docker Compose 文件,使用 LocalStack 模拟 Amazon S3 服务,用本地文件系统模拟 Amazon EBS。我们将直接使用这个 Compose 文件。
⚠️ 注意:以下配置仅适用于开发测试环境,切勿用于生产环境。
2.1. 依赖项
首先在项目的 pom.xml 中添加必要依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.0</version>
</dependency>
AutoMQ 完全兼容 Apache Kafka,意味着它实现了相同的 API、协议和配置属性。因此我们可以使用熟悉的 spring-kafka 依赖 集成 AutoMQ。
接着添加测试依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
spring-boot-testcontainers 依赖 提供了启动 AutoMQ 集群所需的 Docker 容器管理类。
此外,我们添加了 awaitility 库,稍后用于测试异步的生产者-消费者实现。
2.2. 定义 Testcontainers Bean
接下来创建一个 @TestConfiguration 类定义容器 Bean:
@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfiguration {
private static final String COMPOSE_URL = "https://download.automq.com/community_edition/standalone_deployment/docker-compose.yaml";
@Bean
public ComposeContainer composeContainer() {
File dockerCompose = downloadComposeFile();
return new ComposeContainer(dockerCompose)
.withLocalCompose(true);
}
private File downloadComposeFile() {
File dockerCompose = Files.createTempFile("docker-compose", ".yaml").toFile();
FileUtils.copyURLToFile(URI.create(COMPOSE_URL).toURL(), dockerCompose);
return dockerCompose;
}
}
这里使用了 Testcontainers 的 Docker Compose 模块。首先下载 AutoMQ 的 Docker Compose 文件,并基于它创建 ComposeContainer Bean。
通过 withLocalCompose(true)
指示 Testcontainers 使用本地安装的 Docker Compose 二进制文件。
但有个踩坑点:Testcontainers 目前不支持 Docker Compose 的 container_name 属性。我们需要临时绕过这个问题:
private File downloadComposeFile() {
// ... 同上
return removeContainerNames(dockerCompose);
}
private File removeContainerNames(File composeFile) {
List<String> filteredLines = Files.readAllLines(composeFile.toPath())
.stream()
.filter(line -> !line.contains("container_name:"))
.toList();
Files.write(composeFile.toPath(), filteredLines);
return composeFile;
}
removeContainerNames()
方法会移除下载的 Compose 文件中的 container_name
属性。这个变通方案确保创建的容器不包含该属性。
最后配置 bootstrap-servers
属性让应用连接到 AutoMQ 集群:
@Bean
public DynamicPropertyRegistrar dynamicPropertyRegistrar() {
return registry -> {
registry.add("spring.kafka.bootstrap-servers", () -> "localhost:9094,localhost:9095");
};
}
**在定义 DynamicPropertyRegistrar Bean 时,我们配置了默认的 AutoMQ 服务器地址 localhost:9094,localhost:9095
**。
配置好连接信息后,Spring Boot 会自动创建 KafkaTemplate Bean,供后续使用。
2.3. 在开发中使用 Testcontainers
Testcontainers 虽主要用于集成测试,但也可用于本地开发。
在 src/test/java 目录下创建独立的启动类:
public class TestApplication {
public static void main(String[] args) {
SpringApplication.from(Application::main)
.with(TestcontainersConfiguration.class)
.run(args);
}
}
创建 TestApplication 类,在其 main()
方法中启动主应用类并附加 TestcontainersConfiguration。
这种方案能帮我们在本地轻松管理外部服务。运行 Spring Boot 应用时,它会自动连接到通过 Testcontainers 启动的服务。
3. 实现生产者-消费者模式
现在本地 AutoMQ 集群已就绪,我们来实现基础的生产者-消费者模式。
3.1. 配置 AutoMQ 消费者
首先在 application.yml 中定义消费者监听的主题名称:
com:
baeldung:
topic:
onboarding-initiated: user-service.onboarding.initiated.v1
然后创建消费者类监听该主题:
@Configuration
class UserOnboardingInitiatedListener {
private static final Logger log = LoggerFactory.getLogger(UserOnboardingInitiatedListener.class);
@KafkaListener(topics = "${com.baeldung.topic.onboarding-initiated}", groupId = "user-service")
public void listen(User user) {
log.info("Dispatching user account confirmation email to {}", user.email());
}
}
record User(String email) {
}
我们在 listen()
方法上使用 @KafkaListener 注解指定主题和消费者组。当消息发布到 user-service.onboarding.initiated.v1
主题时,该方法会被触发。
使用 User 记录 表示消息负载。
最后在 application.yml 添加以下配置:
spring:
kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.value.default.type: com.baeldung.automq.User
allow.auto.create.topics: true
✅ 配置了生产者和消费者的键值序列化/反序列化器
✅ 指定 User 记录作为默认消息类型
✅ 启用主题自动创建(不存在时 AutoMQ 会自动创建)
3.2. 测试消息消费
配置好消费者后,验证它是否能正确消费并记录消息:
@SpringBootTest
@ExtendWith(OutputCaptureExtension.class)
@Import(TestcontainersConfiguration.class)
class UserOnboardingInitiatedListenerLiveTest {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
@Value("${com.baeldung.topic.onboarding-initiated}")
private String onboardingInitiatedTopic;
@Test
void whenMessagePublishedToTopic_thenProcessedByListener(CapturedOutput capturedOutput) {
User user = new User("[email protected]");
kafkaTemplate.send(onboardingInitiatedTopic, user);
String expectedConsumerLog = String.format("Dispatching user account confirmation email to %s", user.email());
Awaitility
.await()
.atMost(1, TimeUnit.SECONDS)
.until(() -> capturedOutput.getAll().contains(expectedConsumerLog));
}
}
通过 @Autowired
注入 KafkaTemplate 实例,并用 @Value
注入主题名称。
测试逻辑:
- 创建 User 对象并通过 KafkaTemplate 发送到主题
- 使用 awaitility 和 OutputCaptureExtension 提供的 CapturedOutput 验证消费者是否输出预期日志
⚠️ 测试可能间歇性失败:消费者启动和订阅主题需要时间。解决方案是在测试前等待分区分配完成:
@BeforeAll
void setUp(CapturedOutput capturedOutput) {
String expectedLog = "partitions assigned";
Awaitility
.await()
.atMost(Durations.ONE_MINUTE)
.pollDelay(Durations.ONE_SECOND)
.until(() -> capturedOutput.getAll().contains(expectedLog));
}
在 @BeforeAll
标注的 setUp()
方法中:
- 最多等待 1 分钟
- 每秒轮询一次
- 直到日志中出现 "partitions assigned" 确认分区分配完成
这个测试类也展示了 awaitility 在测试异步操作时的强大能力。
4. 总结
本文介绍了在 Spring Boot 应用中集成 AutoMQ 的完整流程:
✅ 使用 Testcontainers 的 Docker Compose 模块启动 AutoMQ 集群
✅ 搭建本地测试环境
✅ 实现基础的生产者-消费者架构
✅ 完成端到端测试验证
通过这套方案,开发者可以快速体验 AutoMQ 的核心特性,同时保持与 Kafka 生态的完全兼容性。对于需要降低云成本和运维复杂度的团队来说,这是个值得考虑的替代方案。