1. 引言

本文将展示如何在基于Spring Integration的应用程序中使用PostgreSQL的NOTIFY/LISTEN特性实现消息推送机制。这种轻量级通知方案特别适合需要实时事件触发的分布式系统场景。

2. 快速回顾

PostgreSQL提供了一种轻量级消息通知机制,允许客户端通过常规数据库连接相互发送消息。该机制使用两个非标准SQL语句:NOTIFY(发送通知)和LISTEN(监听通知)。

⚠️ 重要提醒:我们假设读者已了解NOTIFY/LISTEN基础用法,本文重点在于如何将其实现为SubscribableChannel

3. 依赖项

只需两个核心依赖:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.8</version>
</dependency>

最新版本可在Maven中央仓库获取。

4. 什么是SubscribableChannel?

Spring Integration的SubscribableChannel是支持异步消息传递的MessageChannel扩展接口,核心特性包括:

  • subscribe(MessageHandler):注册消息处理器
  • unsubscribe(MessageHandler):取消注册
  • 与响应式编程的区别:这是推送模型(push-based),而非拉取模型(pull-based)
  • ⚠️ 流量控制缺失:消费者需自行实现缓冲/丢弃策略应对高并发

Spring Integration内置的PublishSubscribeChannel实现仅限于单JVM实例,而PostgreSQL方案可突破此限制。

5. SubscribableChannel实现

基于AbstractSubscribableChannel实现,优势包括:

  • 管理功能:暴露监控指标
  • 拦截器支持:允许添加ChannelInterceptors

5.1. 消息发送

实现doSend()方法,通过PostgreSQL的NOTIFY发送消息:

@Override
protected boolean doSend(Message<?> message, long timeout) {
    try {
        String msg = prepareNotifyPayload(message);
        try (Connection c = ds.getConnection()) {
            c.createStatement().execute("NOTIFY " + channelName + ", '" + msg + "'");
        }
        return true;
    } catch (Exception ex) {
        throw new MessageDeliveryException(message, "无法投递消息: " + ex.getMessage(), ex);
    }
}

消息序列化方法:

protected String prepareNotifyPayload(Message<?> message) throws JsonProcessingException {
    Map<String, Object> rawMap = new HashMap<>();
    rawMap.putAll(message.getHeaders());
    JsonNode headerData = om.valueToTree(rawMap);
    JsonNode bodyData = om.valueToTree(message.getPayload());
    ObjectNode msg = om.getNodeFactory().objectNode();
    msg.set(HEADER_FIELD, headerData);
    msg.set(BODY_FIELD, bodyData);
    return om.writeValueAsString(msg);
}

⚠️ 踩坑提示:PostgreSQL默认限制通知载荷大小约8KB。大消息应改用共享存储(如S3/数据库表)+引用传递。

5.2. 消息分发

动态启停监听线程:

@Override
public boolean subscribe(MessageHandler handler) {
    boolean r = super.subscribe(handler);
    if (r && super.getSubscriberCount() == 1) {
        startListenerThread();
    }
    return r;
}

@Override
public boolean unsubscribe(MessageHandler handle) {
    boolean r = super.unsubscribe(handle);
    if (r && super.getSubscriberCount() == 0) {
        stopListenerThread();
    }
    return r;
}

监听线程核心逻辑:

@Override
public void run() {
    startLatch.countDown();
    try (Statement st = conn.createStatement()) {
        st.execute("LISTEN " + channelName);
        PGConnection pgConn = conn.unwrap(PGConnection.class);

        while (!Thread.currentThread().isInterrupted()) {
            PGNotification[] nts = pgConn.getNotifications();
            for (PGNotification n : nts) {
                Message<?> msg = convertNotification(n);
                getDispatcher().dispatch(msg); // 处理拦截器/指标等
            }
        }
    } catch (SQLException sex) {
        // 异常处理省略
    } finally {
        stopLatch.countDown();
    }
}

6. 集成示例

实现订单处理场景:发送BUY/SELL订单,接收端维护交易余额。

6.1. 配置通道

@Bean
static SubscribableChannel orders(@Value("${db.url}") String url, 
                                @Value("${db.username}") String username, 
                                @Value("${db.password}") String password) {
    
    SingleConnectionDataSource ds = new SingleConnectionDataSource(url, username, password, true);      
    Supplier<Connection> connectionSupplier = () -> {
        try {
            return ds.getConnection();
        } catch(SQLException ex) {
            throw new RuntimeException(ex);
        }
    };
    
    PGSimpleDataSource pgds = new PGSimpleDataSource();
    pgds.setUrl(url);
    pgds.setUser(username);
    pgds.setPassword(password);
    
    return new PostgresSubscribableChannel("orders", connectionSupplier, pgds, new ObjectMapper());
}

关键点

  • 使用两个DataSourceSingleConnectionDataSource用于监听,原生PGSimpleDataSource用于发送
  • 监听连接需保持长连接

6.2. 订单处理器

@ServiceActivator(inputChannel = "orderProcessor")
void processOrder(Order order){
    BigDecimal orderTotal = order.getQuantity().multiply(order.getPrice());
    if (order.getOrderType() == OrderType.SELL) {
        orderTotal = orderTotal.negate();
    }
    
    BigDecimal sum = orderSummary.get(order.getSymbol());
    if (sum == null) {
        sum = orderTotal;
    } else {
        sum = sum.add(orderTotal);
    }    
    orderSummary.put(order.getSymbol(), sum);
    orderSemaphore.release(); // 测试同步用
}

6.3. 消息转换器

@Transformer(inputChannel = "orders", outputChannel = "orderProcessor" )
Order validatedOrders(Message<?> orderMessage) throws JsonProcessingException {
    ObjectNode on = (ObjectNode)orderMessage.getPayload();
    Order order = om.treeToValue(on, Order.class);
    return order;
}

注意:未显式定义orderProcessor时,Spring Integration会自动创建DirectChannel。高并发场景建议改用QueueChannel缓冲。

7. 测试

集成测试验证端到端流程:

@SpringJUnitConfig(classes = {PostgresqlPubSubExample.class})
public class PostgresqlPubSubExampleLiveTest {
    
    @Autowired
    PostgresqlPubSubExample processor;
    
    @Autowired
    OrdersGateway ordersGateway;

    @Test
    void whenPublishOrder_thenSuccess() throws Exception {
        Order o = new Order(1L,"BAEL", OrderType.BUY, 
                          BigDecimal.valueOf(2.0), BigDecimal.valueOf(5.0));
        ordersGateway.publish(o);
        
        assertThat(processor.awaitNextMessage(10, TimeUnit.SECONDS)).isTrue();
        
        BigDecimal total = processor.getTotalBySymbol("BAEL");
        assertThat(total).isEqualTo(BigDecimal.valueOf(10));
    }
}

测试要求

  • 需运行中的PostgreSQL实例
  • 有效数据库凭证

8. Spring Integration 6用户注意

从v6开始,Spring Integration内置了PostgresSubscribableChannel实现,但要求:

  • ✅ Java 17+ 基线
  • ✅ 突破8KB载荷限制(需创建数据库表存储大消息)
  • ❌ 不兼容Java 8/11

对于存量Java 8/11项目,本文方案仍具实用价值。

9. 总结

本文展示了如何利用PostgreSQL的NOTIFY/LISTEN机制在Spring Integration中实现异步消息传递。核心优势包括:

  • 跨JVM的消息分发能力
  • 基于数据库的轻量级实现
  • 与Spring Integration生态无缝集成

完整代码见GitHub仓库


原始标题:Receiving PostreSQL Push Notifications with Spring Integration