Cet article est un rappel de mes recherches sur Reactive Streams et l'API JDK Flow.
Flow API
(java.util.concurrent.Flow) est une API introduite dans JDK 9 (JEP 266) et est le groupe d'intérêt spécialReactive Streams. Il correspond à la spécification ([Reactive Streams](https://www.reactive-streams.org/)) créée par un groupe de travail appelé
(SIG).
Les bibliothèques JVM qui prennent en charge cette spécification incluent Akka Streams (Lightbend, Inc.), ReactiveX / RxJava, etc. Oui, Project Reactor (Pivotal Software, Inc.) utilisé dans Spring WebFlux est également pris en charge.
environnement
référence
** Qu'est-ce que Reactive Streams **
Ce qui suit est tiré de la phrase d'ouverture de Reactive Streams. (La traduction japonaise est la traduction de Google.)
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.
Reactive Streams est une initiative qui fournit une norme pour le traitement de flux asynchrone avec contre-pression non bloquante. Cela inclut le travail sur les environnements d'exécution (JVM et JavaScript) et les protocoles réseau.
La phrase «** traitement de flux asynchrone avec contre-pression non bloquante» dans cette phrase décrit clairement les caractéristiques des flux réactifs. L'explication de chaque terme est tirée du glossaire ci-dessous.
** Qu'est-ce qui n'est pas bloquant **
L'API rend la ressource accessible si elle est disponible, sinon retourne immédiatement, indiquant à l'appelant que la ressource n'est pas disponible actuellement ou que l'opération a été lancée et n'est pas encore terminée. L'API non bloquante pour les ressources permet aux appelants d'effectuer un autre travail au lieu de bloquer et d'attendre que les ressources deviennent disponibles.
** Qu'est-ce que la contre-pression? **
Les composants surchargés ne peuvent pas être écrasés de manière catastrophique ou perdre des messages sans contrôle. Si le processus est bloqué et ne peut pas se permettre de planter, le composant doit indiquer aux composants en amont qu'il est surchargé et réduire la charge. Ce mécanisme, appelé contre-pression, est un mécanisme de rétroaction important qui continue de répondre lentement sans perturber le système en cas de surcharge.
** Qu'est-ce que asynchrone? **
Dans le cadre d'une déclaration réactive, cela signifie qu'une demande envoyée d'un client à un service est traitée à tout moment après son envoi. Le client ne peut pas directement observer ou synchroniser l'exécution du traitement de la demande au sein du service de destination.
Reactive Streams Specification for the JVM
Les spécifications de JVM créées par SIG ont été mises à jour vers la version 1.0.3 à partir de novembre 2019.
** Livrables **
Les livrables de Maven incluent les éléments suivants, mais il s'agit de spécifications, de TCK (Technology Compatibility Kit) et d'exemples d'implémentation, ils ne sont donc pas utilisés directement dans les projets normaux, mais des bibliothèques telles que Akka Streams, ReactiveX / RxJava et Reactor sont utilisées. Je pense que ce sera fait.
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-tck -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-tck-flow -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-examples -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-examples</artifactId>
<version>1.0.3</version>
</dependency>
API Components
Les quatre interfaces suivantes sont définies dans la spécification de Reactive Streams pour JVM version 1.0.3.
Publisher
Publisher
est un fournisseur d'éléments séquencés illimités ou finis (c'est-à-dire, la publication d'un flux de données) qui publie des éléments lorsqu'il reçoit une demande d'un abonné (via l'abonnement).
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Méthode | La description |
---|---|
subscribe | Une méthode d'usine qui demande à Publisher de commencer à diffuser des données. Peut être appelé plusieurs fois pour chaque nouvel abonnement. |
Subscriber
«Abonné» consomme les éléments auxquels s'est abonné l'éditeur. La méthode onXxx de cette interface est la méthode de rappel qui correspond au signal du serveur de publication.
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Méthode | La description |
---|---|
onSubscribe | Publisher#Exécuté après avoir appelé subscribe. L'abonné demande ou annule des données en utilisant l'abonnement reçu comme argument. |
onNext | Subscription#Exécuté après avoir appelé request. |
onError | Exécuté lorsque la transmission des données de l'éditeur échoue. |
onComplete | Exécuté lorsque la transmission des données de l'éditeur est terminée normalement.(Y compris l'annulation) |
Subscription
L'abonnement est une représentation individuelle d'un éditeur et des abonnés qui s'abonnent à cet éditeur. L'Abonné demande à l'éditeur d'envoyer ou d'annuler les données via la méthode d'abonnement.
public interface Subscription {
public void request(long n);
public void cancel();
}
Méthode | La description |
---|---|
request | Demandez à l'éditeur d'envoyer des données. |
cancel | Demandez à Publisher d'arrêter d'envoyer des données et de nettoyer les ressources. |
Processor
Processor
est un composant qui a à la fois des fonctionnalités d'abonné et d'éditeur. Le processeur est situé entre l'éditeur au début et l'abonné à la fin, mais il est possible de concaténer plusieurs processeurs au lieu d'un seul.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Le processeur n'est pas toujours nécessaire, et s'il n'est pas nécessaire, l'éditeur et l'abonné fonctionneront directement comme indiqué dans la figure ci-dessous.
+-----------+ +------------+
| | <-subscribe- | |
| Publisher | | Subscriber |
| | <--request-- | |
+-----------+ +------------+
La figure ci-dessous est une image lorsque deux processeurs (A, B) sont connectés et placés. Une situation où un processeur est nécessaire au milieu est lorsque vous souhaitez effectuer un filtrage ou une conversion de données au milieu d'un flux de données.
+-----------+ +-----------+ +-----------+ +------------+
| | <-subscribe- | | <-subscribe- | | <-subscribe- | |
| Publisher | | Processor | | Processor | | Subscriber |
| | <--request-- | (A) | <--request-- | (B) | <--request-- | |
+-----------+ +-----------+ +-----------+ +------------+
Un exemple d'implémentation peut être trouvé sur GitHub (reactive-streams / reactive-streams-jvm). Vous trouverez ci-dessous un programme de démonstration qui utilise la classe AsyncIterablePublisher, qui est l'un des exemples d'implémentation de Publisher.
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Slf4j
public class Demo {
public static void main(String ... args) {
List<Integer> elements = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
ExecutorService executor = Executors.newFixedThreadPool(3);
AsyncIterablePublisher<Integer> pub = new AsyncIterablePublisher<>(elements, executor);
MySub mySub1 = new MySub("sub_1");
MySub mySub2 = new MySub("sub_2");
MySub mySub3 = new MySub("sub_3");
log.info("start");
// Publisher#Lorsque vous appelez, abonnez-vous
//La méthode onSubscribe de l'abonné est rappelée
pub.subscribe(mySub1);
pub.subscribe(mySub2);
pub.subscribe(mySub3);
log.info("end");
try {
//Attendez 30 secondes jusqu'à ce que le processus soit terminé en raison d'un traitement asynchrone
TimeUnit.SECONDS.sleep(30);
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class MySub implements Subscriber<Integer> {
private final String name;
private Subscription s;
public MySub(String name) {
this.name = name;
}
private Long getId() {
return Thread.currentThread().getId();
}
@Override
public void onSubscribe(Subscription s) {
log.info("({}) onSubscribe:[{}]", getId(), name);
this.s = s;
//Demander à l'éditeur de publier des données une fois l'abonnement terminé
//En effectuant une demande dans la méthode onSubscribe, l'émission des données commence au moment où l'abonnement est terminé.
s.request(1);
}
@Override
public void onNext(Integer integer) {
//La méthode onNext est rappelée lorsque les données sont publiées à partir de Publisher
log.info("({}) onNext:[{}] item:{}", getId(), name, integer);
//Traiter les données avec cette méthode
//Faire du traitement des données
//Demander à l'éditeur de publier les données suivantes
s.request(1);
//Ou annuler
//s.cancel();
}
@Override
public void onError(Throwable t) {
//Rappelé lorsqu'une erreur se produit lors de la publication des données pour Publisher
log.info("onError:[{}]", name);
}
@Override
public void onComplete() {
//Rappelé lorsque les données de l'éditeur sont publiées (ou annulées)
log.info("({}) onComplete:[{}]", getId(), name);
}
}
}
Résultat d'exécution
[main] INFO Demo - start
[main] INFO Demo - end
[pool-1-thread-2] INFO Demo - (15) onSubscribe:[sub_2]
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:2
[pool-1-thread-3] INFO Demo - (16) onSubscribe:[sub_3]
[pool-1-thread-1] INFO Demo - (14) onSubscribe:[sub_1]
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:3
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:4
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:2
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:2
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:5
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:3
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:3
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:6
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:4
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:4
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:7
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:5
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:5
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:8
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:6
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:6
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:7
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:9
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:8
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:10
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:7
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:11
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:9
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:8
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:12
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:10
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:9
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:13
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:11
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:10
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:14
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:12
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:15
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:13
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:16
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:17
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:18
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:19
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:14
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:20
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:15
[pool-1-thread-2] INFO Demo - (15) onComplete:[sub_2]
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:16
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:11
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:12
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:13
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:17
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:14
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:18
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:15
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:19
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:16
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:17
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:20
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:18
[pool-1-thread-1] INFO Demo - (14) onComplete:[sub_1]
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:19
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:20
[pool-1-thread-2] INFO Demo - (15) onComplete:[sub_3]
JDK Flow API
La classe Flow déclare quatre interfaces qui correspondent à la spécification Reactive Streams. Vous devez implémenter ces interfaces lors du développement d'applications prenant en charge les flux réactifs.
public final class Flow {
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
}
Pour Publisher, il existe une classe d'implémentation appelée SubmissionPublisher \ <T >, qui peut être utilisée telle quelle ou héritée pour implémenter le traitement d'origine.
constructeur |
---|
SubmissionPublisher() |
SubmissionPublisher(Executor executor, int maxBufferCapacity) |
SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler) |
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
//réduction
}
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(ForkJoinPool.commonPool(), 8)) {
//réduction
}
ExecutorService executor = Executors.newFixedThreadPool(3);
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, 8, (subscriber, throwable) -> {
})) {
//réduction
}
La classe SubmissionPublisher a des méthodes submit
et ʻoffer` pour publier des données.
Méthode de publication des données |
---|
public int submit(T item) |
public int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) |
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) |
submit
soumettre des blocs jusqu'à ce que les données puissent être soumises.
int lag = pub.submit(value);
if (lag < 0) {
//soumettre ne tombe pas
} else {
//Estimation du délai maximum(Nombre d'articles envoyés mais pas encore consommés)
}
offer
L'offre ne bloque pas la transmission des données et peut exécuter le traitement (qu'il soit renvoyé ou non, etc.) lorsque les données ne peuvent pas être transmises. Dans cet exemple, les données sont supprimées sans être renvoyées.
int lag = offer(item, (subscriber, value) -> {
subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
return false; //Ne pas renvoyer
});
if (lag < 0) {
//Nombre de gouttes
} else {
//Estimation du délai maximum(Nombre d'articles envoyés mais pas encore consommés)
}
offer
Vous pouvez également spécifier un délai d'expiration. S'il ne peut pas être envoyé dans cet exemple, il attendra jusqu'à 1 seconde.
int lag = pub.offer(value, 1, TimeUnit.SECONDS, (subscriber, integer) -> {
subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
return false; //Ne pas renvoyer
});
if (lag < 0) {
//Nombre de gouttes
} else {
//Estimation du délai maximum(Nombre d'articles envoyés mais pas encore consommés)
}
Vous trouverez ci-dessous un programme de démonstration qui utilise la classe SubmissionPublisher.
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@Slf4j
public class Demo {
public static void main(String ... args) {
log.info("start");
MySub<Integer> mySub1 = new MySub<>("sub_1");
MySub<Integer> mySub2 = new MySub<>("sub_2");
MySub<Integer> mySub3 = new MySub<>("sub_3");
ExecutorService executor = Executors.newFixedThreadPool(3);
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, 256)) {
pub.subscribe(mySub1);
pub.subscribe(mySub2);
pub.subscribe(mySub3);
log.info("NumberOfSubscribers:{}", pub.getNumberOfSubscribers());
log.info("MaxBufferCapacity:{}", pub.getMaxBufferCapacity());
IntStream.rangeClosed(1, 100000).forEach(value -> {
log.info("publish:{} estimateMinimumDemand:{} estimateMaximumLag:{}", value, pub.estimateMinimumDemand(), pub.estimateMaximumLag());
int lag = pub.offer(value, 1, TimeUnit.SECONDS, (subscriber, integer) -> {
log.info("publish offer on drop:{}", integer);
subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
return false; //Ne pas renvoyer
});
if (lag < 0) {
//Nombre de gouttes
log.info("drops:{}", lag * -1);
} else {
//Estimation du délai maximum(Nombre d'articles envoyés mais pas encore consommés)
log.info("lag:{}", lag);
}
});
}
log.info("end");
try {
TimeUnit.SECONDS.sleep(10);
mySub1.result();
mySub2.result();
mySub3.result();
if (!executor.isShutdown()) {
log.info("shutdown");
executor.shutdown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class MySub<Integer> implements Flow.Subscriber<Integer> {
private final String name;
private AtomicInteger success = new AtomicInteger(0);
private AtomicInteger error = new AtomicInteger(0);
private Flow.Subscription s;
public MySub(String name) {
this.name = name;
}
private Long getId() {
return Thread.currentThread().getId();
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("({}) onSubscribe:[{}]", getId(), name);
this.s = subscription;
s.request(1);
}
@Override
public void onNext(Integer item) {
log.info("({}) onNext:[{}] item:{}", getId(), name, item);
success.incrementAndGet();
s.request(1);
}
@Override
public void onError(Throwable throwable) {
log.info("({}) onError:[{}]", getId(), name);
error.incrementAndGet();
}
@Override
public void onComplete() {
log.info("({}) onComplete:[{}]", getId(), name);
}
public void result() {
log.info("result:[{}] success:{} error:{}", name, success.get(), error.get());
}
}
}
Recommended Posts