Fonctionnalité PostgreSQL Pub / Sub et implémentation du client Java

Future Advent Calendar 2019 Ceci est l'article du 18ème jour.


Lors de l'adoption de l'architecture de messagerie de type Pub / Sub, je pense qu'il existe de nombreux cas où des intergiciels de courtage tels que kafka et des services gérés tels qu'Amazon SNS et Google Cloud Pub / Sub sont utilisés, mais en fait, Pub / Sub est également utilisé dans PostgreSQL. Je peux le faire.

Si vous utilisez déjà PostgreSQL pour votre entreprise, vous pouvez facilement réaliser une communication inter-système faiblement couplée sans créer un nouveau courtier Pub / Sub.

Cet article présente cette fonctionnalité et présente les options et considérations lors de l'implémentation du client Pub / Sub en Java.

NOTIFY/LISTEN

Il existe trois requêtes liées à la fonctionnalité Pub / Sub de PostgreSQL:

Regardons l'utilisation et le comportement de base.

canal"foo"Commencez à vous abonner


LISTEN foo;

canal"foo"À"hello"Publier les données


NOTIFY foo, 'hello';
--Ou
SELECT pg_notify('foo', 'hello');

-- "foo"La notification suivante sera envoyée à la session souscrite
-- Asynchronous notification "foo" with payload "hello" received from server process with PID 14728.

La notification sans charge utile est également possible


NOTIFY foo;

-- Asynchronous notification "foo" received from server process with PID 14728.

canal"foo"Quitter S'abonner


UNLISTEN foo;

C'est très simple. Ensuite, nous listerons les principales spécifications de cette fonction et montrerons les points à considérer lors de son utilisation. Pour plus de détails, veuillez consulter le Document officiel.

canal

NOTIFIER Bonjour, 'monde';

