1. 简介
本教程将介绍如何利用 PostgreSQL 的 LISTEN/NOTIFY
命令实现一个简单的消息代理机制。
2. PostgreSQL 的 LISTEN/NOTIFY
机制快速入门
简单来说,这些命令允许连接的客户端通过常规 PostgreSQL 连接交换消息。客户端使用 NOTIFY
命令 向通道发送通知,并可附带一个可选的字符串载荷。
通道可以是任何有效的 SQL 标识符,其作用类似于传统消息系统中的主题。这意味着载荷将发送给该特定通道的所有活跃监听器。当没有载荷时,监听器只会收到一个空通知。
客户端使用 LISTEN
命令 开始接收通知,该命令将通道名称作为唯一参数。该命令会立即返回,允许客户端使用同一连接继续处理其他任务。
通知机制具有以下重要特性:
✅ 通道名在数据库内是唯一的
✅ 客户端无需特殊权限即可使用 LISTEN/NOTIFY
✅ 当 NOTIFY
在事务中使用时,只有事务成功提交后客户端才会收到通知
⚠️ 如果在一个事务中多次向同一通道发送相同载荷的 NOTIFY
命令,客户端只会收到一条通知。
3. 选择 PostgreSQL 作为消息代理的场景
考虑到 PostgreSQL 通知机制的特性,我们可能会思考:何时选择它而不是 RabbitMQ 等成熟的消息代理?这需要权衡取舍。通常选择后者意味着:
❌ 增加系统复杂度——消息代理是需要监控、升级的额外组件
❌ 需要处理分布式事务带来的故障模式
而通知机制则不存在这些问题:
✅ 功能已内置(假设我们已使用 PostgreSQL 作为主数据库)
✅ 无需处理分布式事务
当然,它也有局限性:
❌ 属于 PostgreSQL 特有机制,可能导致技术锁定
❌ 不直接支持持久化订阅者——客户端开始监听前发送的通知会丢失
尽管有这些限制,该机制仍有潜在应用场景:
✅ "模块化单体"应用中的通知总线
✅ 分布式缓存失效
✅ 轻量级消息代理(使用普通数据库表作为队列)
✅ 事件溯源架构
4. 在 Spring Boot 应用中使用 LISTEN/NOTIFY
现在我们已基本理解 LISTEN/NOTIFY
机制,接下来构建一个使用该机制的 Spring Boot 测试应用。我们将创建一个简单的 API 用于提交买卖订单,载荷包含交易品种符号、价格和数量。同时添加一个根据订单 ID 查询订单的 API。
到目前为止没什么特别之处。但关键需求是:我们希望在订单插入数据库后立即从缓存中提供查询服务。虽然可以使用缓存直写,但在需要扩展服务的分布式场景中,我们还需要分布式缓存。
这时通知机制就派上用场了:每次插入订单时发送 NOTIFY
,客户端通过 LISTEN
预加载订单到各自的本地缓存。
4.1. 项目依赖
我们的示例应用需要常规的 WebMVC Spring Boot 依赖,以及 PostgreSQL 驱动:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
<version>2.7.12</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
</dependency>
最新版 spring-boot-starter-web、spring-boot-starter-data-jdbc 和 postgresql 可在 Maven Central 获取。
4.2. 通知服务
由于通知机制是 PostgreSQL 特有的,我们将其通用行为封装在 NotifierService
类中:这样能避免这些细节泄露到应用其他部分。这也简化了单元测试——我们可以用模拟版本替换此服务以实现不同测试场景。
NotifierService
有两个职责。首先,它提供发送订单相关通知的门面:
public class NotifierService {
private static final String ORDERS_CHANNEL = "orders";
private final JdbcTemplate tpl;
@Transactional
public void notifyOrderCreated(Order order) {
tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'");
}
// ... 其他方法省略
}
其次,它提供创建 Runnable
实例的工厂方法,应用通过这些实例接收通知。该工厂接收一个 PGNotification
对象的 Consumer
,该对象包含获取通知通道和载荷的方法:
public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {
return () -> {
tpl.execute((Connection c) -> {
c.createStatement().execute("LISTEN " + ORDERS_CHANNEL);
PGConnection pgconn = c.unwrap(PGConnection.class);
while(!Thread.currentThread().isInterrupted()) {
PGNotification[] nts = pgconn.getNotifications(10000);
if ( nts == null || nts.length == 0 ) {
continue;
}
for( PGNotification nt : nts) {
consumer.accept(nt);
}
}
return 0;
});
};
}
这里为简单起见直接传递原始 PGNotification
。在实际场景中(通常涉及多个领域实体),我们可以使用泛型等技术扩展此类以避免代码重复。
关于创建的 Runnable
的几点说明:
✅ 数据库相关逻辑使用提供的 JdbcTemplate
的 execute()
方法,确保连接正确处理/清理并简化错误处理
✅ 回调会持续运行,直到当前线程被中断或运行时错误导致返回
注意这里使用 PGConnection
而非标准 JDBC Connection
——我们需要直接访问 getNotifications()
方法来获取一个或多个排队通知。
getNotifications()
有两种变体:
- 无参版本:轮询待处理通知并返回,无通知时返回 null
- 带整型参数版本:指定最大等待时间(毫秒),超时返回 null
- 传 0 时:阻塞直到新通知到达
应用初始化时,我们在 @Configuration
类中使用 CommandLineRunner
bean 来启动新线程实际接收通知:
@Configuration
public class ListenerConfiguration {
@Bean
CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) {
return (args) -> {
Runnable listener = notifier.createNotificationHandler(handler);
Thread t = new Thread(listener, "order-listener");
t.start();
};
}
}
4.3. 连接处理
虽然技术上可行,但使用同一连接处理通知和常规查询并不方便。这需要在控制流中散布 getNotification()
调用,导致代码难以阅读和维护。
标准做法是运行一个或多个专用线程处理通知。每个线程有自己的连接并保持常开。如果这些连接由 Hikari 或 DBCP 等连接池创建,可能会引发问题。
为避免这些问题,我们的示例创建了一个专用的 DriverDataSource
,并用它创建 NotifierService
所需的 JdbcTemplate
:
@Configuration
public class NotifierConfiguration {
@Bean
NotifierService notifier(DataSourceProperties props) {
DriverDataSource ds = new DriverDataSource(
props.determineUrl(),
props.determineDriverClassName(),
new Properties(),
props.determineUsername(),
props.determinePassword());
JdbcTemplate tpl = new JdbcTemplate(ds);
return new NotifierService(tpl);
}
}
注意我们复用了创建主 Spring 管理 DataSource
的相同连接属性。但我们未将此专用 DataSource
暴露为 bean,否则会禁用 Spring Boot 的自动配置功能。
4.4. 通知处理器
缓存逻辑的最后一块是 NotificationHandler
类,它实现了 Consumer<Notification>
接口。该类的职责是处理单个通知,并用 Order
实例填充配置的 Cache
:
@Component
public class NotificationHandler implements Consumer<PGNotification> {
private final OrdersService orders;
@Override
public void accept(PGNotification t) {
Optional<Order> order = orders.findById(Long.valueOf(t.getParameter()));
// ... 日志消息省略
}
}
实现使用 getName()
和 getParameter()
从通知中获取通道名和订单 ID。这里我们可以假定通知总是符合预期——这并非偷懒,而是源于 NotifierService
构造 Runnable
的方式。
实际逻辑很简单:使用 OrderRepository
从数据库获取 Order
并添加到缓存:
@Service
public class OrdersService {
private final OrdersRepository repo;
// ... 其他私有字段省略
@Transactional(readOnly = true)
public Optional<Order> findById(Long id) {
Optional<Order> o = Optional.ofNullable(ordersCache.get(id, Order.class));
if (!o.isEmpty()) {
log.info("findById: cache hit, id={}",id);
return o;
}
log.info("findById: cache miss, id={}",id);
o = repo.findById(id);
if ( o.isEmpty()) {
return o;
}
ordersCache.put(id, o.get());
return o;
}
}
5. 测试
要观察通知机制的实际效果,最佳方式是启动两个或更多测试应用实例,每个实例配置监听不同端口。还需要一个两个实例都能连接的 PostgreSQL 实例。请参考 application.properties
文件,修改其中的 PostgreSQL 连接详情。
接下来,在两个 shell 中使用 Maven 启动应用。项目的 pom.xml
包含额外配置文件 instance1
,用于在不同端口启动应用:
# 第一个 shell:
$ mvn spring-boot:run
... 大量日志(省略)
[ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
[ restartedMain] c.b.messaging.postgresql.Application : Started Application in 2.615 seconds (JVM running for 2.944)
[ restartedMain] c.b.m.p.config.ListenerConfiguration : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService : notificationHandler: sending LISTEN command...
## 第二个 shell
... 大量日志(省略)
[ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
[ restartedMain] c.b.messaging.postgresql.Application : Started Application in 1.984 seconds (JVM running for 2.274)
[ restartedMain] c.b.m.p.config.ListenerConfiguration : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService : notificationHandler: sending LISTEN command...
稍后,每个实例都会显示应用已准备接收请求的日志。现在在第三个 shell 中使用 curl
创建第一个 Order
:
$ curl --location 'http://localhost:8080/orders/buy' \
--form 'symbol="BAEL"' \
--form 'price="13.34"' \
--form 'quantity="500"'
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500}
运行在 8080 端口的应用实例会输出一些日志。我们还会看到 8081 实例的日志显示它收到了通知:
[ order-listener] c.b.m.p.service.NotificationHandler : Notification received: pid=5141, name=orders, param=30
[ order-listener] c.b.m.postgresql.service.OrdersService : findById: cache miss, id=30
[ order-listener] c.b.m.p.service.NotificationHandler : order details: Order(id=30, symbol=BAEL, orderType=BUY, price=13.34, quantity=500.00)
这证明机制按预期工作。最后,我们再次使用 curl
在 instance1
上查询创建的订单:
curl http://localhost:8081/orders/30
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500.00}
如预期般获得订单详情。而且应用日志显示该信息来自缓存:
[nio-8081-exec-1] c.b.m.postgresql.service.OrdersService : findById: cache hit, id=30
6. 结论
本文介绍了 PostgreSQL 的 NOTIFY/LISTEN
机制,以及如何用它实现无需额外组件的轻量级消息代理。
所有代码可在 GitHub 获取。