1. 概述

Apache Pulsar 是一个分布式的发布-订阅消息系统。虽然它提供的功能与 Apache Kafka 类似,但 Pulsar 旨在解决 Kafka 在高延迟、低吞吐量、扩展困难和地理复制等方面的局限性。当需要处理大量实时数据时,Apache Pulsar 是一个很好的替代方案。

本教程将介绍如何将 Apache Pulsar 集成到 Spring Boot 应用中。我们将利用 Pulsar 的 Spring Boot Starter 自动配置的 PulsarTemplatePulsarListener,并展示如何根据需求修改它们的默认配置。

2. Maven 依赖

首先,按照 Apache Pulsar 入门 的说明启动一个独立的 Pulsar 服务器。

然后,将 spring-pulsar-spring-boot-starter 库添加到项目中:

<dependency>
    <groupId>org.springframework.pulsar</groupId>
    <artifactId>spring-pulsar-spring-boot-starter</artifactId>
    <version>0.2.0</version>
</dependency>

3. PulsarClient 配置

要与 Pulsar 服务器交互,需要配置 PulsarClient。默认情况下,Spring 会自动配置一个连接到 localhost:6650 的 PulsarClient

spring:
  pulsar:
    client:
      service-url: pulsar://localhost:6650

可以修改此配置以连接到其他地址。

**要连接到安全服务器,可以使用 pulsar+ssl 替代 pulsar**。还可以通过在 application.yml 中添加 spring.pulsar.client.* 属性来配置连接超时、认证和内存限制等参数。

4. 自定义对象 Schema 定义

我们使用一个简单的 User 类作为示例:

public class User {

    private String email;
    private String firstName;

    // 标准构造方法、getter 和 setter
}

Spring-Pulsar 能自动检测基本数据类型并生成相应 Schema。但如果需要处理自定义 JSON 对象,必须为 PulsarClient 配置其 Schema 信息

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.baeldung.springpulsar.User
          schema-info:
            schema-type: JSON

这里,message-type 属性接受消息类的全限定名,schema-type 指定要使用的 Schema 类型。对于复杂对象,schema-type 支持 AVROJSON 值。

虽然使用属性文件指定 Schema 是首选方法,但也可以通过 Bean 提供:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
    }
}

此配置需要同时添加到生产者和监听器应用中。

5. 消息生产者

使用 PulsarTemplate 向 Pulsar 主题发布消息。PulsarTemplate 实现了 PulsarOperations 接口,提供同步和异步发布记录的方法。**send 方法阻塞调用以提供同步操作能力,而 sendAsync 方法提供非阻塞异步操作**。

本教程中,我们将使用同步操作发布记录。

5.1. 发布消息

Spring Boot 自动配置了一个开箱即用的 PulsarTemplate,用于向指定主题发布记录。

创建一个向队列发布 String 消息的生产者:

@Component
public class PulsarProducer {

    @Autowired
    private PulsarTemplate<String> stringTemplate;

    private static final String STRING_TOPIC = "string-topic";

    public void sendStringMessageToPulsarTopic(String str) throws PulsClientException {
        stringTemplate.send(STRING_TOPIC, str);
    }
}

现在尝试发送 User 对象到新队列:

@Autowired
private PulsarTemplate<User> template;

private static final String USER_TOPIC = "user-topic";

public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
    template.send(USER_TOPIC, user);
}

上述代码中,我们使用 PulsarTemplateUser 类对象发送到名为 user-topic 的 Pulsar 主题。

5.2. 生产者端自定义配置

PulsarTemplate 接受 TypedMessageBuilderCustomizer 配置出站消息,以及 ProducerBuilderCustomizer 自定义生产者属性。

使用 TypedMessageBuilderCustomizer 配置消息延迟、定时发送、禁用复制和添加额外属性:

public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
    template.newMessage(user)
      .withMessageCustomizer(mc -> {
        mc.deliverAfter(10L, TimeUnit.SECONDS);
      })
      .send();
}

