I tried to get started with Reactor.
https://github.com/reactor/lite-rx-api-hands-on
Part01Flux
public class Part01Flux {
    Flux<String> emptyFlux() {
        return Flux.empty();
    }
    Flux<String> fooBarFluxFromValues() {
        return Flux.just("foo", "bar");
    }
    Flux<String> fooBarFluxFromList() {
        return Flux.fromIterable(Arrays.asList("foo", "bar"));
    }
    Flux<String> errorFlux() {
        return Flux.error(new IllegalStateException());
    }
    Flux<Long> counter() {
        return Flux.interval(Duration.ofMillis(100)).take(10);
    }
}
Flux.empty() can also be written as Flux.just() with no arguments.
The last one can also be implemented with zipWith, as follows.
Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100))
        .zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
        .map(Tuple2::getT2);
}
Part02Mono
public class Part02Mono {
    Mono<String> emptyMono() {
        return Mono.empty();
    }
    Mono<String> monoWithNoSignal() {
        return Mono.never();
    }
    Mono<String> fooMono() {
        return Mono.just("foo");
    }
    Mono<String> errorMono() {
        return Mono.error(new IllegalStateException());
    }
}
I don't know when Mono.never() would actually be used.
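One situation where Mono.never() might be useful is exercising timeout handling, because it emits no value, no completion, and no error. The following is only a minimal sketch under that assumption; the class name MonoNeverDemo, the 100 ms timeout, and the "fallback" value are made up for illustration.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

final class MonoNeverDemo {
    // Mono.never() sends no signal, so the timeout fires and the fallback Mono is used.
    static String neverWithTimeout() {
        Mono<String> silent = Mono.never();
        return silent
                .timeout(Duration.ofMillis(100), Mono.just("fallback"))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(neverWithTimeout());
    }
}
```

Without the timeout, block() would wait forever, which is the behavior the method name monoWithNoSignal hints at.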
Part03StepVerifier
public class Part03StepVerifier {
    void expectFooBarComplete(Flux<String> flux) {
        StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .verifyComplete();
    }
    void expectFooBarError(Flux<String> flux) {
        StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .expectError(RuntimeException.class)
            .verify();
    }
    void expectSkylerJesseComplete(Flux<User> flux) {
        StepVerifier.create(flux)
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
            .verifyComplete();
    }
    void expect10Elements(Flux<Long> flux) {
        Duration d = StepVerifier.create(flux)
            .expectNextCount(10)
            .verifyComplete();
        LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
    }
    void expect3600Elements(Supplier<Flux<Long>> supplier) {
        Duration d = StepVerifier.withVirtualTime(supplier)
            .thenAwait(Duration.ofSeconds(3600))
            .expectNextCount(3600)
            .verifyComplete();
        LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
    }
    private void fail() {
        throw new AssertionError("workshop not implemented");
    }
}
Even if a Flux is scheduled to take 100 hours, it seems the test can finish in a few seconds by using withVirtualTime to advance the clock virtually.
https://mike-neck.hatenadiary.com/entry/2018/02/21/080000
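To make the 100-hour case concrete, here is a minimal sketch, assuming reactor-test is on the classpath. The class name VirtualTimeDemo and the one-hour interval are invented for this example. As far as I understand, the Flux has to be created inside the Supplier so that it picks up the virtual clock.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

final class VirtualTimeDemo {
    // Verifies a Flux that would need 100 real hours and returns
    // how long the verification itself took in real time.
    static Duration verifyHundredHours() {
        return StepVerifier
                // The Flux is built lazily inside the supplier.
                .withVirtualTime(() -> Flux.interval(Duration.ofHours(1)).take(100))
                .expectSubscription()
                // Advance the virtual clock instead of sleeping.
                .thenAwait(Duration.ofHours(100))
                .expectNextCount(100)
                .verifyComplete();
    }

    public static void main(String[] args) {
        System.out.println("real time spent: " + verifyHundredHours());
    }
}
```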
Part04Transform
public class Part04Transform {
    Mono<User> capitalizeOne(Mono<User> mono) {
        return mono.map(this::capitalizeUser);
    }
    Flux<User> capitalizeMany(Flux<User> flux) {
        return flux.map(this::capitalizeUser);
    }
    private User capitalizeUser(User user) {
        return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
    }
    Flux<User> asyncCapitalizeMany(Flux<User> flux) {
        return flux.flatMap(this::asyncCapitalizeUser);
    }
    Mono<User> asyncCapitalizeUser(User u) {
        return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
    }
}
The difference between flatMap and map is whether the Function passed as an argument returns a Publisher such as Flux or Mono (flatMap) or a plain value (map).
With flatMap, the inner publishers are subscribed to eagerly and their results are merged as they arrive, so the output may not follow the original order.
Flux<String> flat() {
    Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
    return flux.flatMap(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
The order of the final output obtained by the above processing is as follows.
hoge0
fuga0
piyo0
hoge1
fuga1
piyo1
hoge2
fuga2
piyo2
If you use flatMapSequential, the original order is preserved.
Flux<String> flat() {
    Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
    return flux.flatMapSequential(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
hoge0
hoge1
hoge2
fuga0
fuga1
fuga2
piyo0
piyo1
piyo2
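The two variants can be compared side by side with a small runnable sketch. The class name FlatMapOrderDemo and the helper numbered are hypothetical. Since the interleaving produced by flatMap depends on timing, the sketch only relies on the order of the flatMapSequential result.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class FlatMapOrderDemo {
    // Emits s0, s1, s2 at 100 ms intervals.
    static Flux<String> numbered(String s) {
        return Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l);
    }

    // Inner results are merged as they arrive, so they may interleave.
    static List<String> withFlatMap() {
        return Flux.just("hoge", "fuga", "piyo")
                .flatMap(FlatMapOrderDemo::numbered)
                .collectList()
                .block();
    }

    // Inner publishers still run concurrently, but results are
    // replayed in the order of the source elements.
    static List<String> withFlatMapSequential() {
        return Flux.just("hoge", "fuga", "piyo")
                .flatMapSequential(FlatMapOrderDemo::numbered)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:           " + withFlatMap());
        System.out.println("flatMapSequential: " + withFlatMapSequential());
    }
}
```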
Part05Merge
public class Part05Merge {
    Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
        return flux1.mergeWith(flux2);
    }
    Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
        return flux1.concatWith(flux2);
    }
    Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
        return mono1.concatWith(mono2);
    }
}
mergeWith merges elements in order of arrival. concatWith keeps the original order: the elements of the Flux passed as the argument come after those of the first one.
In the test case for the code above, flux1 is delayed.
So with mergeWith the order is flux2 -> flux1, and with concatWith the order is flux1 -> flux2.
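The effect of a delayed first Flux can be reproduced with plain strings. This is a minimal sketch, not the hands-on test itself; the class name MergeConcatDemo, the element names, and the 100 ms delay are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class MergeConcatDemo {
    // First source: each element is delayed by 100 ms.
    static Flux<String> delayed() {
        return Flux.just("a1", "a2").delayElements(Duration.ofMillis(100));
    }

    // Second source: emits immediately on subscription.
    static Flux<String> immediate() {
        return Flux.just("b1", "b2");
    }

    // Both sources are subscribed at once; elements pass through as they arrive.
    static List<String> merged() {
        return delayed().mergeWith(immediate()).collectList().block();
    }

    // The second source is subscribed only after the first completes.
    static List<String> concatenated() {
        return delayed().concatWith(immediate()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("mergeWith:  " + merged());
        System.out.println("concatWith: " + concatenated());
    }
}
```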
Part06Request
public class Part06Request {
    ReactiveRepository<User> repository = new ReactiveUserRepository();
    StepVerifier requestAllExpectFour(Flux<User> flux) {
        return StepVerifier.create(flux)
            .expectNextCount(4)
            .expectComplete();
    }
    StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
        return StepVerifier.create(flux)
            .thenRequest(1)
            .expectNext(User.SKYLER)
            .thenRequest(1)
            .expectNext(User.JESSE)
            .thenCancel();
    }
    Flux<User> fluxWithLog() {
        return repository.findAll().log();
    }
    Flux<User> fluxWithDoOnPrintln() {
        Logger logger = LoggerFactory.getLogger(Part06Request.class);
        return repository.findAll()
            .doOnSubscribe(s -> logger.info("Starring:"))
            .doOnNext(u -> logger.info("{} {}", u.getFirstname(), u.getLastname()))
            .doOnComplete(() -> logger.info("The end!"));
    }
}
The doOnXxx methods let you define side-effect processing for specific signals of a Flux or Mono.
I think of them as something like event listeners.
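The listener analogy can be tried out by recording each signal in a list instead of logging it. This is a minimal sketch; the class name DoOnDemo and the event labels are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class DoOnDemo {
    // Records which callbacks fire, and in what order, for a two-element Flux.
    static List<String> recordSignals() {
        List<String> events = new ArrayList<>();
        Flux.just("a", "b")
                .doOnSubscribe(s -> events.add("subscribe"))
                .doOnNext(v -> events.add("next:" + v))
                .doOnComplete(() -> events.add("complete"))
                // Nothing happens until someone subscribes; blockLast() does that.
                .blockLast();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(recordSignals());
    }
}
```

The callbacks only observe the signals; the elements themselves pass through unchanged.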
Part07Errors
public class Part07Errors {
    Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
        return mono.onErrorReturn(User.SAUL);
    }
    Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
        return flux.onErrorResume(th -> Flux.just(User.SAUL, User.JESSE));
    }
    Flux<User> capitalizeMany(Flux<User> flux) {
        return flux.map(u -> {
            try {
                return capitalizeUser(u);
            } catch (GetOutOfHereException e) {
                throw Exceptions.propagate(e);
            }
        });
    }
    User capitalizeUser(User user) throws GetOutOfHereException {
        if (user.equals(User.SAUL)) {
            throw new GetOutOfHereException();
        }
        return new User(user.getUsername(), user.getFirstname(), user.getLastname());
    }
    protected final class GetOutOfHereException extends Exception {
        private static final long serialVersionUID = 0L;
    }
}
With onErrorReturn, a static value can be returned when an error occurs. However, even for a Flux, only a single value can be supplied.
With onErrorResume, you can define fallback processing, so you can return a Flux with multiple values.
There is also onErrorMap, but it cannot be used in this case because it only converts the thrown Throwable into another Throwable.
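The three operators can be compared on the same failing source. This is a minimal sketch; the class name ErrorFallbackDemo, the helper failing, and the string values are hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class ErrorFallbackDemo {
    // Emits "a" and then fails.
    static Flux<String> failing() {
        return Flux.just("a").concatWith(Flux.error(new IllegalStateException("boom")));
    }

    // onErrorReturn: replaces the error with exactly one fallback value.
    static List<String> withReturn() {
        return failing().onErrorReturn("x").collectList().block();
    }

    // onErrorResume: switches to another publisher, which may emit several values.
    static List<String> withResume() {
        return failing().onErrorResume(e -> Flux.just("x", "y")).collectList().block();
    }

    // onErrorMap: the sequence still fails, only with a different exception.
    static String withMap() {
        try {
            failing().onErrorMap(e -> new IllegalArgumentException("wrapped", e)).blockLast();
            return "no error";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(withReturn());
        System.out.println(withResume());
        System.out.println(withMap());
    }
}
```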
Part08OtherOperations
public class Part08OtherOperations {
    Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
        return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux).map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
    }
    Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
        return Mono.first(mono1, mono2);
    }
    Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
        return Flux.first(flux1, flux2);
    }
    Mono<Void> fluxCompletion(Flux<User> flux) {
        return flux.then();
    }
    Mono<User> nullAwareUserToMono(User user) {
        return Mono.justOrEmpty(user);
    }
    Mono<User> emptyToSkyler(Mono<User> mono) {
        return mono.switchIfEmpty(Mono.just(User.SKYLER));
    }
}
Part09Adapt
public class Part09Adapt {
    Flowable<User> fromFluxToFlowable(Flux<User> flux) {
        return Flowable.fromPublisher(flux);
    }
    Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
        return Flux.from(flowable);
    }
    Observable<User> fromFluxToObservable(Flux<User> flux) {
        return Observable.fromPublisher(flux);
    }
    Flux<User> fromObservableToFlux(Observable<User> observable) {
        return Flux.from(observable.toFlowable(BackpressureStrategy.MISSING));
    }
    Single<User> fromMonoToSingle(Mono<User> mono) {
        return Single.fromPublisher(mono);
    }
    Mono<User> fromSingleToMono(Single<User> single) {
        return Mono.from(single.toFlowable());
    }
    CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
        return mono.toFuture();
    }
    Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
        return Mono.fromFuture(future);
    }
}
Both Reactor and RxJava have methods that accept and produce a Publisher, so you can use those for conversion.
However, Observable and Single do not implement Publisher, so they need to be converted to Flowable first. To convert an Observable to a Flowable, you must specify a backpressure strategy. I don't know whether MISSING is appropriate here.
Part10ReactiveToBlocking
public class Part10ReactiveToBlocking {
    User monoToValue(Mono<User> mono) {
        return mono.block();
    }
    Iterable<User> fluxToValues(Flux<User> flux) {
        return flux.toIterable();
    }
}
You can also convert to a blocking form with toStream.
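A minimal sketch of the toStream variant, using integers instead of the hands-on User type; the class name ToStreamDemo is made up for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;

final class ToStreamDemo {
    // toStream() returns a java.util.stream.Stream that blocks
    // while waiting for the next element of the Flux.
    static List<Integer> collectAll() {
        return Flux.just(1, 2, 3)
                .toStream()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(collectAll());
    }
}
```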
Part11BlockingToReactive
public class Part11BlockingToReactive {
    Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
        return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
    }
    Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
        return flux.publishOn(Schedulers.elastic())
            .doOnNext(u -> repository.save(u))
            .then();
    }
}
By using defer, you can delay the processing until the Flux is subscribed to.
If you write it as follows without defer, repository.findAll() is executed as soon as this method is called.
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
    return Flux.fromIterable(repository.findAll()).subscribeOn(Schedulers.elastic());
}
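The timing difference can be observed by counting how often the blocking call runs. This is a minimal sketch; DeferDemo and blockingFindAll are hypothetical stand-ins for the hands-on BlockingRepository, and the counter exists only for the demonstration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

final class DeferDemo {
    // Stand-in for repository.findAll(): counts each invocation.
    static List<String> blockingFindAll(AtomicInteger calls) {
        calls.incrementAndGet();
        return Arrays.asList("swhite", "jpinkman");
    }

    // Without defer the call runs while the Flux is being built.
    static int callsAfterEagerAssembly() {
        AtomicInteger calls = new AtomicInteger();
        Flux<String> eager = Flux.fromIterable(blockingFindAll(calls));
        return calls.get(); // already called, although nobody subscribed
    }

    // With defer nothing runs until a subscriber arrives.
    static int callsAfterDeferredAssembly() {
        AtomicInteger calls = new AtomicInteger();
        Flux<String> deferred = Flux.defer(() -> Flux.fromIterable(blockingFindAll(calls)));
        return calls.get();
    }

    // Each subscription triggers the supplier again.
    static int callsAfterTwoSubscriptions() {
        AtomicInteger calls = new AtomicInteger();
        Flux<String> deferred = Flux.defer(() -> Flux.fromIterable(blockingFindAll(calls)));
        deferred.blockLast();
        deferred.blockLast();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterEagerAssembly());
        System.out.println(callsAfterDeferredAssembly());
        System.out.println(callsAfterTwoSubscriptions());
    }
}
```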
Also, it seems that subscribeOn and publishOn let you specify which threads are used for subscribing and for publishing, respectively.
https://kazuhira-r.hatenablog.com/entry/20180107/1515327957
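Recording thread names before and after publishOn makes the split visible. This is a minimal sketch; the class name SchedulerDemo is made up, and Schedulers.single() and Schedulers.parallel() are used here instead of the elastic scheduler only because their thread names are easy to tell apart.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class SchedulerDemo {
    // Returns the thread names seen upstream and downstream of publishOn.
    static List<String> threadNames() {
        List<String> names = new CopyOnWriteArrayList<>();
        Flux.just("x")
                // Runs on the thread chosen by subscribeOn.
                .doOnNext(v -> names.add("upstream:" + Thread.currentThread().getName()))
                // Everything below this line runs on the parallel scheduler.
                .publishOn(Schedulers.parallel())
                .doOnNext(v -> names.add("downstream:" + Thread.currentThread().getName()))
                // Affects where the source is subscribed, regardless of its position in the chain.
                .subscribeOn(Schedulers.single())
                .blockLast();
        return names;
    }

    public static void main(String[] args) {
        threadNames().forEach(System.out::println);
    }
}
```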
I feel like I understand how to call the API, but I don't really understand how to put it to practical use yet ...