1. 简介

本文将深入探讨 PostgreSQL 中的 LISTENNOTIFY 命令。我们将分析这两个核心机制的工作原理、使用场景,以及如何在 Java 应用中高效利用它们实现事件驱动通信。

2. 什么是 LISTEN 和 NOTIFY?

PostgreSQL 通过 LISTEN 和 NOTIFY 命令支持服务器与客户端间的异步通信。这些 PostgreSQL 特有的扩展允许我们将数据库作为轻量级消息系统使用,使数据库能够生成事件并通知客户端响应。这种机制在实时仪表盘、缓存失效、数据审计等场景中特别实用。

2.1 监听通知

使用 LISTEN 命令注册对特定事件的监听。该命令需要指定监听的通道名称:

postgres=# LISTEN my_channel;
LISTEN

执行后,当前连接就能接收该通道上的异步通知。

所有注册监听的连接都会收到通知,这本质上是一种广播机制而非单点投递。这意味着我们可以轻松地通知所有客户端数据库中发生的事件。

⚠️ 注意:在 psql 中不会自动显示通知。需要再次执行 LISTEN 命令才能查看自上次以来的所有通知:

postgres=# LISTEN my_channel;
LISTEN
.....
postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, World!" received from server process with PID 66.

这里我们看到某个连接触发了事件,监听连接收到了包含 "Hello, World!" 负载的通知。

✅ 监听器数量没有硬性限制,但每个监听器需保持数据库连接开启,因此最大连接数会成为实际限制。过多监听器可能消耗资源导致性能问题。

2.2 触发通知

了解如何监听事件后,我们还需要知道如何触发它们。使用 NOTIFY 命令触发事件,需要指定通道名和消息内容

postgres=# NOTIFY my_channel, 'Hello, World!';
NOTIFY

执行该命令后,所有执行过对应 LISTEN 命令的连接都能收到事件通知。

消息负载是可选的,但若省略或提供 NULL,系统会将其视为空字符串。负载最大限制为 8000 字节,超出将报错且不会通知任何监听器。

通知遵循事务规则:在事务中触发的通知会在事务提交时发送,若事务回滚则通知不会发送。

2.3 动态消息生成

NOTIFY 命令要求消息必须是静态字符串,无法动态生成(包括简单的字符串拼接):

postgres=# NOTIFY my_channel, 'Hello, ' || 'World';
ERROR:  syntax error at or near "||"
LINE 1: NOTIFY my_channel, 'Hello, ' || 'World!';

但我们可以使用 pg_notify 函数动态生成通知。该函数接受任意方式构建的消息:

postgres=# SELECT pg_notify('my_channel', 'Hello, ' || 'World!');
 pg_notify
-----------

(1 row)

这里通道名和负载都作为字符串参数提供,我们可以通过 SQL 查询结果等方式动态构建这些字符串。

2.4 通过触发器触发事件

除了手动触发,我们还能让数据库自动触发事件。例如创建触发器函数在特定时机执行通知:

CREATE OR REPLACE FUNCTION notify_table_change() RETURNS TRIGGER AS $$
    BEGIN
        PERFORM pg_notify('table_change', TG_TABLE_NAME);
        RETURN NEW;
    END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER table_change 
    AFTER INSERT OR UPDATE OR DELETE ON table_name
    FOR EACH ROW EXECUTE PROCEDURE notify_table_change();

设置后,当 table_name 表发生增删改操作时,触发器会自动在 table_change 通道发送包含表名的通知。

3. 使用 JDBC 触发通知

通过 JDBC 触发通知的方式与前述完全一致。首先添加 PostgreSQL JDBC 驱动依赖:

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.7.6</version>
</dependency>

创建标准连接:

Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/postgres", "postgres", "mysecretpassword");

连接后可通过 NOTIFY 命令或 pg_notify() 函数触发通知

try (Statement statement = connection.createStatement()) {
    statement.execute("NOTIFY my_channel, 'Hello, NOTIFY!'");
}

若需要动态构建消息(如使用绑定参数),则必须使用 pg_notify:

try (PreparedStatement statement = connection.prepareStatement("SELECT pg_notify(?, ?)")) {
    statement.setString(1, "my_channel");
    statement.setString(2, "Hello, pg_notify!");
    statement.execute();
}

两种方式效果相同,通知会按预期送达:

postgres=# postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, NOTIFY!" received from server process with PID 390.
Asynchronous notification "my_channel" with payload "Hello, pg_notify!" received from server process with PID 390.

监听会话成功收到了 Java 代码触发的两条通知。

4. 使用官方 JDBC 驱动监听通知

虽然通过 JDBC 触发通知很简单,但监听通知则复杂得多。接收数据库异步消息并非 JDBC 规范的一部分,需要依赖驱动特定功能。

首先执行 LISTEN 语句:

try (Statement statement = connection.createStatement()) {
    statement.execute("LISTEN my_channel");
}

但实际接收通知需要使用原始 PGConnection 对象的 getNotifications() 方法。需先确保获取正确类型的连接:

PGConnection pgConnection = connection.unwrap(org.postgresql.PGConnection.class);

然后在循环中调用 getNotifications() 轮询数据库:

while (!Thread.currentThread().isInterrupted()) {
    PGNotification[] notifications = pgConnection.getNotifications(1000);
    if (notifications != null) {
        // 处理通知
    }
}

收到通知后可自定义处理逻辑,但需注意:下次调用 getNotifications() 前不会收到新通知,因此若需高效响应后续事件,必须保持循环运行。

getNotifications() 有三种调用方式:

  1. 无参数:立即返回当前待处理通知(不推荐)
  2. 带超时参数(毫秒):阻塞指定时间或直到有通知到达
    PGNotification[] notifications = pgConnection.getNotifications(100);
    
  3. 超时设为 0:永久阻塞直到有通知(适合专用线程)

5. 使用 PGJDBC-NG 监听通知

若希望避免轮询数据库接收通知,可使用 PGJDBC-NG 驱动。该驱动兼容 PostgreSQL 并提供高级特性,包括通知回调机制。

添加依赖:

<dependency>
    <groupId>com.impossibl.pgjdbc-ng</groupId>
    <artifactId>pgjdbc-ng</artifactId>
    <version>0.8.9</version>
</dependency>

创建连接时使用 jdbc:pgsql URL:

Connection connection = DriverManager.getConnection("jdbc:pgsql://localhost:5432/postgres", "postgres", "mysecretpassword");

仍需执行 LISTEN 命令,但这次可以注册监听器接收回调。实现 PGNotificationListener 接口:

class Listener implements PGNotificationListener {
    @Override
    public void notification(int processId, String channelName, String payload) {
        LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
                channelName, payload, processId);
    }
}

向连接注册监听器实例:

PGConnection pgConnection = connection.unwrap(com.impossibl.postgres.api.jdbc.PGConnection.class);
pgConnection.addNotificationListener(new Listener());

连接保持活跃期间,通知会自动触发回调无需轮询

10:34:03.104 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, NOTIFY!', PID=844
10:34:03.106 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, pg_notify!', PID=844

这种方式不仅更易管理,还避免了轮询带来的性能开销。

6. 总结

本文深入探讨了 PostgreSQL 的 LISTEN/NOTIFY 机制及其在 JDBC 中的应用。当您需要从数据库触发事件时,不妨试试这个简单粗暴的方案。所有示例代码可在 GitHub 获取。


原始标题:Event-Driven LISTEN/NOTIFY Support in Java using PostgreSQL | Baeldung