1. 引言
本文将展示如何在基于Spring Integration的应用程序中使用PostgreSQL的NOTIFY/LISTEN
特性实现消息推送机制。这种轻量级通知方案特别适合需要实时事件触发的分布式系统场景。
2. 快速回顾
PostgreSQL提供了一种轻量级消息通知机制,允许客户端通过常规数据库连接相互发送消息。该机制使用两个非标准SQL语句:NOTIFY
(发送通知)和LISTEN
(监听通知)。
⚠️ 重要提醒:我们假设读者已了解NOTIFY/LISTEN
基础用法,本文重点在于如何将其实现为SubscribableChannel
。
3. 依赖项
只需两个核心依赖:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.3.8</version>
</dependency>
最新版本可在Maven中央仓库获取。
4. 什么是SubscribableChannel?
Spring Integration的SubscribableChannel
是支持异步消息传递的MessageChannel
扩展接口,核心特性包括:
- ✅
subscribe(MessageHandler)
:注册消息处理器 - ✅
unsubscribe(MessageHandler)
:取消注册 - ❌ 与响应式编程的区别:这是推送模型(push-based),而非拉取模型(pull-based)
- ⚠️ 流量控制缺失:消费者需自行实现缓冲/丢弃策略应对高并发
Spring Integration内置的PublishSubscribeChannel
实现仅限于单JVM实例,而PostgreSQL方案可突破此限制。
5. SubscribableChannel实现
基于AbstractSubscribableChannel
实现,优势包括:
- 管理功能:暴露监控指标
- 拦截器支持:允许添加
ChannelInterceptors
5.1. 消息发送
实现doSend()
方法,通过PostgreSQL的NOTIFY
发送消息:
@Override
protected boolean doSend(Message<?> message, long timeout) {
try {
String msg = prepareNotifyPayload(message);
try (Connection c = ds.getConnection()) {
c.createStatement().execute("NOTIFY " + channelName + ", '" + msg + "'");
}
return true;
} catch (Exception ex) {
throw new MessageDeliveryException(message, "无法投递消息: " + ex.getMessage(), ex);
}
}
消息序列化方法:
protected String prepareNotifyPayload(Message<?> message) throws JsonProcessingException {
Map<String, Object> rawMap = new HashMap<>();
rawMap.putAll(message.getHeaders());
JsonNode headerData = om.valueToTree(rawMap);
JsonNode bodyData = om.valueToTree(message.getPayload());
ObjectNode msg = om.getNodeFactory().objectNode();
msg.set(HEADER_FIELD, headerData);
msg.set(BODY_FIELD, bodyData);
return om.writeValueAsString(msg);
}
⚠️ 踩坑提示:PostgreSQL默认限制通知载荷大小约8KB。大消息应改用共享存储(如S3/数据库表)+引用传递。
5.2. 消息分发
动态启停监听线程:
@Override
public boolean subscribe(MessageHandler handler) {
boolean r = super.subscribe(handler);
if (r && super.getSubscriberCount() == 1) {
startListenerThread();
}
return r;
}
@Override
public boolean unsubscribe(MessageHandler handle) {
boolean r = super.unsubscribe(handle);
if (r && super.getSubscriberCount() == 0) {
stopListenerThread();
}
return r;
}
监听线程核心逻辑:
@Override
public void run() {
startLatch.countDown();
try (Statement st = conn.createStatement()) {
st.execute("LISTEN " + channelName);
PGConnection pgConn = conn.unwrap(PGConnection.class);
while (!Thread.currentThread().isInterrupted()) {
PGNotification[] nts = pgConn.getNotifications();
for (PGNotification n : nts) {
Message<?> msg = convertNotification(n);
getDispatcher().dispatch(msg); // 处理拦截器/指标等
}
}
} catch (SQLException sex) {
// 异常处理省略
} finally {
stopLatch.countDown();
}
}
6. 集成示例
实现订单处理场景:发送BUY/SELL
订单,接收端维护交易余额。
6.1. 配置通道
@Bean
static SubscribableChannel orders(@Value("${db.url}") String url,
@Value("${db.username}") String username,
@Value("${db.password}") String password) {
SingleConnectionDataSource ds = new SingleConnectionDataSource(url, username, password, true);
Supplier<Connection> connectionSupplier = () -> {
try {
return ds.getConnection();
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
};
PGSimpleDataSource pgds = new PGSimpleDataSource();
pgds.setUrl(url);
pgds.setUser(username);
pgds.setPassword(password);
return new PostgresSubscribableChannel("orders", connectionSupplier, pgds, new ObjectMapper());
}
关键点:
- 使用两个
DataSource
:SingleConnectionDataSource
用于监听,原生PGSimpleDataSource
用于发送 - 监听连接需保持长连接
6.2. 订单处理器
@ServiceActivator(inputChannel = "orderProcessor")
void processOrder(Order order){
BigDecimal orderTotal = order.getQuantity().multiply(order.getPrice());
if (order.getOrderType() == OrderType.SELL) {
orderTotal = orderTotal.negate();
}
BigDecimal sum = orderSummary.get(order.getSymbol());
if (sum == null) {
sum = orderTotal;
} else {
sum = sum.add(orderTotal);
}
orderSummary.put(order.getSymbol(), sum);
orderSemaphore.release(); // 测试同步用
}
6.3. 消息转换器
@Transformer(inputChannel = "orders", outputChannel = "orderProcessor" )
Order validatedOrders(Message<?> orderMessage) throws JsonProcessingException {
ObjectNode on = (ObjectNode)orderMessage.getPayload();
Order order = om.treeToValue(on, Order.class);
return order;
}
注意:未显式定义orderProcessor
时,Spring Integration会自动创建DirectChannel
。高并发场景建议改用QueueChannel
缓冲。
7. 测试
集成测试验证端到端流程:
@SpringJUnitConfig(classes = {PostgresqlPubSubExample.class})
public class PostgresqlPubSubExampleLiveTest {
@Autowired
PostgresqlPubSubExample processor;
@Autowired
OrdersGateway ordersGateway;
@Test
void whenPublishOrder_thenSuccess() throws Exception {
Order o = new Order(1L,"BAEL", OrderType.BUY,
BigDecimal.valueOf(2.0), BigDecimal.valueOf(5.0));
ordersGateway.publish(o);
assertThat(processor.awaitNextMessage(10, TimeUnit.SECONDS)).isTrue();
BigDecimal total = processor.getTotalBySymbol("BAEL");
assertThat(total).isEqualTo(BigDecimal.valueOf(10));
}
}
测试要求:
- 需运行中的PostgreSQL实例
- 有效数据库凭证
8. Spring Integration 6用户注意
从v6开始,Spring Integration内置了PostgresSubscribableChannel
实现,但要求:
- ✅ Java 17+ 基线
- ✅ 突破8KB载荷限制(需创建数据库表存储大消息)
- ❌ 不兼容Java 8/11
对于存量Java 8/11项目,本文方案仍具实用价值。
9. 总结
本文展示了如何利用PostgreSQL的NOTIFY/LISTEN
机制在Spring Integration中实现异步消息传递。核心优势包括:
- 跨JVM的消息分发能力
- 基于数据库的轻量级实现
- 与Spring Integration生态无缝集成
完整代码见GitHub仓库。