1. 简介

本教程将介绍如何利用 PostgreSQL 的 LISTEN/NOTIFY 命令实现一个简单的消息代理机制。

2. PostgreSQL 的 LISTEN/NOTIFY 机制快速入门

简单来说,这些命令允许连接的客户端通过常规 PostgreSQL 连接交换消息。客户端使用 NOTIFY 命令通道发送通知,并可附带一个可选的字符串载荷。

通道可以是任何有效的 SQL 标识符,其作用类似于传统消息系统中的主题。这意味着载荷将发送给该特定通道的所有活跃监听器。当没有载荷时,监听器只会收到一个空通知。

客户端使用 LISTEN 命令 开始接收通知,该命令将通道名称作为唯一参数。该命令会立即返回,允许客户端使用同一连接继续处理其他任务。

通知机制具有以下重要特性:

✅ 通道名在数据库内是唯一的
✅ 客户端无需特殊权限即可使用 LISTEN/NOTIFY
✅ 当 NOTIFY 在事务中使用时,只有事务成功提交后客户端才会收到通知

⚠️ 如果在一个事务中多次向同一通道发送相同载荷的 NOTIFY 命令,客户端只会收到一条通知。

3. 选择 PostgreSQL 作为消息代理的场景

考虑到 PostgreSQL 通知机制的特性,我们可能会思考:何时选择它而不是 RabbitMQ 等成熟的消息代理?这需要权衡取舍。通常选择后者意味着:

❌ 增加系统复杂度——消息代理是需要监控、升级的额外组件
❌ 需要处理分布式事务带来的故障模式

而通知机制则不存在这些问题:

✅ 功能已内置(假设我们已使用 PostgreSQL 作为主数据库)
✅ 无需处理分布式事务

当然,它也有局限性:

❌ 属于 PostgreSQL 特有机制,可能导致技术锁定
❌ 不直接支持持久化订阅者——客户端开始监听前发送的通知会丢失

尽管有这些限制,该机制仍有潜在应用场景:

✅ "模块化单体"应用中的通知总线
✅ 分布式缓存失效
✅ 轻量级消息代理(使用普通数据库表作为队列)
✅ 事件溯源架构

4. 在 Spring Boot 应用中使用 LISTEN/NOTIFY

现在我们已基本理解 LISTEN/NOTIFY 机制,接下来构建一个使用该机制的 Spring Boot 测试应用。我们将创建一个简单的 API 用于提交买卖订单,载荷包含交易品种符号、价格和数量。同时添加一个根据订单 ID 查询订单的 API。

到目前为止没什么特别之处。但关键需求是:我们希望在订单插入数据库后立即从缓存中提供查询服务。虽然可以使用缓存直写,但在需要扩展服务的分布式场景中,我们还需要分布式缓存。

这时通知机制就派上用场了:每次插入订单时发送 NOTIFY,客户端通过 LISTEN 预加载订单到各自的本地缓存。

4.1. 项目依赖

我们的示例应用需要常规的 WebMVC Spring Boot 依赖,以及 PostgreSQL 驱动:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.7.12</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jdbc</artifactId>
    <version>2.7.12</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.6.0</version>
</dependency>

最新版 spring-boot-starter-webspring-boot-starter-data-jdbcpostgresql 可在 Maven Central 获取。

4.2. 通知服务

由于通知机制是 PostgreSQL 特有的,我们将其通用行为封装在 NotifierService 类中:这样能避免这些细节泄露到应用其他部分。这也简化了单元测试——我们可以用模拟版本替换此服务以实现不同测试场景。

NotifierService 有两个职责。首先,它提供发送订单相关通知的门面:

public class NotifierService {
    private static final String ORDERS_CHANNEL = "orders";
    private final JdbcTemplate tpl;
   
    @Transactional
    public void notifyOrderCreated(Order order) {
        tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'");
    }
   // ... 其他方法省略
}

其次,它提供创建 Runnable 实例的工厂方法,应用通过这些实例接收通知。该工厂接收一个 PGNotification 对象的 Consumer,该对象包含获取通知通道和载荷的方法:

public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {        
    return () -> {
        tpl.execute((Connection c) -> {
            c.createStatement().execute("LISTEN " + ORDERS_CHANNEL);                
            PGConnection pgconn = c.unwrap(PGConnection.class);                 
            while(!Thread.currentThread().isInterrupted()) {
                PGNotification[] nts = pgconn.getNotifications(10000);
                if ( nts == null || nts.length == 0 ) {
                    continue;
                }                    
                for( PGNotification nt : nts) {
                    consumer.accept(nt);
                }
            }                
            return 0;
        });                
    };
}

这里为简单起见直接传递原始 PGNotification。在实际场景中(通常涉及多个领域实体),我们可以使用泛型等技术扩展此类以避免代码重复。

关于创建的 Runnable 的几点说明:

✅ 数据库相关逻辑使用提供的 JdbcTemplateexecute() 方法,确保连接正确处理/清理并简化错误处理
✅ 回调会持续运行,直到当前线程被中断或运行时错误导致返回

注意这里使用 PGConnection 而非标准 JDBC Connection——我们需要直接访问 getNotifications() 方法来获取一个或多个排队通知。

getNotifications() 有两种变体:

  • 无参版本:轮询待处理通知并返回,无通知时返回 null
  • 带整型参数版本:指定最大等待时间(毫秒),超时返回 null
  • 传 0 时:阻塞直到新通知到达

