Future Advent Calendar 2019 Ceci est l'article du 18ème jour.
Lors de l'adoption de l'architecture de messagerie de type Pub / Sub, je pense qu'il existe de nombreux cas où des intergiciels de courtage tels que kafka et des services gérés tels qu'Amazon SNS et Google Cloud Pub / Sub sont utilisés, mais en fait, Pub / Sub est également utilisé dans PostgreSQL. Je peux le faire.
Si vous utilisez déjà PostgreSQL pour votre entreprise, vous pouvez facilement réaliser une communication inter-système faiblement couplée sans créer un nouveau courtier Pub / Sub.
Cet article présente cette fonctionnalité et présente les options et considérations lors de l'implémentation du client Pub / Sub en Java.
NOTIFY/LISTEN
Il existe trois requêtes liées à la fonctionnalité Pub / Sub de PostgreSQL:
NOTIFY channel [, payload]
Pg_notify
est également fourni en tant que fonction avec la même fonction.LISTEN channel
Regardons l'utilisation et le comportement de base.
canal"foo"Commencez à vous abonner
LISTEN foo;
canal"foo"À"hello"Publier les données
NOTIFY foo, 'hello';
--Ou
SELECT pg_notify('foo', 'hello');
-- "foo"La notification suivante sera envoyée à la session souscrite
-- Asynchronous notification "foo" with payload "hello" received from server process with PID 14728.
La notification sans charge utile est également possible
NOTIFY foo;
-- Asynchronous notification "foo" received from server process with PID 14728.
canal"foo"Quitter S'abonner
UNLISTEN foo;
C'est très simple. Ensuite, nous listerons les principales spécifications de cette fonction et montrerons les points à considérer lors de son utilisation. Pour plus de détails, veuillez consulter le Document officiel.
Channel est une chaîne de caractères arbitraire qui est une clé pour la communication Pub / Sub. Les données ne peuvent pas être échangées si le canal ciblé pour LISTEN et le canal qui exécute NOTIFY sont différents.
Vous pouvez écouter plusieurs canaux en une seule session.
Les caractères pouvant être spécifiés pour les canaux peuvent être des caractères alphanumériques et des traits de soulignement (_) en ASCII. Il est insensible à la casse. Nous avons confirmé que des caractères multi-octets peuvent également être utilisés.
NOTIFIER Bonjour, 'monde';
-- Asynchronous notification "Bonjour" with payload "monde" received from server process with PID 14728.
```
Les données sur la charge utile sont uniquement du texte et les binaires ne peuvent pas être envoyés ou reçus.
Si vous voulez mettre des données binaires, vous devez les convertir au format texte avec la fonction ʻencode` ou les sérialiser en chaîne de caractères JSON etc. avec l'application appelante.
La taille maximale de la charge utile est inférieure à 8 000 octets, et si elle est dépassée, l'erreur suivante sera renvoyée.
ERROR: payload string too long
SQL state: 22023
Notifications collectées
BEGIN;
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'b';
NOTIFY foo, 'c';
COMMIT;
-- Asynchronous notification "foo" with payload "a" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "b" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "c" received from server process with PID 14728.
pg_notification_queue_usage
(exprimée en fraction de 0 à 1).Voici trois types de modèles pour implémenter la communication Pub / Sub décrits jusqu'à présent en Java.
Ceci est un exemple d'implémentation utilisant le pilote JDBC d'origine PostgreSQL (l'exemple d'implémentation d'origine est ici). Lorsque vous utilisez Maven, ajoutez les dépendances suivantes.
pom.xml
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.8</version>
</dependency>
//Créez une connexion pour LISTEN à l'avance
private final org.postgresql.jdbc.PgConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
/**
*Commencez à recevoir des notifications.
*
* @canal de canal param
*/
private void startListen(final String channel) {
try {
try (var stmt = this.listenerConn.createStatement()) {
stmt.execute("LISTEN " + channel);
while (true) {
var notifications = pgconn.getNotifications(10 * 1000);
if (this.terminated) {
return;
}
if (notifications == null) {
continue;
}
for (var n : notifications) {
LOG.info("Received Notification: pid={}, channel={}, payload={}", n.getPID(), n.getName(), n.getParameter());
}
}
}
} catch (SQLException e) {
LOG.error("exception thrown {}", e.getMessage());
}
}
/**
*Notifie le serveur PostgreSQL.
*
* @canal de canal param
* @param payload Charge utile du message
*/
private void notify(final String channel, final String payload) {
try {
var conn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
var pstmt = conn.prepareStatement("select pg_notify(?, ?)");
try (conn; pstmt) {
pstmt.setString(1, channel);
pstmt.setString(2, payload);
pstmt.execute();
}
LOG.info("Notified: pid={}, channel={}, payload={}", pgconn.getBackendPID(), channel, payload);
} catch (SQLException e) {
LOG.error("exception thrown", e);
}
}
PgConnection # getNotifications (int timeoutMillis)
, il se bloquera ici pendant un temps spécifié jusqu'à ce que les notifications arrivent, donc si vous l'enclenchez dans une boucle, ce sera une longue logique d'interrogation.NOTIFY
, j'obtiens ʻorg.postgresql.util.PSQLException, et je ne trouve pas de solution de contournement, alors j'essaye d'exécuter
pg_notify`.PGJDBC-NG
pom.xml
<dependency>
<groupId>com.impossibl.pgjdbc-ng</groupId>
<artifactId>pgjdbc-ng</artifactId>
<version>0.8.3</version>
</dependency>
//Créez une connexion pour LISTEN à l'avance
private final com.impossibl.postgres.api.jdbc.PGConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PGConnection.class);
/**
*Commencez à recevoir des notifications.
*
* @canal de canal param
*/
private void startListen(final String channel) {
try {
this.listenerConn.addNotificationListener(new PGNotificationListener() {
@Override
public void notification(final int pid, final String channel, final String payload) {
LOG.info("Received Notification: {}, {}, {}", pid, channel, payload);
}
});
try (var stmt = this.listenerConn.createStatement()) {
stmt.execute("LISTEN " + channel);
}
} catch (SQLException e) {
LOG.error("exception thrown {}", e.getMessage());
}
}
// notify()Est similaire au pilote JDBC PostgreSQL
Comme vous pouvez le voir, vous pouvez ici implémenter le comportement lors de la réception de notifications sous la forme d'un écouteur d'événements. Il est également possible d'enregistrer un auditeur en spécifiant un canal.
R2DBC
R2DBC est un pilote JDBC nouvellement développé du point de vue de la programmation réactive. Entièrement compatible avec Reactive Streams, les E / S prétendent être totalement non bloquantes.
pom.xml
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>0.8.0.RELEASE</version>
</dependency>
//Configurer une connexion pour l'envoi et la réception à l'avance
private Mono<PostgresqlConnection> receiver;
private Mono<PostgresqlConnection> sender;
var connFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("...")
.port(5432)
.username("...")
.password("...")
.database("...")
.build());
this.receiver = connFactory.create();
this.sender = connFactory.create();
/**
*Commencez à recevoir des notifications.
*
* @canal de canal param
*/
private void startListen(final String channel) {
this.receiver.map(pgconn -> {
return pgconn.createStatement("LISTEN " + channel)
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.thenMany(pgconn.getNotifications())
.doOnNext(notification -> LOG.info("Received Notification: {}, {}, {}", notification.getProcessId(), notification.getName(), notification.getParameter()))
.doOnSubscribe(s -> LOG.info("listen start"))
.subscribe();
}).subscribe();
}
/**
*Notifie le serveur PostgreSQL.
*
* @canal de canal param
* @param payload Charge utile du message
*/
private void notify(final String channel, final String payload) {
this.sender.map(pgconn -> {
return pgconn.createStatement("NOTIFY " + channel + ", '" + payload + "'")
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.then()
.doOnSubscribe(s -> LOG.info("Notified: channel={}, payload={}", channel, payload))
.subscribe();
}).subscribe();
}
Lorsque vous utilisez R2DBC, vous utiliserez entièrement l'API du [Project Reactor] dépendant (https://projectreactor.io/).
Cette fois, je n'expliquerai que brièvement, mais je vais construire une série de flux d'exécution d'une requête, de gestion du résultat, de définition d'actions auxiliaires qui se déplacent à un moment spécifié, et enfin ce flux commencera à bouger Vous appelez subscribe ()
.
L'action lorsque la notification arrive avec doOnNext ()
est définie, et l'action lors de l'abonnement avec doOnSubscribe ()
(délai d'exécution de la requête) est définie, et ici le journal est simplement sorti.
À première vue, j'ai été déçu par la sensation de créer un traitement de flux asynchrone dans un style similaire à l'API Stream de Java, mais cette page a été une excellente expérience d'apprentissage.
NOTIFY / LISTEN de PostgreSQL est Release 9.0, et la destination de stockage de l'événement d'attente est la mémoire de la table système conventionnelle. La possibilité d'envoyer des charges utiles avec des notifications au lieu de files d'attente a amélioré les performances et la commodité et est maintenant dans sa forme actuelle. Il semble que la fonction elle-même soit installée depuis longtemps, mais comme elle n'a pas été publiée dans Qiita jusqu'à présent, j'ai essayé de la publier également en tant qu'organisation d'information obtenue par vérification technique interne.
Au fait, R2DBC est toujours en développement actif et je le regarde depuis cette année, mais depuis qu'il est venu supporter la fonction NOTIFY / LISTEN en septembre de cette année, je l'ai un peu couvert dans cet article. J'écrirai bientôt un article sur R2DBC.
Recommended Posts