PostgreSQL Pub / Sub-Funktion und Java-Client-Implementierung

Zukünftiger Adventskalender 2019 Dies ist der Artikel am 18. Tag.


Bei der Übernahme der Messaging-Architektur vom Typ Pub / Sub gibt es meines Erachtens viele Fälle, in denen Broker-Middleware wie Kafka und verwaltete Dienste wie Amazon SNS und Google Cloud Pub / Sub verwendet werden. Tatsächlich wird Pub / Sub jedoch auch in PostgreSQL verwendet. Ich kann es schaffen

Wenn Sie PostgreSQL bereits für Ihr Unternehmen verwenden, können Sie problemlos eine lose gekoppelte systemübergreifende Kommunikation realisieren, ohne einen neuen Pub / Sub-Broker zu erstellen.

In diesem Artikel wird diese Funktion vorgestellt und es werden Optionen und Überlegungen zur Implementierung des Pub / Sub-Clients in Java vorgestellt.

NOTIFY/LISTEN

Es gibt drei Abfragen im Zusammenhang mit der Pub / Sub-Funktion von PostgreSQL:

Schauen wir uns die grundlegende Verwendung und das Verhalten an.

Kanal"foo"Abonnieren starten


LISTEN foo;

Kanal"foo"Zu"hello"Veröffentlichen Sie die Daten


NOTIFY foo, 'hello';
--Oder
SELECT pg_notify('foo', 'hello');

-- "foo"Die folgende Benachrichtigung wird an die abonnierte Sitzung gesendet
-- Asynchronous notification "foo" with payload "hello" received from server process with PID 14728.

Eine Benachrichtigung ohne Nutzlast ist ebenfalls möglich


NOTIFY foo;

-- Asynchronous notification "foo" received from server process with PID 14728.

Kanal"foo"Beenden Abonnieren


UNLISTEN foo;

Es ist sehr einfach. Als nächstes werden die wichtigsten Spezifikationen dieser Funktion aufgelistet und Punkte angezeigt, die bei der Verwendung zu berücksichtigen sind. Weitere Informationen finden Sie unter Offizielles Dokument.

Kanal

Benachrichtigen Hallo, 'Welt';

