1. 概述

本文将深入探讨如何利用 MicrometerSpring Boot Actuator 实现 Spring Kafka 的监控。我们将从 Apache Kafka 原生指标开始,这些指标为生产者和消费者提供了性能、吞吐量、错误率和延迟等关键洞察。

接着我们将深入 Spring 特有的指标体系,重点关注 spring.kafka.listenerspring.kafka.template 下的指标。还会学习如何通过 Spring 配置自定义 @KafkaListenerKafkaTemplate,为这些指标添加自定义标签。

最后我们将讨论追踪功能,看看 Spring Kafka 如何轻松传播 Micrometer 生成的追踪信息,帮助我们更好地调试和监控系统中的消息流转。

2. 环境搭建

本文代码示例基于一个博客学习网站(类似 Baeldung)的后端应用。我们将使用一个简单的 Spring Boot 应用,依赖如下:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

src/test/resources/docker 目录下,可以找到用于本地启动的 docker-compose.yml 文件。

应用配置暴露了所有指标接口:

management.endpoints.web.exposure.include: '*'

启动应用后,访问 http://localhost:8081/actuator 即可查看所有暴露的接口和指标:

默认 Actuator 接口展示

功能上,服务提供了创建文章评论的 HTTP 接口。提交评论后,应用会向 Kafka 主题 baeldung.article-comment.added 发送消息,方便我们追踪 Kafka 生产者指标。

稍后我们会设置 Kafka 监听器消费同一主题的消息,借此理解如何监控监听器及其暴露的指标。

3. Kafka 原生指标

仅添加 spring-boot-starter-actuator 依赖就能开箱即用地暴露 Kafka 指标(Spring Boot 2.5+ 版本支持)。下面重点看 Kafka 生产者和消费者的原生指标。

3.1. 生产者指标

先让应用产生一些事件,通过 REST API 创建几条评论:

curl --location 'http://localhost:8081/api/articles/oop-best-practices/comments' \
--header 'Content-Type: application/json' \
--data '{
    "articleAuthor": "Andrey the Author",
    "comment": "Great article!",
    "commentAuthor": "Richard the Reader"
}'

然后访问 http://localhost:8081/actuator/metrics 接口。这次能看到多个 Kafka 生产者指标,包括延迟和失败率等数据:

  • kafka.producer.record.error.rate – 记录发送失败频率
  • kafka.producer.request.latency.avg – 生产请求平均完成时间
  • kafka.producer.buffer.exhausted.rate – 生产者缓冲区耗尽频率
  • kafka.producer.record.send.rate – 记录发送到代理的速率
  • kafka.producer.requests.in.flight – 当前未确认的生产请求数

完整指标列表参考 Apache Kafka 官方文档通过在路径后追加指标名即可查看具体指标,例如访问生产者错误率接口:

生产者错误指标展示

3.2. 消费者指标

和生产者指标类似,Micrometer 也会记录 Kafka 消费者相关指标并通过 Actuator 暴露。

为演示这点,添加一个 @KafkaListener 监听同一主题:

@Component
public class ArticleCommentsListener {

    @KafkaListener(topics = "baeldung.article-comment.added")
    public void onArticleComment(ArticleCommentAddedEvent event) {
        // 业务逻辑...
    }

}

重启应用并发送请求后查看指标。这次能看到多个 Kafka 消费者指标,主要包括:

  • kafka.consumer.fetch.manager.records.lag – 消费者滞后程度
  • kafka.consumer.fetch.manager.fetch.latency.avg – 从代理获取数据的平均时间
  • kafka.consumer.coordinator.rebalance.rate.per.hour – 消费者组重平衡频率
  • kafka.consumer.last.poll.seconds.ago – 距离上次轮询的时间
  • kafka.consumer.time.between.poll.avg – 连续轮询的平均间隔

完整指标列表参考 Kafka 官方文档

3.3. 添加自定义标签

Spring Kafka 内置 API 可为原生指标添加自定义标签。例如通过监听 ProducerFactory bean 自定义生产者指标:

@Bean
ProducerFactory<String, ArticleCommentAddedEvent> producerFactory(
    KafkaProperties kafkaProperties, MeterRegistry meterRegistry
) {
    ProducerFactory pf = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    pf.addListener(
        new MicrometerProducerListener<String, String>(
            meterRegistry,
            Collections.singletonList(new ImmutableTag("app-name", "article-comments-app"))
        )
    );
    return pf;
}

这为所有生产者指标添加了自定义名称。类似地,修改 ConsumerFactory 添加 MicrometerConsumerListener 可为消费者指标添加自定义标签。

4. 监控 KafkaTemplate

示例应用使用 KafkaTemplate bean 发布消息。除生产者指标外,Micrometer 还记录 KafkaTemplate 特有指标,名称为 spring.kafka.template

可自定义 KafkaTemplate bean 添加或更新标签。setMicrometerTags() 方法支持定义键值对标签:

