1. 概述
Apache Pulsar 是一个分布式的发布-订阅消息系统。虽然它提供的功能与 Apache Kafka 类似,但 Pulsar 旨在解决 Kafka 在高延迟、低吞吐量、扩展困难和地理复制等方面的局限性。当需要处理大量实时数据时,Apache Pulsar 是一个很好的替代方案。
本教程将介绍如何将 Apache Pulsar 集成到 Spring Boot 应用中。我们将利用 Pulsar 的 Spring Boot Starter 自动配置的 PulsarTemplate
和 PulsarListener
,并展示如何根据需求修改它们的默认配置。
2. Maven 依赖
首先,按照 Apache Pulsar 入门 的说明启动一个独立的 Pulsar 服务器。
然后,将 spring-pulsar-spring-boot-starter
库添加到项目中:
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
3. PulsarClient 配置
要与 Pulsar 服务器交互,需要配置 PulsarClient
。默认情况下,Spring 会自动配置一个连接到 localhost:6650
的 PulsarClient:
spring:
pulsar:
client:
service-url: pulsar://localhost:6650
可以修改此配置以连接到其他地址。
**要连接到安全服务器,可以使用 pulsar+ssl
替代 pulsar
**。还可以通过在 application.yml
中添加 spring.pulsar.client.*
属性来配置连接超时、认证和内存限制等参数。
4. 自定义对象 Schema 定义
我们使用一个简单的 User
类作为示例:
public class User {
private String email;
private String firstName;
// 标准构造方法、getter 和 setter
}
Spring-Pulsar 能自动检测基本数据类型并生成相应 Schema。但如果需要处理自定义 JSON 对象,必须为 PulsarClient
配置其 Schema 信息:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.baeldung.springpulsar.User
schema-info:
schema-type: JSON
这里,message-type
属性接受消息类的全限定名,schema-type
指定要使用的 Schema 类型。对于复杂对象,schema-type
支持 AVRO
或 JSON
值。
虽然使用属性文件指定 Schema 是首选方法,但也可以通过 Bean 提供:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
}
}
此配置需要同时添加到生产者和监听器应用中。
5. 消息生产者
使用 PulsarTemplate
向 Pulsar 主题发布消息。PulsarTemplate
实现了 PulsarOperations
接口,提供同步和异步发布记录的方法。**send
方法阻塞调用以提供同步操作能力,而 sendAsync
方法提供非阻塞异步操作**。
本教程中,我们将使用同步操作发布记录。
5.1. 发布消息
Spring Boot 自动配置了一个开箱即用的 PulsarTemplate
,用于向指定主题发布记录。
创建一个向队列发布 String
消息的生产者:
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate<String> stringTemplate;
private static final String STRING_TOPIC = "string-topic";
public void sendStringMessageToPulsarTopic(String str) throws PulsClientException {
stringTemplate.send(STRING_TOPIC, str);
}
}
现在尝试发送 User
对象到新队列:
@Autowired
private PulsarTemplate<User> template;
private static final String USER_TOPIC = "user-topic";
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.send(USER_TOPIC, user);
}
上述代码中,我们使用 PulsarTemplate
将 User
类对象发送到名为 user-topic
的 Pulsar 主题。
5.2. 生产者端自定义配置
PulsarTemplate
接受 TypedMessageBuilderCustomizer
配置出站消息,以及 ProducerBuilderCustomizer
自定义生产者属性。
使用 TypedMessageBuilderCustomizer
配置消息延迟、定时发送、禁用复制和添加额外属性:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withMessageCustomizer(mc -> {
mc.deliverAfter(10L, TimeUnit.SECONDS);
})
.send();
}
ProducerBuilderCustomizer
可用于添加访问模式、自定义消息路由器、拦截器,以及启用/禁用分块和批处理:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withProducerCustomizer(pc -> {
pc.accessMode(ProducerAccessMode.Shared);
})
.send();
}
6. 消息消费者
向主题发布消息后,现在为同一主题建立监听器。要启用主题监听,需要使用 @PulsarListener
注解装饰监听方法。
Spring Boot 会为监听方法自动配置所有必要组件。
**还需要使用 @EnablePulsar
注解来启用 PulsarListener
**。
6.1. 接收消息
首先为前面创建的 string-topic
创建监听方法:
@Service
public class PulsarConsumer {
private static final String STRING_TOPIC = "string-topic";
@PulsarListener(
subscriptionName = "string-topic-subscription",
topics = STRING_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void stringTopicListener(String str) {
LOGGER.info("Received String message: {}", str);
}
}
在 PulsarListener
注解中,通过 topics
配置监听的主题,通过 subscriptionName
指定订阅名称。
现在为 User
类使用的 user-topic
创建监听方法:
private static final String USER_TOPIC = "user-topic";
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
除了前面监听方法的属性外,还添加了 schemaType
属性,其值与生产者中的配置相同。
在主类上添加 @EnablePulsar
注解:
@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {
public static void main(String[] args) {
SpringApplication.run(SpringPulsarApplication.class, args);
}
}
6.2. 消费者端自定义配置
除了订阅名称和 Schema 类型,PulsarListener
还可用于配置自动启动、批处理和确认模式等属性:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
ackMode = AckMode.RECORD,
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
这里将确认模式设置为 Record
,确认超时设为 60 秒。
7. 使用死信队列
如果消息确认超时或服务器收到 nack
,Pulsar 会尝试重新传递消息一定次数。重试次数用尽后,这些未传递的消息将被发送到称为死信队列(DLQ)的队列。
此选项仅适用于 Shared
订阅类型。要为 user-topic
队列配置 DLQ,首先创建一个 DeadLetterPolicy
Bean,定义重试次数和用作 DLQ 的队列名称:
private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.deadLetterTopic(USER_DEAD_LETTER_TOPIC)
.build();
}
现在将此策略添加到之前创建的 PulsarListener
:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
deadLetterPolicy = "deadLetterPolicy",
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
这里配置 userTopicListener
使用之前创建的 deadLetterPolicy
,并设置了 60 秒的确认时间。
可以创建单独的监听器处理 DLQ 中的消息:
@PulsarListener(
subscriptionName = "dead-letter-topic-subscription",
topics = USER_DEAD_LETTER_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}
8. 总结
本教程介绍了如何在 Spring Boot 应用中使用 Apache Pulsar,以及修改默认配置的几种方法。
完整的示例实现可在 GitHub 上找到。