-- Asynchronous notification "Hallo" with payload "Welt" received from server process with PID 14728.
```

Umfang

pgsql_notify_listen_scope.png

Datentyp und Größe der Nutzlast

Transaktion

Gesammelte Benachrichtigungen


BEGIN;
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'b';
NOTIFY foo, 'c';
COMMIT;

-- Asynchronous notification "foo" with payload "a" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "b" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "c" received from server process with PID 14728.

Speichergröße ausstehender Nachrichten

Pub / Sub-Client-Implementierung in Java

Hier sind drei Arten von Mustern für die Implementierung der Pub / Sub-Kommunikation, die bisher in Java beschrieben wurden.

PostgreSQL JDBC-Treiber

Dies ist ein Implementierungsbeispiel mit dem ursprünglichen JDBC-Treiber von PostgreSQL (das ursprüngliche Implementierungsbeispiel ist hier). Fügen Sie bei Verwendung von Maven die folgenden Abhängigkeiten hinzu.

pom.xml


<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.8</version>
</dependency>
//Erstellen Sie im Voraus eine Verbindung für LISTEN
private final org.postgresql.jdbc.PgConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);

/**
 *Empfangen Sie Benachrichtigungen.
 *
 * @Parameter Kanal Kanal
 */
private void startListen(final String channel) {
    try {
        try (var stmt = this.listenerConn.createStatement()) {
            stmt.execute("LISTEN " + channel);
            while (true) {
                var notifications = pgconn.getNotifications(10 * 1000);
                if (this.terminated) {
                    return;
                }
                if (notifications == null) {
                    continue;
                }
                for (var n : notifications) {
                    LOG.info("Received Notification: pid={}, channel={}, payload={}", n.getPID(), n.getName(), n.getParameter());
                }
            }
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

/**
 *Benachrichtigt den PostgreSQL-Server.
 *
 * @Parameter Kanal Kanal
 * @param payload Nachrichtennutzlast
 */
private void notify(final String channel, final String payload) {
    try {
        var conn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
        var pstmt = conn.prepareStatement("select pg_notify(?, ?)");
        try (conn; pstmt) {
            pstmt.setString(1, channel);
            pstmt.setString(2, payload);
            pstmt.execute();
        }
        LOG.info("Notified: pid={}, channel={}, payload={}", pgconn.getBackendPID(), channel, payload);
    } catch (SQLException e) {
        LOG.error("exception thrown", e);
    }
}

PGJDBC-NG

pom.xml


<dependency>
    <groupId>com.impossibl.pgjdbc-ng</groupId>
    <artifactId>pgjdbc-ng</artifactId>
    <version>0.8.3</version>
</dependency>
//Erstellen Sie im Voraus eine Verbindung für LISTEN
private final com.impossibl.postgres.api.jdbc.PGConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PGConnection.class);

/**
 *Empfangen Sie Benachrichtigungen.
 *
 * @Parameter Kanal Kanal
 */
private void startListen(final String channel) {
    try {
        this.listenerConn.addNotificationListener(new PGNotificationListener() {
            @Override
            public void notification(final int pid, final String channel, final String payload) {
                LOG.info("Received Notification: {}, {}, {}", pid, channel, payload);
            }
        });
        try (var stmt = this.listenerConn.createStatement()) {
            stmt.execute("LISTEN " + channel);
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

// notify()Ist dem PostgreSQL JDBC-Treiber ähnlich

Wie Sie sehen, können Sie hier das Verhalten beim Empfang von Benachrichtigungen in Form eines Ereignis-Listeners implementieren. Es ist auch möglich, einen Listener durch Angabe eines Kanals zu registrieren.

R2DBC

R2DBC ist ein neu entwickelter JDBC-Treiber aus Sicht der reaktiven Programmierung. I / O ist vollständig mit Reactive Streams kompatibel und behauptet, vollständig nicht blockierend zu sein.

pom.xml


<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
//Richten Sie im Voraus eine Verbindung zum Senden und Empfangen ein
private Mono<PostgresqlConnection> receiver;
private Mono<PostgresqlConnection> sender;

var connFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
        .host("...")
        .port(5432)
        .username("...")
        .password("...")
        .database("...")
        .build());
this.receiver = connFactory.create();
this.sender = connFactory.create();

/**
 *Empfangen Sie Benachrichtigungen.
 *
 * @Parameter Kanal Kanal
 */
private void startListen(final String channel) {
    this.receiver.map(pgconn -> {
        return pgconn.createStatement("LISTEN " + channel)
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .thenMany(pgconn.getNotifications())
                .doOnNext(notification -> LOG.info("Received Notification: {}, {}, {}", notification.getProcessId(), notification.getName(), notification.getParameter()))
                .doOnSubscribe(s -> LOG.info("listen start"))
                .subscribe();
    }).subscribe();
}

/**
 *Benachrichtigt den PostgreSQL-Server.
 *
 * @Parameter Kanal Kanal
 * @param payload Nachrichtennutzlast
 */
private void notify(final String channel, final String payload) {
    this.sender.map(pgconn -> {
        return pgconn.createStatement("NOTIFY " + channel + ", '" + payload + "'")
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .then()
                .doOnSubscribe(s -> LOG.info("Notified: channel={}, payload={}", channel, payload))
                .subscribe();
    }).subscribe();
}

Bei Verwendung von R2DBC verwenden Sie die API des abhängigen Project Reactor vollständig. Dieses Mal werde ich nur kurz erklären, aber ich werde eine Reihe von Abläufen erstellen, in denen eine Abfrage ausgeführt, das Ergebnis verarbeitet, zusätzliche Aktionen festgelegt werden, die sich zu einem bestimmten Zeitpunkt bewegen, und schließlich beginnt sich dieser Ablauf zu bewegen Sie rufen subscribe () `auf. Die Aktion, wenn die Benachrichtigung mit "doOnNext ()" eintrifft, wird festgelegt, und die Aktion, wenn sie mit "doOnSubscribe ()" (Zeitpunkt für die Ausführung der Abfrage) abonniert ist, wird festgelegt. Hier wird das Protokoll einfach ausgegeben. Auf den ersten Blick war ich enttäuscht von dem Gefühl, eine asynchrone Stream-Verarbeitung in einem ähnlichen Stil wie die Stream-API von Java zu erstellen, aber diese Seite war eine großartige Lernerfahrung.

abschließend

NOTIFY / LISTEN von PostgreSQL ist Release 9.0, und das Speicherziel des Wartezustandsereignisses ist der Speicher aus der herkömmlichen Systemtabelle. Die Möglichkeit, Nutzdaten mit Benachrichtigungen anstelle von Warteschlangen zu senden, hat die Leistung und den Komfort verbessert und liegt nun in der aktuellen Form vor. Es scheint, dass die Funktion selbst schon lange installiert ist, aber da sie bisher nicht in Qiita veröffentlicht wurde, habe ich versucht, sie auch als Informationsorganisation zu veröffentlichen, die durch interne technische Überprüfung erhalten wurde.

Übrigens befindet sich R2DBC noch in der aktiven Entwicklung und ich beobachte es seit diesem Jahr, aber seit es im September dieses Jahres die NOTIFY / LISTEN-Funktion unterstützt, habe ich es in diesem Artikel ein wenig behandelt. Ich werde bald einen Artikel mit R2DBC schreiben.

Recommended Posts

PostgreSQL Pub / Sub-Funktion und Java-Client-Implementierung
BloomFilter-Beschreibungs- und Implementierungsbeispiel (JAVA)
[ev3 × Java] Schnittstelle, Implementierung und Vererbung (Ereignisverarbeitung)
[Mac / Java / Eclipse / PostgreSQL] Verbinden Sie die Java-Anwendung und die Datenbank
Java- und Swift-Vergleich (3) Klassenimplementierung / Klassenvererbung / Klassendesign
Java mqtt Client
Java und JavaScript
XXE und Java