@Bean
@Qualifier("articleCommentsKafkaTemplate")
KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate(
    ProducerFactory<String, ArticleCommentAddedEvent> producerFactory
) {
    var template = new KafkaTemplate<>(producerFactory);
    template.setMicrometerTags(Map.of(
        "topic", "baeldung.article-comment.added"
    ));

    return template;
}

此外 setMicrometerTagsProvider() 可动态生成标签。 用它提取记录键作为标签:

template.setMicrometerTagsProvider(
    record -> Map.of("article-slug", record.key().toString())
);

向不同文章添加评论后查看 http://localhost:8081/actuator/metrics/spring.kafka.template 接口:

KafkaTemplate 指标展示

数据包含所有 KafkaTemplate 性能记录及我们的自定义标签。

5. 监控 KafkaListener

与 KafkaTemplate 类似,Micrometer 也监控 KafkaListener 并以 spring.kafka.listener 名称暴露指标。 Spring Kafka 提供一致的 API,方便配置监听器指标的自定义标签。

setMicrometerTags()setMicrometerTagsProvider() 方法可在 ConcurrentKafkaListenerContainerFactory 级别配置:

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory(
    ConsumerFactory<String, String> consumerFactory
) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);

    ContainerProperties containerProps = factory.getContainerProperties();
    containerProps.setMicrometerTags(Map.of(
        "app-name", "article-comments-app"
    ));
    containerProps.setMicrometerTagsProvider(
        record -> Map.of("article-slug", record.key().toString())
    );
    return factory;
}

同时更新 @KafkaListener 注解指向自定义的 containerFactory

@KafkaListener(
    topics = "baeldung.article-comment.added",
    containerFactory = "customKafkaListenerContainerFactory"
)
public void onArticleComment(ArticleCommentAddedEvent event) {
    // ...
}

这样 Micrometer 会将静态和动态标签附加到指标,通过 http://localhost:8081/actuator/metrics/spring.kafka.listener 暴露。

6. Kafka 消息追踪

Micrometer 的追踪功能通过向日志添加追踪信息帮助跟踪请求流,极大简化调试和监控。我们将使用 Spring Kafka 内置功能通过消息元数据轻松传播追踪上下文。

6.1. 丰富日志内容

Micrometer 使用 映射诊断上下文 (MDC) 存储两个 ID:traceIdspanId

在 REST 控制器中打断点,暂停时计算 MDC.getCopyOfContextMap() 可观察到:

请求时 MDC 中的 spanId 和 traceId 值

MDC 中的这两个字段便于将追踪信息添加到日志。在 logback.xml 中配置:

所需 logback 配置

这样当 traceIdspanId 存在于 MDC 上下文时,会记录在线程名旁边。

6.2. 传播上下文

添加追踪的真正价值在于跨系统传播追踪信息并关联不同组件。需要使用 HTTP 请求头或 Kafka 消息头等元数据传递 traceIdspanId

先看 KafkaTemplate。需要为每条消息提取 MDC 中的 traceId 作为自定义消息头。幸运的是这已内置支持——只需调用 setObservationEnabled(true) 启用:

@Bean
KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate(
  ProducerFactory<String, ArticleCommentAddedEvent> producerFactory) 
{
    var template = new KafkaTemplate<>(producerFactory);
    template.setObservationEnabled(true);
    // 其他配置...
    return template;
}

这样 KafkaTemplate 会将追踪信息作为键名 traceparent 的消息头添加。

在监听器端,虽然能看到新的 traceparent 头,但仍需解析并添加到 MDC。和生产者类似,在容器级别启用观察即可让 Spring Kafka 自动处理:

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory(
  ConsumerFactory<String, String> consumerFactory
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);

    ContainerProperties containerProps = factory.getContainerProperties();
    containerProps.setObservationEnabled(true);
    // 其他配置...
    return factory;
}

运行应用并发送请求后,可追踪从 HTTP 请求到 Kafka 消息再到监听器处理的完整流程(跨不同线程):

[http-nio-8081-exec-2] __680df9d4fcab49ea0511b54ff0f3ce9f__0511b54ff0f3ce9f INFO HTTP Request received to save article comment: ArticleCommentAddedDto[...]
[org.s.kafka...#0-0-C-1] __680df9d4fcab49ea0511b54ff0f3ce9f__de00d94a8258a1b9 INFO  Kafka Message Received: Comment Added: ArticleCommentAddedEvent[...]

这只是简单示例,但在多服务复杂系统中追踪请求时这个功能会非常实用。

7. 总结

本文探讨了 Spring Kafka 的监控功能。通过结合 Apache Kafka 原生指标与 Spring Kafka 和 Micrometer 提供的扩展监控支持,我们能全面掌握消息系统的健康状态和性能表现。

原生指标提供底层运维洞察,而 Spring 特有指标支持更具上下文和应用感知的可观测性。我们还学习了如何为这些指标添加定制标签。

最后我们学习了如何启用追踪并通过 traceIdspanId 丰富日志。通过自定义 Spring Kafka bean 使用 traceparent 消息头传播这些字段,实现了跨组件的消息流追踪。

本文代码可在 GitHub 获取。