应用初始化时,我们在 @Configuration 类中使用 CommandLineRunner bean 来启动新线程实际接收通知:

@Configuration
public class ListenerConfiguration {
    
    @Bean
    CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) {
        return (args) -> {
            Runnable listener = notifier.createNotificationHandler(handler);            
            Thread t = new Thread(listener, "order-listener");
            t.start();
        };
    }
}

4.3. 连接处理

虽然技术上可行,但使用同一连接处理通知和常规查询并不方便。这需要在控制流中散布 getNotification() 调用,导致代码难以阅读和维护。

标准做法是运行一个或多个专用线程处理通知。每个线程有自己的连接并保持常开。如果这些连接由 Hikari 或 DBCP 等连接池创建,可能会引发问题

为避免这些问题,我们的示例创建了一个专用的 DriverDataSource,并用它创建 NotifierService 所需的 JdbcTemplate

@Configuration
public class NotifierConfiguration {

    @Bean
    NotifierService notifier(DataSourceProperties props) {
        
        DriverDataSource ds = new DriverDataSource(
          props.determineUrl(), 
          props.determineDriverClassName(),
          new Properties(), 
          props.determineUsername(),
          props.determinePassword());
        
        JdbcTemplate tpl = new JdbcTemplate(ds);
        return new NotifierService(tpl);
    }
}

注意我们复用了创建主 Spring 管理 DataSource 的相同连接属性。但我们未将此专用 DataSource 暴露为 bean,否则会禁用 Spring Boot 的自动配置功能。

4.4. 通知处理器

缓存逻辑的最后一块是 NotificationHandler 类,它实现了 Consumer<Notification> 接口。该类的职责是处理单个通知,并用 Order 实例填充配置的 Cache

@Component
public class NotificationHandler implements Consumer<PGNotification> {
    private final OrdersService orders;

    @Override
    public void accept(PGNotification t) {
        Optional<Order> order = orders.findById(Long.valueOf(t.getParameter()));
        // ... 日志消息省略
    }
}

实现使用 getName()getParameter() 从通知中获取通道名和订单 ID。这里我们可以假定通知总是符合预期——这并非偷懒,而是源于 NotifierService 构造 Runnable 的方式。

实际逻辑很简单:使用 OrderRepository 从数据库获取 Order 并添加到缓存:

@Service
public class OrdersService {
    private final OrdersRepository repo;
    // ... 其他私有字段省略
   
    @Transactional(readOnly = true)
    public Optional<Order> findById(Long id) {
        Optional<Order> o = Optional.ofNullable(ordersCache.get(id, Order.class));
        if (!o.isEmpty()) {
            log.info("findById: cache hit, id={}",id);
            return o;
        }        
        log.info("findById: cache miss, id={}",id);
        o = repo.findById(id);
        if ( o.isEmpty()) {
            return o;
        }        
        ordersCache.put(id, o.get());
        return o;
    }
}

5. 测试

要观察通知机制的实际效果,最佳方式是启动两个或更多测试应用实例,每个实例配置监听不同端口。还需要一个两个实例都能连接的 PostgreSQL 实例。请参考 application.properties 文件,修改其中的 PostgreSQL 连接详情。

接下来,在两个 shell 中使用 Maven 启动应用。项目的 pom.xml 包含额外配置文件 instance1,用于在不同端口启动应用:

# 第一个 shell:
$ mvn spring-boot:run
... 大量日志(省略)
[  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
[  restartedMain] c.b.messaging.postgresql.Application     : Started Application in 2.615 seconds (JVM running for 2.944)
[  restartedMain] c.b.m.p.config.ListenerConfiguration     : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService          : notificationHandler: sending LISTEN command...

## 第二个 shell
... 大量日志(省略)
[  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
[  restartedMain] c.b.messaging.postgresql.Application     : Started Application in 1.984 seconds (JVM running for 2.274)
[  restartedMain] c.b.m.p.config.ListenerConfiguration     : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService          : notificationHandler: sending LISTEN command...

稍后,每个实例都会显示应用已准备接收请求的日志。现在在第三个 shell 中使用 curl 创建第一个 Order

$ curl --location 'http://localhost:8080/orders/buy' \
--form 'symbol="BAEL"' \
--form 'price="13.34"' \
--form 'quantity="500"'
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500}

运行在 8080 端口的应用实例会输出一些日志。我们还会看到 8081 实例的日志显示它收到了通知

[ order-listener] c.b.m.p.service.NotificationHandler    : Notification received: pid=5141, name=orders, param=30
[ order-listener] c.b.m.postgresql.service.OrdersService : findById: cache miss, id=30
[ order-listener] c.b.m.p.service.NotificationHandler    : order details: Order(id=30, symbol=BAEL, orderType=BUY, price=13.34, quantity=500.00)

这证明机制按预期工作。最后,我们再次使用 curlinstance1 上查询创建的订单:

curl http://localhost:8081/orders/30
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500.00}

如预期般获得订单详情。而且应用日志显示该信息来自缓存

[nio-8081-exec-1] c.b.m.postgresql.service.OrdersService   : findById: cache hit, id=30

6. 结论

本文介绍了 PostgreSQL 的 NOTIFY/LISTEN 机制,以及如何用它实现无需额外组件的轻量级消息代理。

所有代码可在 GitHub 获取。


原始标题:Using PostgreSQL as a Message Broker