PostgreSQL Pub / Sub functionality and Java client implementation

Future Advent Calendar 2019 This is the article on the 18th day.


When adopting the Pub / Sub type messaging architecture, I think that there are many cases where broker middleware such as kafka and managed services such as Amazon SNS and Google Cloud Pub / Sub are used, but in fact Pub / Sub is also used in PostgreSQL. I can do it.

If you are already using PostgreSQL for your business, you can easily realize loosely coupled system-to-system communication without having to build a new Pub / Sub broker.

This article introduces this feature and presents options and considerations when implementing a Pub / Sub client in Java.

NOTIFY/LISTEN

There are three queries related to the Pub / Sub feature of PostgreSQL:

Let's look at the basic usage and behavior.

channel"foo"Start Subscribe


LISTEN foo;

channel"foo"To"hello"Publish the data


NOTIFY foo, 'hello';
--Or
SELECT pg_notify('foo', 'hello');

-- "foo"The following notifications will be sent to sessions that have already been subscribed to
-- Asynchronous notification "foo" with payload "hello" received from server process with PID 14728.

Notification without payload is also possible


NOTIFY foo;

-- Asynchronous notification "foo" received from server process with PID 14728.

channel"foo"Exit Subscribe


UNLISTEN foo;

It's very simple. Next, we will list the main specifications of this function and show the points to consider when using it. For details, please see the Official Document.

channel

NOTIFY Hello, 'world';

-- Asynchronous notification "Hello" with payload "world" received from server process with PID 14728.
```

scope

pgsql_notify_listen_scope.png

Payload data type / size

transaction

Collected notifications


BEGIN;
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'b';
NOTIFY foo, 'c';
COMMIT;

-- Asynchronous notification "foo" with payload "a" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "b" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "c" received from server process with PID 14728.

Storage size of outstanding messages

Java-based Pub / Sub client implementation

We will introduce three types of patterns when implementing Pub / Sub communication described so far in Java.

PostgreSQL JDBC driver

This is an implementation example using the JDBC driver of the PostgreSQL original family (the implementation example of the original family is here). When using Maven, add the following dependency.

pom.xml


<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.8</version>
</dependency>
//Create a connection for LISTEN in advance
private final org.postgresql.jdbc.PgConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);

/**
 *Start receiving notifications.
 *
 * @param channel channel
 */
private void startListen(final String channel) {
    try {
        try (var stmt = this.listenerConn.createStatement()) {
            stmt.execute("LISTEN " + channel);
            while (true) {
                var notifications = pgconn.getNotifications(10 * 1000);
                if (this.terminated) {
                    return;
                }
                if (notifications == null) {
                    continue;
                }
                for (var n : notifications) {
                    LOG.info("Received Notification: pid={}, channel={}, payload={}", n.getPID(), n.getName(), n.getParameter());
                }
            }
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

/**
 *Notifies the PostgreSQL server.
 *
 * @param channel channel
 * @param payload message payload
 */
private void notify(final String channel, final String payload) {
    try {
        var conn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
        var pstmt = conn.prepareStatement("select pg_notify(?, ?)");
        try (conn; pstmt) {
            pstmt.setString(1, channel);
            pstmt.setString(2, payload);
            pstmt.execute();
        }
        LOG.info("Notified: pid={}, channel={}, payload={}", pgconn.getBackendPID(), channel, payload);
    } catch (SQLException e) {
        LOG.error("exception thrown", e);
    }
}

PGJDBC-NG

pom.xml


<dependency>
    <groupId>com.impossibl.pgjdbc-ng</groupId>
    <artifactId>pgjdbc-ng</artifactId>
    <version>0.8.3</version>
</dependency>
//Create a connection for LISTEN in advance
private final com.impossibl.postgres.api.jdbc.PGConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PGConnection.class);

/**
 *Start receiving notifications.
 *
 * @param channel channel
 */
private void startListen(final String channel) {
    try {
        this.listenerConn.addNotificationListener(new PGNotificationListener() {
            @Override
            public void notification(final int pid, final String channel, final String payload) {
                LOG.info("Received Notification: {}, {}, {}", pid, channel, payload);
            }
        });
        try (var stmt = this.listenerConn.createStatement()) {
            stmt.execute("LISTEN " + channel);
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

// notify()Is similar to the PostgreSQL JDBC driver

As you can see, here you can implement the behavior when receiving a notification in the form of an event listener. It is also possible to register a listener by specifying a channel.

R2DBC

R2DBC is a newly developed JDBC driver from the perspective of reactive programming. Fully compliant with Reactive Streams, it claims to be completely non-blocking I / O.

pom.xml


<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
//Set up a connection for sending and receiving in advance
private Mono<PostgresqlConnection> receiver;
private Mono<PostgresqlConnection> sender;

var connFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
        .host("...")
        .port(5432)
        .username("...")
        .password("...")
        .database("...")
        .build());
this.receiver = connFactory.create();
this.sender = connFactory.create();

/**
 *Start receiving notifications.
 *
 * @param channel channel
 */
private void startListen(final String channel) {
    this.receiver.map(pgconn -> {
        return pgconn.createStatement("LISTEN " + channel)
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .thenMany(pgconn.getNotifications())
                .doOnNext(notification -> LOG.info("Received Notification: {}, {}, {}", notification.getProcessId(), notification.getName(), notification.getParameter()))
                .doOnSubscribe(s -> LOG.info("listen start"))
                .subscribe();
    }).subscribe();
}

/**
 *Notifies the PostgreSQL server.
 *
 * @param channel channel
 * @param payload message payload
 */
private void notify(final String channel, final String payload) {
    this.sender.map(pgconn -> {
        return pgconn.createStatement("NOTIFY " + channel + ", '" + payload + "'")
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .then()
                .doOnSubscribe(s -> LOG.info("Notified: channel={}, payload={}", channel, payload))
                .subscribe();
    }).subscribe();
}

When using R2DBC, you will use the API of the dependent Project Reactor entirely. This time I will only briefly explain, but I will build a series of flow of executing a query, handling the result, setting ancillary actions that move at a specified timing, and finally this flow will start moving You are calling subscribe (). The action when the notification arrives with doOnNext () is set, and the action when subscribed with doOnSubscribe () (timing to execute the query) is set, and here the log is simply output. At first glance, I was disappointed with the feeling of creating asynchronous stream processing in a style similar to Java's Stream API, but this page was a great learning experience.

in conclusion

PostgreSQL NOTIFY / LISTEN is Release 9.0, and the storage destination of the wait state event is the memory from the conventional system table. The ability to send payloads with notifications instead of queues has improved performance and convenience, and is now in its current form. It seems that the function itself has been installed for a long time, but since it has not been published in Qiita so far, I tried to publish it also as an information organization obtained by in-house technical verification.

By the way, R2DBC is still under active development and I have been watching it since this year, but since it came to support the NOTIFY / LISTEN function in September this year, I covered it a little in this article. I will write an article featuring R2DBC soon.

Recommended Posts

PostgreSQL Pub / Sub functionality and Java client implementation
Java 15 implementation and VS Code preferences
BloomFilter description and implementation sample (JAVA)
[ev3 × Java] Interface, implementation and inheritance (event handling)
[Mac / Java / Eclipse / PostgreSQL] Connect Java application and database
Java and Swift comparison (3) Class implementation / Class inheritance / Class design
Java mqtt client
Java and JavaScript
XXE and Java
About synchronized and Reentrant Lock (Java & Kotlin implementation example)