-- Asynchronous notification "Bonjour" with payload "monde" received from server process with PID 14728.
```

portée

pgsql_notify_listen_scope.png

Type de données et taille de la charge utile

transaction

Notifications collectées


BEGIN;
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'b';
NOTIFY foo, 'c';
COMMIT;

-- Asynchronous notification "foo" with payload "a" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "b" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "c" received from server process with PID 14728.

Taille de stockage des messages en suspens

Implémentation du client Pub / Sub en Java

Voici trois types de modèles pour implémenter la communication Pub / Sub décrits jusqu'à présent en Java.

Pilote JDBC PostgreSQL

Ceci est un exemple d'implémentation utilisant le pilote JDBC d'origine PostgreSQL (l'exemple d'implémentation d'origine est ici). Lorsque vous utilisez Maven, ajoutez les dépendances suivantes.

pom.xml


<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.8</version>
</dependency>
//Créez une connexion pour LISTEN à l'avance
private final org.postgresql.jdbc.PgConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);

/**
 *Commencez à recevoir des notifications.
 *
 * @canal de canal param
 */
private void startListen(final String channel) {
    try {
        try (var stmt = this.listenerConn.createStatement()) {
            stmt.execute("LISTEN " + channel);
            while (true) {
                var notifications = pgconn.getNotifications(10 * 1000);
                if (this.terminated) {
                    return;
                }
                if (notifications == null) {
                    continue;
                }
                for (var n : notifications) {
                    LOG.info("Received Notification: pid={}, channel={}, payload={}", n.getPID(), n.getName(), n.getParameter());
                }
            }
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

/**
 *Notifie le serveur PostgreSQL.
 *
 * @canal de canal param
 * @param payload Charge utile du message
 */
private void notify(final String channel, final String payload) {
    try {
        var conn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
        var pstmt = conn.prepareStatement("select pg_notify(?, ?)");
        try (conn; pstmt) {
            pstmt.setString(1, channel);
            pstmt.setString(2, payload);
            pstmt.execute();
        }
        LOG.info("Notified: pid={}, channel={}, payload={}", pgconn.getBackendPID(), channel, payload);
    } catch (SQLException e) {
        LOG.error("exception thrown", e);
    }
}

PGJDBC-NG

pom.xml


<dependency>
    <groupId>com.impossibl.pgjdbc-ng</groupId>
    <artifactId>pgjdbc-ng</artifactId>
    <version>0.8.3</version>
</dependency>
//Créez une connexion pour LISTEN à l'avance
private final com.impossibl.postgres.api.jdbc.PGConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PGConnection.class);

/**
 *Commencez à recevoir des notifications.
 *
 * @canal de canal param
 */
private void startListen(final String channel) {
    try {
        this.listenerConn.addNotificationListener(new PGNotificationListener() {
            @Override
            public void notification(final int pid, final String channel, final String payload) {
                LOG.info("Received Notification: {}, {}, {}", pid, channel, payload);
            }
        });
        try (var stmt = this.listenerConn.createStatement()) {
            stmt.execute("LISTEN " + channel);
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

// notify()Est similaire au pilote JDBC PostgreSQL

Comme vous pouvez le voir, vous pouvez ici implémenter le comportement lors de la réception de notifications sous la forme d'un écouteur d'événements. Il est également possible d'enregistrer un auditeur en spécifiant un canal.

R2DBC

R2DBC est un pilote JDBC nouvellement développé du point de vue de la programmation réactive. Entièrement compatible avec Reactive Streams, les E / S prétendent être totalement non bloquantes.

pom.xml


<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
//Configurer une connexion pour l'envoi et la réception à l'avance
private Mono<PostgresqlConnection> receiver;
private Mono<PostgresqlConnection> sender;

var connFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
        .host("...")
        .port(5432)
        .username("...")
        .password("...")
        .database("...")
        .build());
this.receiver = connFactory.create();
this.sender = connFactory.create();

/**
 *Commencez à recevoir des notifications.
 *
 * @canal de canal param
 */
private void startListen(final String channel) {
    this.receiver.map(pgconn -> {
        return pgconn.createStatement("LISTEN " + channel)
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .thenMany(pgconn.getNotifications())
                .doOnNext(notification -> LOG.info("Received Notification: {}, {}, {}", notification.getProcessId(), notification.getName(), notification.getParameter()))
                .doOnSubscribe(s -> LOG.info("listen start"))
                .subscribe();
    }).subscribe();
}

/**
 *Notifie le serveur PostgreSQL.
 *
 * @canal de canal param
 * @param payload Charge utile du message
 */
private void notify(final String channel, final String payload) {
    this.sender.map(pgconn -> {
        return pgconn.createStatement("NOTIFY " + channel + ", '" + payload + "'")
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .then()
                .doOnSubscribe(s -> LOG.info("Notified: channel={}, payload={}", channel, payload))
                .subscribe();
    }).subscribe();
}

Lorsque vous utilisez R2DBC, vous utiliserez entièrement l'API du [Project Reactor] dépendant (https://projectreactor.io/). Cette fois, je n'expliquerai que brièvement, mais je vais construire une série de flux d'exécution d'une requête, de gestion du résultat, de définition d'actions auxiliaires qui se déplacent à un moment spécifié, et enfin ce flux commencera à bouger Vous appelez subscribe (). L'action lorsque la notification arrive avec doOnNext () est définie, et l'action lors de l'abonnement avec doOnSubscribe () (délai d'exécution de la requête) est définie, et ici le journal est simplement sorti. À première vue, j'ai été déçu par la sensation de créer un traitement de flux asynchrone dans un style similaire à l'API Stream de Java, mais cette page a été une excellente expérience d'apprentissage.

en conclusion

NOTIFY / LISTEN de PostgreSQL est Release 9.0, et la destination de stockage de l'événement d'attente est la mémoire de la table système conventionnelle. La possibilité d'envoyer des charges utiles avec des notifications au lieu de files d'attente a amélioré les performances et la commodité et est maintenant dans sa forme actuelle. Il semble que la fonction elle-même soit installée depuis longtemps, mais comme elle n'a pas été publiée dans Qiita jusqu'à présent, j'ai essayé de la publier également en tant qu'organisation d'information obtenue par vérification technique interne.

Au fait, R2DBC est toujours en développement actif et je le regarde depuis cette année, mais depuis qu'il est venu supporter la fonction NOTIFY / LISTEN en septembre de cette année, je l'ai un peu couvert dans cet article. J'écrirai bientôt un article sur R2DBC.

Recommended Posts

Fonctionnalité PostgreSQL Pub / Sub et implémentation du client Java
Exemple de description et d'implémentation de BloomFilter (JAVA)
[ev3 × Java] Interface, implémentation et héritage (traitement événementiel)
[Mac / Java / Eclipse / PostgreSQL] Connectez l'application Java et la base de données
Comparaison Java et Swift (3) Implémentation de classe / héritage de classe / conception de classe
Client Java mqtt
Java et JavaScript
XXE et Java