This article is a note when exploring Reactive Streams and the JDK Flow API.
Flow API
(java.util.concurrent.Flow) is an API introduced in JDK 9 (JEP 266) and is theReactive Streams Special Interest Group. It corresponds to the specification ([Reactive Streams](https://www.reactive-streams.org/)) created by a working group called
(SIG).
JVM libraries that support this specification include Akka Streams (Lightbend, Inc.), ReactiveX / RxJava, etc. Yes, Project Reactor (Pivotal Software, Inc.) used in Spring WebFlux is also supported.
environment
reference
** What is Reactive Streams **
The following is taken from the opening sentence of Reactive Streams. (Japanese translation is Google Translate.)
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.
Reactive Streams is an initiative that provides a standard for asynchronous stream processing with non-blocking back pressure. This includes work on runtime environments (JVM and JavaScript) and network protocols.
The sentence "** asynchronous stream processing with non-blocking back pressure" in this sentence clearly describes the characteristics of Reactive Streams. The explanation of each term is quoted from the glossary below.
** What is non blocking? **
The API will access the resource if it is available, otherwise it will return immediately to tell the caller that the resource is not currently available or that the operation has started and has not yet completed. The non-blocking API for resources allows callers to do other work instead of blocking and waiting for resources to become available.
** What is back pressure? **
It is not permissible for overloaded components to crash catastrophically or lose messages without control. If the process is stuck and can't afford to crash, the component should tell the upstream components that it is overloaded to reduce the load. This mechanism, called back-pressure, is an important feedback mechanism that keeps responding slowly without disrupting the system under overload.
** What is asynchronous? **
In the context of a reactive declaration, it means "a request sent from a client to a service is processed at any time after it is sent". The client cannot directly observe or synchronize the execution of request processing within the destination service.
Reactive Streams Specification for the JVM
The specification for JVM created by SIG has been updated to version 1.0.3 as of November 2019.
** Deliverables **
Maven's deliverables include the following, but since these are specifications, TCK (Technology Compatibility Kit), and implementation examples, they are not used directly in normal projects, but libraries such as Akka Streams, ReactiveX / RxJava, and Reactor are used. I think it will be done.
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-tck -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-tck-flow -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-examples -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-examples</artifactId>
<version>1.0.3</version>
</dependency>
API Components
The following four interfaces are defined in the Reactive Streams specification for JVM version 1.0.3.
Publisher
Publisher
is a provider of unlimited or finite sequenced elements (that is, publishing a data stream) that publishes elements when it receives a request from a Subscriber (through Subscription).
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Method | Description |
---|---|
subscribe | A factory method that asks Publisher to start streaming data. Can be called multiple times for each new Subscription. |
Subscriber
Subscriber
consumes the elements subscribed to by Publisher. The onXxx method of this interface is a callback method that corresponds to the signal from Publisher.
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Method | Description |
---|---|
onSubscribe | Publisher#Executed after calling subscribe. Subscriber requests or cancels data using the Subscription received as an argument. |
onNext | Subscription#Executed after calling request. |
onError | Executed when Publisher data transmission fails. |
onComplete | Executed when Publisher data transmission is completed normally.(Including cancellation) |
Subscription
Subscription
is a one-to-one representation of a Publisher and the Subscribers that subscribe to that Publisher. The Subscriber requests the Publisher to send or cancel the data via the Subscription method.
public interface Subscription {
public void request(long n);
public void cancel();
}
Method | Description |
---|---|
request | Request Publisher to send data. |
cancel | Request Publisher to stop sending data and clean up resources. |
Processor
Processor
is a component that has both Subscriber and Publisher functionality. The Processor is located between the Publisher at the beginning and the Subscriber at the end, but it is possible to concatenate and place multiple Processors instead of just one.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
The Processor is not always necessary, and if it is not necessary, the Publisher and Subscriber will work directly as shown in the figure below.
+-----------+ +------------+
| | <-subscribe- | |
| Publisher | | Subscriber |
| | <--request-- | |
+-----------+ +------------+
The figure below is an image when two Processors (A, B) are connected and placed. A situation where a Processor is needed in the middle like this is when you want to perform filtering or data conversion in the middle of a data stream.
+-----------+ +-----------+ +-----------+ +------------+
| | <-subscribe- | | <-subscribe- | | <-subscribe- | |
| Publisher | | Processor | | Processor | | Subscriber |
| | <--request-- | (A) | <--request-- | (B) | <--request-- | |
+-----------+ +-----------+ +-----------+ +------------+
An example implementation can be found on GitHub (reactive-streams / reactive-streams-jvm). Below is a demo program that uses the AsyncIterablePublisher class, which is one of the Publisher implementation examples.
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Slf4j
public class Demo {
public static void main(String ... args) {
List<Integer> elements = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
ExecutorService executor = Executors.newFixedThreadPool(3);
AsyncIterablePublisher<Integer> pub = new AsyncIterablePublisher<>(elements, executor);
MySub mySub1 = new MySub("sub_1");
MySub mySub2 = new MySub("sub_2");
MySub mySub3 = new MySub("sub_3");
log.info("start");
// Publisher#When you call subscribe
//Subscriber's onSubscribe method is called back
pub.subscribe(mySub1);
pub.subscribe(mySub2);
pub.subscribe(mySub3);
log.info("end");
try {
//Wait 30 seconds until the process is completed due to asynchronous processing
TimeUnit.SECONDS.sleep(30);
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class MySub implements Subscriber<Integer> {
private final String name;
private Subscription s;
public MySub(String name) {
this.name = name;
}
private Long getId() {
return Thread.currentThread().getId();
}
@Override
public void onSubscribe(Subscription s) {
log.info("({}) onSubscribe:[{}]", getId(), name);
this.s = s;
//Request Publisher to publish data when subscription is complete
//By requesting in the onSubscribe method, data issuance starts at the same time as the subscription is completed.
s.request(1);
}
@Override
public void onNext(Integer integer) {
//OnNext method is called back when data is published from Publisher
log.info("({}) onNext:[{}] item:{}", getId(), name, integer);
//Perform data processing within this method
//Do some data processing
//Request Publisher to publish the following data
s.request(1);
//Or cancel
//s.cancel();
}
@Override
public void onError(Throwable t) {
//Called back when an error occurs in publishing data of Publisher
log.info("onError:[{}]", name);
}
@Override
public void onComplete() {
//Called back when Publisher data issuance completed (or canceled)
log.info("({}) onComplete:[{}]", getId(), name);
}
}
}
Execution result
[main] INFO Demo - start
[main] INFO Demo - end
[pool-1-thread-2] INFO Demo - (15) onSubscribe:[sub_2]
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:2
[pool-1-thread-3] INFO Demo - (16) onSubscribe:[sub_3]
[pool-1-thread-1] INFO Demo - (14) onSubscribe:[sub_1]
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:3
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:4
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:2
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:2
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:5
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:3
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:3
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:6
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:4
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:4
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:7
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:5
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:5
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:8
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:6
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:6
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:7
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:9
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:8
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:10
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:7
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:11
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:9
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:8
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:12
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:10
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:9
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:13
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:11
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:10
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:14
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:12
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:15
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:13
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:16
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:17
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:18
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:19
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:14
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:20
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:15
[pool-1-thread-2] INFO Demo - (15) onComplete:[sub_2]
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:16
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:11
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:12
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:13
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:17
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:14
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:18
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:15
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:19
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:16
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:17
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:20
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:18
[pool-1-thread-1] INFO Demo - (14) onComplete:[sub_1]
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:19
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:20
[pool-1-thread-2] INFO Demo - (15) onComplete:[sub_3]
JDK Flow API
The Flow class declares four interfaces that correspond to the Reactive Streams specification. You need to implement these interfaces when developing applications that support reactive streams.
public final class Flow {
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
}
For Publisher, there is an implementation class called SubmissionPublisher \ <T >, which can be used as it is or inherited to implement original processing.
constructor |
---|
SubmissionPublisher() |
SubmissionPublisher(Executor executor, int maxBufferCapacity) |
SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler) |
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
//abridgement
}
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(ForkJoinPool.commonPool(), 8)) {
//abridgement
}
ExecutorService executor = Executors.newFixedThreadPool(3);
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, 8, (subscriber, throwable) -> {
})) {
//abridgement
}
The SubmissionPublisher class has submit
and ʻoffer` methods for publishing data.
Data publishing method |
---|
public int submit(T item) |
public int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) |
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) |
submit
submit blocks until the data can be submitted.
int lag = pub.submit(value);
if (lag < 0) {
//submit does not drop
} else {
//Maximum delay estimate(Number of items sent but not yet consumed)
}
offer
The offer does not block the data transmission, and can execute the processing (whether to resend or not, etc.) when the data cannot be transmitted. In this example, the data is dropped without resending.
int lag = offer(item, (subscriber, value) -> {
subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
return false; //Do not resend
});
if (lag < 0) {
//Number of drops
} else {
//Maximum delay estimate(Number of items sent but not yet consumed)
}
offer
You can also specify a timeout period. If it cannot be sent in this example, it will wait up to 1 second.
int lag = pub.offer(value, 1, TimeUnit.SECONDS, (subscriber, integer) -> {
subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
return false; //Do not resend
});
if (lag < 0) {
//Number of drops
} else {
//Maximum delay estimate(Number of items sent but not yet consumed)
}
Below is a demo program that uses the SubmissionPublisher class.
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@Slf4j
public class Demo {
public static void main(String ... args) {
log.info("start");
MySub<Integer> mySub1 = new MySub<>("sub_1");
MySub<Integer> mySub2 = new MySub<>("sub_2");
MySub<Integer> mySub3 = new MySub<>("sub_3");
ExecutorService executor = Executors.newFixedThreadPool(3);
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, 256)) {
pub.subscribe(mySub1);
pub.subscribe(mySub2);
pub.subscribe(mySub3);
log.info("NumberOfSubscribers:{}", pub.getNumberOfSubscribers());
log.info("MaxBufferCapacity:{}", pub.getMaxBufferCapacity());
IntStream.rangeClosed(1, 100000).forEach(value -> {
log.info("publish:{} estimateMinimumDemand:{} estimateMaximumLag:{}", value, pub.estimateMinimumDemand(), pub.estimateMaximumLag());
int lag = pub.offer(value, 1, TimeUnit.SECONDS, (subscriber, integer) -> {
log.info("publish offer on drop:{}", integer);
subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
return false; //Do not resend
});
if (lag < 0) {
//Number of drops
log.info("drops:{}", lag * -1);
} else {
//Maximum delay estimate(Number of items sent but not yet consumed)
log.info("lag:{}", lag);
}
});
}
log.info("end");
try {
TimeUnit.SECONDS.sleep(10);
mySub1.result();
mySub2.result();
mySub3.result();
if (!executor.isShutdown()) {
log.info("shutdown");
executor.shutdown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class MySub<Integer> implements Flow.Subscriber<Integer> {
private final String name;
private AtomicInteger success = new AtomicInteger(0);
private AtomicInteger error = new AtomicInteger(0);
private Flow.Subscription s;
public MySub(String name) {
this.name = name;
}
private Long getId() {
return Thread.currentThread().getId();
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("({}) onSubscribe:[{}]", getId(), name);
this.s = subscription;
s.request(1);
}
@Override
public void onNext(Integer item) {
log.info("({}) onNext:[{}] item:{}", getId(), name, item);
success.incrementAndGet();
s.request(1);
}
@Override
public void onError(Throwable throwable) {
log.info("({}) onError:[{}]", getId(), name);
error.incrementAndGet();
}
@Override
public void onComplete() {
log.info("({}) onComplete:[{}]", getId(), name);
}
public void result() {
log.info("result:[{}] success:{} error:{}", name, success.get(), error.get());
}
}
}
Recommended Posts