Zukünftiger Adventskalender 2019 Dies ist der Artikel am 18. Tag.
Bei der Übernahme der Messaging-Architektur vom Typ Pub / Sub gibt es meines Erachtens viele Fälle, in denen Broker-Middleware wie Kafka und verwaltete Dienste wie Amazon SNS und Google Cloud Pub / Sub verwendet werden. Tatsächlich wird Pub / Sub jedoch auch in PostgreSQL verwendet. Ich kann es schaffen
Wenn Sie PostgreSQL bereits für Ihr Unternehmen verwenden, können Sie problemlos eine lose gekoppelte systemübergreifende Kommunikation realisieren, ohne einen neuen Pub / Sub-Broker zu erstellen.
In diesem Artikel wird diese Funktion vorgestellt und es werden Optionen und Überlegungen zur Implementierung des Pub / Sub-Clients in Java vorgestellt.
NOTIFY/LISTEN
Es gibt drei Abfragen im Zusammenhang mit der Pub / Sub-Funktion von PostgreSQL:
NOTIFY channel [, payload]
Pg_notify
wird auch als Funktion mit derselben Funktion bereitgestellt.LISTEN channel
UNLISTEN {channel | *}
Schauen wir uns die grundlegende Verwendung und das Verhalten an.
Kanal"foo"Abonnieren starten
LISTEN foo;
Kanal"foo"Zu"hello"Veröffentlichen Sie die Daten
NOTIFY foo, 'hello';
--Oder
SELECT pg_notify('foo', 'hello');
-- "foo"Die folgende Benachrichtigung wird an die abonnierte Sitzung gesendet
-- Asynchronous notification "foo" with payload "hello" received from server process with PID 14728.
Eine Benachrichtigung ohne Nutzlast ist ebenfalls möglich
NOTIFY foo;
-- Asynchronous notification "foo" received from server process with PID 14728.
Kanal"foo"Beenden Abonnieren
UNLISTEN foo;
Es ist sehr einfach. Als nächstes werden die wichtigsten Spezifikationen dieser Funktion aufgelistet und Punkte angezeigt, die bei der Verwendung zu berücksichtigen sind. Weitere Informationen finden Sie unter Offizielles Dokument.
Channel ist eine beliebige Zeichenfolge, die ein Schlüssel für die Pub / Sub-Kommunikation ist. Daten können nicht ausgetauscht werden, wenn der für LISTEN bestimmte Kanal und der Kanal, der NOTIFY ausführt, unterschiedlich sind.
Sie können mehrere Kanäle in einer Sitzung anhören.
Für ASCII-Zeichen können alphanumerische Zeichen und Unterstriche (_) als Zeichen verwendet werden, die für den Kanal angegeben werden können. Es wird nicht zwischen Groß- und Kleinschreibung unterschieden. Wir haben bestätigt, dass auch Multi-Byte-Zeichen verwendet werden können.
Benachrichtigen Hallo, 'Welt';
-- Asynchronous notification "Hallo" with payload "Welt" received from server process with PID 14728.
```
Die Daten auf der Nutzlast sind nur Text und Binärdateien können nicht gesendet oder empfangen werden.
Wenn Sie Binärdaten einfügen möchten, müssen Sie diese mit der Funktion "encode" in das Textformat konvertieren oder mit der aufrufenden Anwendung in eine JSON-Zeichenfolge usw. serialisieren.
Das Limit für die Nutzlastgröße beträgt weniger als 8000 Byte. Wenn es überschritten wird, wird der folgende Fehler zurückgegeben.
ERROR: payload string too long
SQL state: 22023
Gesammelte Benachrichtigungen
BEGIN;
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'b';
NOTIFY foo, 'c';
COMMIT;
-- Asynchronous notification "foo" with payload "a" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "b" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "c" received from server process with PID 14728.
pg_notification_queue_usage
(ausgedrückt als Bruch von 0 bis 1) überprüfen.Hier sind drei Arten von Mustern für die Implementierung der Pub / Sub-Kommunikation, die bisher in Java beschrieben wurden.
Dies ist ein Implementierungsbeispiel mit dem ursprünglichen JDBC-Treiber von PostgreSQL (das ursprüngliche Implementierungsbeispiel ist hier). Fügen Sie bei Verwendung von Maven die folgenden Abhängigkeiten hinzu.
pom.xml
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.8</version>
</dependency>
//Erstellen Sie im Voraus eine Verbindung für LISTEN
private final org.postgresql.jdbc.PgConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
/**
*Empfangen Sie Benachrichtigungen.
*
* @Parameter Kanal Kanal
*/
private void startListen(final String channel) {
try {
try (var stmt = this.listenerConn.createStatement()) {
stmt.execute("LISTEN " + channel);
while (true) {
var notifications = pgconn.getNotifications(10 * 1000);
if (this.terminated) {
return;
}
if (notifications == null) {
continue;
}
for (var n : notifications) {
LOG.info("Received Notification: pid={}, channel={}, payload={}", n.getPID(), n.getName(), n.getParameter());
}
}
}
} catch (SQLException e) {
LOG.error("exception thrown {}", e.getMessage());
}
}
/**
*Benachrichtigt den PostgreSQL-Server.
*
* @Parameter Kanal Kanal
* @param payload Nachrichtennutzlast
*/
private void notify(final String channel, final String payload) {
try {
var conn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
var pstmt = conn.prepareStatement("select pg_notify(?, ?)");
try (conn; pstmt) {
pstmt.setString(1, channel);
pstmt.setString(2, payload);
pstmt.execute();
}
LOG.info("Notified: pid={}, channel={}, payload={}", pgconn.getBackendPID(), channel, payload);
} catch (SQLException e) {
LOG.error("exception thrown", e);
}
}
PGJDBC-NG
pom.xml
<dependency>
<groupId>com.impossibl.pgjdbc-ng</groupId>
<artifactId>pgjdbc-ng</artifactId>
<version>0.8.3</version>
</dependency>
//Erstellen Sie im Voraus eine Verbindung für LISTEN
private final com.impossibl.postgres.api.jdbc.PGConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PGConnection.class);
/**
*Empfangen Sie Benachrichtigungen.
*
* @Parameter Kanal Kanal
*/
private void startListen(final String channel) {
try {
this.listenerConn.addNotificationListener(new PGNotificationListener() {
@Override
public void notification(final int pid, final String channel, final String payload) {
LOG.info("Received Notification: {}, {}, {}", pid, channel, payload);
}
});
try (var stmt = this.listenerConn.createStatement()) {
stmt.execute("LISTEN " + channel);
}
} catch (SQLException e) {
LOG.error("exception thrown {}", e.getMessage());
}
}
// notify()Ist dem PostgreSQL JDBC-Treiber ähnlich
Wie Sie sehen, können Sie hier das Verhalten beim Empfang von Benachrichtigungen in Form eines Ereignis-Listeners implementieren. Es ist auch möglich, einen Listener durch Angabe eines Kanals zu registrieren.
R2DBC
R2DBC ist ein neu entwickelter JDBC-Treiber aus Sicht der reaktiven Programmierung. I / O ist vollständig mit Reactive Streams kompatibel und behauptet, vollständig nicht blockierend zu sein.
pom.xml
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>0.8.0.RELEASE</version>
</dependency>
//Richten Sie im Voraus eine Verbindung zum Senden und Empfangen ein
private Mono<PostgresqlConnection> receiver;
private Mono<PostgresqlConnection> sender;
var connFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("...")
.port(5432)
.username("...")
.password("...")
.database("...")
.build());
this.receiver = connFactory.create();
this.sender = connFactory.create();
/**
*Empfangen Sie Benachrichtigungen.
*
* @Parameter Kanal Kanal
*/
private void startListen(final String channel) {
this.receiver.map(pgconn -> {
return pgconn.createStatement("LISTEN " + channel)
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.thenMany(pgconn.getNotifications())
.doOnNext(notification -> LOG.info("Received Notification: {}, {}, {}", notification.getProcessId(), notification.getName(), notification.getParameter()))
.doOnSubscribe(s -> LOG.info("listen start"))
.subscribe();
}).subscribe();
}
/**
*Benachrichtigt den PostgreSQL-Server.
*
* @Parameter Kanal Kanal
* @param payload Nachrichtennutzlast
*/
private void notify(final String channel, final String payload) {
this.sender.map(pgconn -> {
return pgconn.createStatement("NOTIFY " + channel + ", '" + payload + "'")
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.then()
.doOnSubscribe(s -> LOG.info("Notified: channel={}, payload={}", channel, payload))
.subscribe();
}).subscribe();
}
Bei Verwendung von R2DBC verwenden Sie die API des abhängigen Project Reactor vollständig. Dieses Mal werde ich nur kurz erklären, aber ich werde eine Reihe von Abläufen erstellen, in denen eine Abfrage ausgeführt, das Ergebnis verarbeitet, zusätzliche Aktionen festgelegt werden, die sich zu einem bestimmten Zeitpunkt bewegen, und schließlich beginnt sich dieser Ablauf zu bewegen Sie rufen subscribe () `auf. Die Aktion, wenn die Benachrichtigung mit "doOnNext ()" eintrifft, wird festgelegt, und die Aktion, wenn sie mit "doOnSubscribe ()" (Zeitpunkt für die Ausführung der Abfrage) abonniert ist, wird festgelegt. Hier wird das Protokoll einfach ausgegeben. Auf den ersten Blick war ich enttäuscht von dem Gefühl, eine asynchrone Stream-Verarbeitung in einem ähnlichen Stil wie die Stream-API von Java zu erstellen, aber diese Seite war eine großartige Lernerfahrung.
NOTIFY / LISTEN von PostgreSQL ist Release 9.0, und das Speicherziel des Wartezustandsereignisses ist der Speicher aus der herkömmlichen Systemtabelle. Die Möglichkeit, Nutzdaten mit Benachrichtigungen anstelle von Warteschlangen zu senden, hat die Leistung und den Komfort verbessert und liegt nun in der aktuellen Form vor. Es scheint, dass die Funktion selbst schon lange installiert ist, aber da sie bisher nicht in Qiita veröffentlicht wurde, habe ich versucht, sie auch als Informationsorganisation zu veröffentlichen, die durch interne technische Überprüfung erhalten wurde.
Übrigens befindet sich R2DBC noch in der aktiven Entwicklung und ich beobachte es seit diesem Jahr, aber seit es im September dieses Jahres die NOTIFY / LISTEN-Funktion unterstützt, habe ich es in diesem Artikel ein wenig behandelt. Ich werde bald einen Artikel mit R2DBC schreiben.
Recommended Posts