ProducerBuilderCustomizer 可用于添加访问模式、自定义消息路由器、拦截器,以及启用/禁用分块和批处理:

public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
    template.newMessage(user)
      .withProducerCustomizer(pc -> {
        pc.accessMode(ProducerAccessMode.Shared);
      })
      .send();
}

6. 消息消费者

向主题发布消息后,现在为同一主题建立监听器。要启用主题监听,需要使用 @PulsarListener 注解装饰监听方法

Spring Boot 会为监听方法自动配置所有必要组件。

**还需要使用 @EnablePulsar 注解来启用 PulsarListener**。

6.1. 接收消息

首先为前面创建的 string-topic 创建监听方法:

@Service
public class PulsarConsumer {

    private static final String STRING_TOPIC = "string-topic";

    @PulsarListener(
      subscriptionName = "string-topic-subscription",
      topics = STRING_TOPIC,
      subscriptionType = SubscriptionType.Shared
    )
    public void stringTopicListener(String str) {
        LOGGER.info("Received String message: {}", str);
    }
}

PulsarListener 注解中,通过 topics 配置监听的主题,通过 subscriptionName 指定订阅名称。

现在为 User 类使用的 user-topic 创建监听方法:

private static final String USER_TOPIC = "user-topic";

@PulsarListener(
    subscriptionName = "user-topic-subscription",
    topics = USER_TOPIC,
    schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
    LOGGER.info("Received user object with email: {}", user.getEmail());
}

除了前面监听方法的属性外,还添加了 schemaType 属性,其值与生产者中的配置相同。

在主类上添加 @EnablePulsar 注解:

@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringPulsarApplication.class, args);
    }
}

6.2. 消费者端自定义配置

除了订阅名称和 Schema 类型,PulsarListener 还可用于配置自动启动、批处理和确认模式等属性:

@PulsarListener(
  subscriptionName = "user-topic-subscription",
  topics = USER_TOPIC,
  subscriptionType = SubscriptionType.Shared,
  schemaType = SchemaType.JSON,
  ackMode = AckMode.RECORD,
  properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
    LOGGER.info("Received user object with email: {}", user.getEmail());
}

这里将确认模式设置为 Record,确认超时设为 60 秒。

7. 使用死信队列

如果消息确认超时或服务器收到 nack,Pulsar 会尝试重新传递消息一定次数。重试次数用尽后,这些未传递的消息将被发送到称为死信队列(DLQ)的队列

此选项仅适用于 Shared 订阅类型。要为 user-topic 队列配置 DLQ,首先创建一个 DeadLetterPolicy Bean,定义重试次数和用作 DLQ 的队列名称:

private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";
@Bean
DeadLetterPolicy deadLetterPolicy() {
    return DeadLetterPolicy.builder()
      .maxRedeliverCount(10)
      .deadLetterTopic(USER_DEAD_LETTER_TOPIC)
      .build();
}

现在将此策略添加到之前创建的 PulsarListener

@PulsarListener(
  subscriptionName = "user-topic-subscription",
  topics = USER_TOPIC,
  subscriptionType = SubscriptionType.Shared,
  schemaType = SchemaType.JSON,
  deadLetterPolicy = "deadLetterPolicy",
  properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
    LOGGER.info("Received user object with email: {}", user.getEmail());
}

这里配置 userTopicListener 使用之前创建的 deadLetterPolicy,并设置了 60 秒的确认时间。

可以创建单独的监听器处理 DLQ 中的消息:

@PulsarListener(
  subscriptionName = "dead-letter-topic-subscription",
  topics = USER_DEAD_LETTER_TOPIC,
  subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
    LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}

8. 总结

本教程介绍了如何在 Spring Boot 应用中使用 Apache Pulsar,以及修改默认配置的几种方法。

完整的示例实现可在 GitHub 上找到。


原始标题:Getting Started With Apache Pulsar and Spring Boot