1. 概述
本文将深入探讨如何利用 Micrometer 和 Spring Boot Actuator 实现 Spring Kafka 的监控。我们将从 Apache Kafka 原生指标开始,这些指标为生产者和消费者提供了性能、吞吐量、错误率和延迟等关键洞察。
接着我们将深入 Spring 特有的指标体系,重点关注 spring.kafka.listener
和 spring.kafka.template
下的指标。还会学习如何通过 Spring 配置自定义 @KafkaListener
和 KafkaTemplate
,为这些指标添加自定义标签。
最后我们将讨论追踪功能,看看 Spring Kafka 如何轻松传播 Micrometer 生成的追踪信息,帮助我们更好地调试和监控系统中的消息流转。
2. 环境搭建
本文代码示例基于一个博客学习网站(类似 Baeldung)的后端应用。我们将使用一个简单的 Spring Boot 应用,依赖如下:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
在 src/test/resources/docker
目录下,可以找到用于本地启动的 docker-compose.yml
文件。
应用配置暴露了所有指标接口:
management.endpoints.web.exposure.include: '*'
启动应用后,访问 http://localhost:8081/actuator 即可查看所有暴露的接口和指标:
功能上,服务提供了创建文章评论的 HTTP 接口。提交评论后,应用会向 Kafka 主题 baeldung.article-comment.added
发送消息,方便我们追踪 Kafka 生产者指标。
稍后我们会设置 Kafka 监听器消费同一主题的消息,借此理解如何监控监听器及其暴露的指标。
3. Kafka 原生指标
仅添加 spring-boot-starter-actuator
依赖就能开箱即用地暴露 Kafka 指标(Spring Boot 2.5+ 版本支持)。下面重点看 Kafka 生产者和消费者的原生指标。
3.1. 生产者指标
先让应用产生一些事件,通过 REST API 创建几条评论:
curl --location 'http://localhost:8081/api/articles/oop-best-practices/comments' \
--header 'Content-Type: application/json' \
--data '{
"articleAuthor": "Andrey the Author",
"comment": "Great article!",
"commentAuthor": "Richard the Reader"
}'
然后访问 http://localhost:8081/actuator/metrics 接口。这次能看到多个 Kafka 生产者指标,包括延迟和失败率等数据:
kafka.producer.record.error.rate
– 记录发送失败频率kafka.producer.request.latency.avg
– 生产请求平均完成时间kafka.producer.buffer.exhausted.rate
– 生产者缓冲区耗尽频率kafka.producer.record.send.rate
– 记录发送到代理的速率kafka.producer.requests.in.flight
– 当前未确认的生产请求数
完整指标列表参考 Apache Kafka 官方文档。通过在路径后追加指标名即可查看具体指标,例如访问生产者错误率接口:
3.2. 消费者指标
和生产者指标类似,Micrometer 也会记录 Kafka 消费者相关指标并通过 Actuator 暴露。
为演示这点,添加一个 @KafkaListener
监听同一主题:
@Component
public class ArticleCommentsListener {
@KafkaListener(topics = "baeldung.article-comment.added")
public void onArticleComment(ArticleCommentAddedEvent event) {
// 业务逻辑...
}
}
重启应用并发送请求后查看指标。这次能看到多个 Kafka 消费者指标,主要包括:
kafka.consumer.fetch.manager.records.lag
– 消费者滞后程度kafka.consumer.fetch.manager.fetch.latency.avg
– 从代理获取数据的平均时间kafka.consumer.coordinator.rebalance.rate.per.hour
– 消费者组重平衡频率kafka.consumer.last.poll.seconds.ago
– 距离上次轮询的时间kafka.consumer.time.between.poll.avg
– 连续轮询的平均间隔
完整指标列表参考 Kafka 官方文档。
3.3. 添加自定义标签
Spring Kafka 内置 API 可为原生指标添加自定义标签。例如通过监听 ProducerFactory
bean 自定义生产者指标:
@Bean
ProducerFactory<String, ArticleCommentAddedEvent> producerFactory(
KafkaProperties kafkaProperties, MeterRegistry meterRegistry
) {
ProducerFactory pf = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
pf.addListener(
new MicrometerProducerListener<String, String>(
meterRegistry,
Collections.singletonList(new ImmutableTag("app-name", "article-comments-app"))
)
);
return pf;
}
这为所有生产者指标添加了自定义名称。类似地,修改 ConsumerFactory
添加 MicrometerConsumerListener
可为消费者指标添加自定义标签。
4. 监控 KafkaTemplate
示例应用使用 KafkaTemplate
bean 发布消息。除生产者指标外,Micrometer 还记录 KafkaTemplate
特有指标,名称为 spring.kafka.template
。
可自定义 KafkaTemplate
bean 添加或更新标签。setMicrometerTags()
方法支持定义键值对标签:
@Bean
@Qualifier("articleCommentsKafkaTemplate")
KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate(
ProducerFactory<String, ArticleCommentAddedEvent> producerFactory
) {
var template = new KafkaTemplate<>(producerFactory);
template.setMicrometerTags(Map.of(
"topic", "baeldung.article-comment.added"
));
return template;
}
此外 setMicrometerTagsProvider()
可动态生成标签。 用它提取记录键作为标签:
template.setMicrometerTagsProvider(
record -> Map.of("article-slug", record.key().toString())
);
向不同文章添加评论后查看 http://localhost:8081/actuator/metrics/spring.kafka.template 接口:
数据包含所有 KafkaTemplate 性能记录及我们的自定义标签。
5. 监控 KafkaListener
与 KafkaTemplate 类似,Micrometer 也监控 KafkaListener 并以 spring.kafka.listener
名称暴露指标。 Spring Kafka 提供一致的 API,方便配置监听器指标的自定义标签。
setMicrometerTags()
和 setMicrometerTagsProvider()
方法可在 ConcurrentKafkaListenerContainerFactory
级别配置:
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory
) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
ContainerProperties containerProps = factory.getContainerProperties();
containerProps.setMicrometerTags(Map.of(
"app-name", "article-comments-app"
));
containerProps.setMicrometerTagsProvider(
record -> Map.of("article-slug", record.key().toString())
);
return factory;
}
同时更新 @KafkaListener
注解指向自定义的 containerFactory
:
@KafkaListener(
topics = "baeldung.article-comment.added",
containerFactory = "customKafkaListenerContainerFactory"
)
public void onArticleComment(ArticleCommentAddedEvent event) {
// ...
}
这样 Micrometer 会将静态和动态标签附加到指标,通过 http://localhost:8081/actuator/metrics/spring.kafka.listener 暴露。
6. Kafka 消息追踪
Micrometer 的追踪功能通过向日志添加追踪信息帮助跟踪请求流,极大简化调试和监控。我们将使用 Spring Kafka 内置功能通过消息元数据轻松传播追踪上下文。
6.1. 丰富日志内容
Micrometer 使用 映射诊断上下文 (MDC) 存储两个 ID:traceId
和 spanId
。
在 REST 控制器中打断点,暂停时计算 MDC.getCopyOfContextMap()
可观察到:
MDC 中的这两个字段便于将追踪信息添加到日志。在 logback.xml
中配置:
这样当 traceId
和 spanId
存在于 MDC 上下文时,会记录在线程名旁边。
6.2. 传播上下文
添加追踪的真正价值在于跨系统传播追踪信息并关联不同组件。需要使用 HTTP 请求头或 Kafka 消息头等元数据传递 traceId
和 spanId
。
先看 KafkaTemplate
。需要为每条消息提取 MDC 中的 traceId
作为自定义消息头。幸运的是这已内置支持——只需调用 setObservationEnabled(true)
启用:
@Bean
KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate(
ProducerFactory<String, ArticleCommentAddedEvent> producerFactory)
{
var template = new KafkaTemplate<>(producerFactory);
template.setObservationEnabled(true);
// 其他配置...
return template;
}
这样 KafkaTemplate
会将追踪信息作为键名 traceparent
的消息头添加。
在监听器端,虽然能看到新的 traceparent
头,但仍需解析并添加到 MDC。和生产者类似,在容器级别启用观察即可让 Spring Kafka 自动处理:
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory
) {
var factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
ContainerProperties containerProps = factory.getContainerProperties();
containerProps.setObservationEnabled(true);
// 其他配置...
return factory;
}
运行应用并发送请求后,可追踪从 HTTP 请求到 Kafka 消息再到监听器处理的完整流程(跨不同线程):
[http-nio-8081-exec-2] __680df9d4fcab49ea0511b54ff0f3ce9f__0511b54ff0f3ce9f INFO HTTP Request received to save article comment: ArticleCommentAddedDto[...]
[org.s.kafka...#0-0-C-1] __680df9d4fcab49ea0511b54ff0f3ce9f__de00d94a8258a1b9 INFO Kafka Message Received: Comment Added: ArticleCommentAddedEvent[...]
这只是简单示例,但在多服务复杂系统中追踪请求时这个功能会非常实用。
7. 总结
本文探讨了 Spring Kafka 的监控功能。通过结合 Apache Kafka 原生指标与 Spring Kafka 和 Micrometer 提供的扩展监控支持,我们能全面掌握消息系统的健康状态和性能表现。
原生指标提供底层运维洞察,而 Spring 特有指标支持更具上下文和应用感知的可观测性。我们还学习了如何为这些指标添加定制标签。
最后我们学习了如何启用追踪并通过 traceId
和 spanId
丰富日志。通过自定义 Spring Kafka bean 使用 traceparent
消息头传播这些字段,实现了跨组件的消息流追踪。
本文代码可在 GitHub 获取。