[JAVA] I tried the Lite Rx API Hands-on

I tried to get started with Reactor.

https://github.com/reactor/lite-rx-api-hands-on

Part01Flux

public class Part01Flux {
  Flux<String> emptyFlux() {
    return Flux.empty();
  }

  Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
  }

  Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
  }

  Flux<String> errorFlux() {
    return Flux.error(new IllegalStateException());
  }

  Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100)).take(10);
  }
}

Flux.empty() can also be written as Flux.just() with no arguments. The last one, counter(), can also be written with zipWith as follows.

Flux<Long> counter() {
  return Flux.interval(Duration.ofMillis(100))
          .zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
          .map(Tuple2::getT2);
}
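
As a quick check that the two counter variants behave the same, here is a minimal sketch. The class name CounterVariantsDemo and the collect helper are made up for illustration, and the interval is shortened to 10 ms so it finishes quickly. It assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.List;
import java.util.stream.LongStream;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

// Hypothetical demo class, not part of the hands-on.
class CounterVariantsDemo {

  // Variant from Part01Flux: keep only the first 10 ticks of the interval.
  static Flux<Long> counterWithTake() {
    return Flux.interval(Duration.ofMillis(10)).take(10);
  }

  // zipWith variant: pair each tick with a value from a finite source.
  // The zipped Flux completes once the shorter source (0..9) is exhausted.
  static Flux<Long> counterWithZip() {
    return Flux.interval(Duration.ofMillis(10))
        .zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
        .map(Tuple2::getT2);
  }

  // Blocks until the Flux completes and returns everything it emitted.
  static List<Long> collect(Flux<Long> flux) {
    return flux.collectList().block();
  }

  public static void main(String[] args) {
    System.out.println(collect(counterWithTake()));
    System.out.println(collect(counterWithZip()));
    // Flux.just() with no arguments emits nothing, like Flux.empty().
    System.out.println(collect(Flux.<Long>just()));
  }
}
```

Both variants should emit 0 through 9 and then complete. The take version is shorter, so the zipWith version is mostly interesting as a way to see how zip stops at the shorter source.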

Part02Mono

public class Part02Mono {

  Mono<String> emptyMono() {
    return Mono.empty();
  }

  Mono<String> monoWithNoSignal() {
    return Mono.never();
  }

  Mono<String> fooMono() {
    return Mono.just("foo");
  }

  Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
  }
}

I don't know what Mono.never() would be used for.
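
One place where Mono.never() might be handy is as a stand-in for a source that never responds, for example when trying out timeout handling. The following is only a sketch under that assumption. MonoNeverDemo and the "timed out" fallback value are hypothetical names chosen for this example.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the hands-on.
class MonoNeverDemo {

  // Mono.never() emits no value, no completion, and no error, so the
  // timeout operator is the only thing that can end this pipeline.
  static String neverWithTimeout() {
    return Mono.<String>never()
        .timeout(Duration.ofMillis(100), Mono.just("timed out"))
        .block();
  }

  // For contrast: Mono.empty() completes right away without a value,
  // so block() returns null.
  static String emptyResult() {
    return Mono.<String>empty().block();
  }

  public static void main(String[] args) {
    System.out.println(neverWithTimeout());
    System.out.println(emptyResult());
  }
}
```

Calling block() directly on Mono.never() without a timeout would wait forever, which is the difference from Mono.empty().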

Part03StepVerifier

public class Part03StepVerifier {

  void expectFooBarComplete(Flux<String> flux) {
    StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .verifyComplete();
  }

  void expectFooBarError(Flux<String> flux) {
    StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .expectError(RuntimeException.class)
            .verify();
  }

  void expectSkylerJesseComplete(Flux<User> flux) {
    StepVerifier.create(flux)
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
            .verifyComplete();
  }

  void expect10Elements(Flux<Long> flux) {
    Duration d = StepVerifier.create(flux)
            .expectNextCount(10)
            .verifyComplete();
    LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
  }

  void expect3600Elements(Supplier<Flux<Long>> supplier) {
    Duration d = StepVerifier.withVirtualTime(supplier)
            .thenAwait(Duration.ofSeconds(3600))
            .expectNextCount(3600)
            .verifyComplete();
    LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
  }

  private void fail() {
    throw new AssertionError("workshop not implemented");
  }
}

Even if a Flux is scheduled to take 100 hours, the test can apparently finish in a few seconds, because withVirtualTime advances the clock virtually instead of actually waiting. https://mike-neck.hatenadiary.com/entry/2018/02/21/080000
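
To make the 100-hour case concrete, here is a minimal sketch. It assumes reactor-test is on the classpath. VirtualTimeDemo and verifyHundredHours are names invented for this example.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical demo class, not part of the hands-on.
class VirtualTimeDemo {

  // Verifies a Flux that would need 100 hours of wall-clock time and
  // returns how long the verification actually took.
  static Duration verifyHundredHours() {
    return StepVerifier
        // The Flux is created inside the supplier so that interval()
        // picks up the virtual-time scheduler.
        .withVirtualTime(() -> Flux.interval(Duration.ofHours(1)).take(100))
        .expectSubscription()
        .thenAwait(Duration.ofHours(100)) // advance the virtual clock
        .expectNextCount(100)
        .verifyComplete();
  }

  public static void main(String[] args) {
    System.out.println("Verified in " + verifyHundredHours());
  }
}
```

The returned Duration is the real elapsed time, which is why logging it, as expect3600Elements does above, is a convenient way to confirm that virtual time was used.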

Part04Transform

public class Part04Transform {

  Mono<User> capitalizeOne(Mono<User> mono) {
    return mono.map(this::capitalizeUser);
  }

  Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(this::capitalizeUser);
  }

  private User capitalizeUser(User user) {
    return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
  }

  Flux<User> asyncCapitalizeMany(Flux<User> flux) {
    return flux.flatMap(this::asyncCapitalizeUser);
  }

  Mono<User> asyncCapitalizeUser(User u) {
    return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
  }
}

The difference between map and flatMap is whether the Function passed as the argument returns a plain value (map) or a Publisher such as Flux or Mono (flatMap). flatMap subscribes to the inner publishers and merges their values as they arrive, so the output may not follow the original order.

Flux<String> flat() {
  Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
  return flux.flatMap(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}

The order of the final output obtained by the above processing is as follows.

hoge0
fuga0
piyo0
hoge1
fuga1
piyo1
hoge2
fuga2
piyo2

If you use flatMapSequential, the original order is preserved.

Flux<String> flat() {
  Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
  return flux.flatMapSequential(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}

hoge0
hoge1
hoge2
fuga0
fuga1
fuga2
piyo0
piyo1
piyo2
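
The two variants can be compared side by side with a small sketch like the one below. FlatMapOrderDemo and its helper methods are made-up names, and the interval is shortened to 50 ms. Only the flatMapSequential order is treated as guaranteed here, since with flatMap the order depends on when the inner values arrive.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the hands-on.
class FlatMapOrderDemo {

  // Inner publisher: three delayed values tagged with the source element.
  static Flux<String> inner(String s) {
    return Flux.interval(Duration.ofMillis(50)).take(3).map(l -> s + l);
  }

  // flatMap: inner values are merged as they arrive.
  static List<String> withFlatMap() {
    return Flux.just("hoge", "fuga", "piyo")
        .flatMap(FlatMapOrderDemo::inner)
        .collectList()
        .block();
  }

  // flatMapSequential: inner publishers still run concurrently, but their
  // values are emitted in the order of the source elements.
  static List<String> withFlatMapSequential() {
    return Flux.just("hoge", "fuga", "piyo")
        .flatMapSequential(FlatMapOrderDemo::inner)
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    System.out.println(withFlatMap());
    System.out.println(withFlatMapSequential());
  }
}
```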

Part05Merge

public class Part05Merge {
  Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
    return flux1.mergeWith(flux2);
  }

  Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
    return flux1.concatWith(flux2);
  }

  Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
    return mono1.concatWith(mono2);
  }
}

mergeWith emits elements in order of arrival. concatWith keeps the original order: everything from the source comes first, followed by everything from the Flux given as the argument. In the hands-on test case flux1 is delayed, so mergeWith yields flux2 -> flux1, while concatWith yields flux1 -> flux2.
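
The delayed-source situation can be reproduced with a sketch like the following. MergeVsConcatDemo, slow() and fast() are hypothetical names. slow() plays the role of the delayed flux1 and fast() the role of flux2.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the hands-on.
class MergeVsConcatDemo {

  // Stand-in for the delayed flux1: each element is held back by 100 ms.
  static Flux<String> slow() {
    return Flux.just("a1", "a2").delayElements(Duration.ofMillis(100));
  }

  // Stand-in for flux2: emits immediately on subscription.
  static Flux<String> fast() {
    return Flux.just("b1", "b2");
  }

  // mergeWith subscribes to both at once and forwards values as they arrive.
  static List<String> merged() {
    return slow().mergeWith(fast()).collectList().block();
  }

  // concatWith subscribes to fast() only after slow() has completed.
  static List<String> concatenated() {
    return slow().concatWith(fast()).collectList().block();
  }

  public static void main(String[] args) {
    System.out.println(merged());       // [b1, b2, a1, a2]
    System.out.println(concatenated()); // [a1, a2, b1, b2]
  }
}
```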

Part06Request

public class Part06Request {

  ReactiveRepository<User> repository = new ReactiveUserRepository();

  StepVerifier requestAllExpectFour(Flux<User> flux) {
    return StepVerifier.create(flux)
            .expectNextCount(4)
            .expectComplete();
  }

  StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
    return StepVerifier.create(flux)
            .thenRequest(1)
            .expectNext(User.SKYLER)
            .thenRequest(1)
            .expectNext(User.JESSE)
            .thenCancel();
  }

  Flux<User> fluxWithLog() {
    return repository.findAll().log();
  }

  Flux<User> fluxWithDoOnPrintln() {
    Logger logger = LoggerFactory.getLogger(Part06Request.class);
    return repository.findAll()
            .doOnSubscribe(s -> logger.info("Starring:"))
            .doOnNext(u -> logger.info("{} {}", u.getFirstname(), u.getLastname()))
            .doOnComplete(() -> logger.info("The end!"));
  }
}

Methods named doOn... (doOnSubscribe, doOnNext, doOnComplete, and so on) let you attach processing to a specific signal of a Flux or Mono. I think of them as something like event listeners.
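
To see when each callback fires, the signals can be recorded in a list instead of being logged. This is just a sketch. DoOnDemo and recordEvents are names made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the hands-on.
class DoOnDemo {

  // Records each signal as a side effect. The values themselves pass
  // through the doOn... operators unchanged.
  static List<String> recordEvents() {
    List<String> events = new ArrayList<>();
    Flux.just("a", "b")
        .doOnSubscribe(s -> events.add("subscribe"))
        .doOnNext(v -> events.add("next:" + v))
        .doOnComplete(() -> events.add("complete"))
        .blockLast(); // nothing happens until something subscribes
    return events;
  }

  public static void main(String[] args) {
    System.out.println(recordEvents()); // [subscribe, next:a, next:b, complete]
  }
}
```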

Part07Errors

public class Part07Errors {

  Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
    return mono.onErrorReturn(User.SAUL);
  }

  Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
    return flux.onErrorResume(th -> Flux.just(User.SAUL, User.JESSE));
  }

  Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(u -> {
      try {
        return capitalizeUser(u);
      } catch (GetOutOfHereException e) {
        throw Exceptions.propagate(e);
      }
    });
  }

  User capitalizeUser(User user) throws GetOutOfHereException {
    if (user.equals(User.SAUL)) {
      throw new GetOutOfHereException();
    }
    return new User(user.getUsername(), user.getFirstname(), user.getLastname());
  }

  protected final class GetOutOfHereException extends Exception {
    private static final long serialVersionUID = 0L;
  }
}

With onErrorReturn, a static value can be returned when an error occurs. However, even for a Flux, it can supply only a single value. With onErrorResume, you can define fallback processing, so you can return a Flux with multiple values from there. There is also onErrorMap, but it cannot be used here because it only converts the thrown Throwable into another Throwable.
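
The three operators can be compared on the same failing source with a sketch like this. ErrorFallbackDemo, failing() and the fallback values "x" and "y" are all hypothetical. The source emits one value and then fails with an IllegalStateException.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the hands-on.
class ErrorFallbackDemo {

  // Emits "a" and then terminates with an error.
  static Flux<String> failing() {
    return Flux.just("a").concatWith(Flux.error(new IllegalStateException("boom")));
  }

  // onErrorReturn: replaces the error with exactly one static value.
  static List<String> withReturn() {
    return failing().onErrorReturn("x").collectList().block();
  }

  // onErrorResume: switches to a fallback publisher, which may emit many values.
  static List<String> withResume() {
    return failing().onErrorResume(e -> Flux.just("x", "y")).collectList().block();
  }

  // onErrorMap: the sequence still fails, only with a different Throwable.
  // The trailing onErrorResume is only there to report the error type.
  static String errorTypeAfterMap() {
    return failing()
        .onErrorMap(e -> new IllegalArgumentException("wrapped", e))
        .then(Mono.just("no error"))
        .onErrorResume(e -> Mono.just(e.getClass().getSimpleName()))
        .block();
  }

  public static void main(String[] args) {
    System.out.println(withReturn());        // [a, x]
    System.out.println(withResume());        // [a, x, y]
    System.out.println(errorTypeAfterMap()); // IllegalArgumentException
  }
}
```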

Part08OtherOperations

public class Part08OtherOperations {

  Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
    return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux).map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
  }

  Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
    return Mono.first(mono1, mono2);
  }

  Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
    return Flux.first(flux1, flux2);
  }

  Mono<Void> fluxCompletion(Flux<User> flux) {
    return flux.then();
  }

  Mono<User> nullAwareUserToMono(User user) {
    return Mono.justOrEmpty(user);
  }

  Mono<User> emptyToSkyler(Mono<User> mono) {
    return mono.switchIfEmpty(Mono.just(User.SKYLER));
  }
}

Part09Adapt

public class Part09Adapt {

  Flowable<User> fromFluxToFlowable(Flux<User> flux) {
    return Flowable.fromPublisher(flux);
  }

  Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
    return Flux.from(flowable);
  }

  Observable<User> fromFluxToObservable(Flux<User> flux) {
    return Observable.fromPublisher(flux);
  }

  Flux<User> fromObservableToFlux(Observable<User> observable) {
    return Flux.from(observable.toFlowable(BackpressureStrategy.MISSING));
  }

  Single<User> fromMonoToSingle(Mono<User> mono) {
    return Single.fromPublisher(mono);
  }

  Mono<User> fromSingleToMono(Single<User> single) {
    return Mono.from(single.toFlowable());
  }

  CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
    return mono.toFuture();
  }

  Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
    return Mono.fromFuture(future);
  }
}

Both Reactor and RxJava have methods that accept a Publisher and create their own types from it, so you can use those. However, Observable and Single do not implement Publisher, so they have to be converted to Flowable first. Converting an Observable to a Flowable requires specifying a BackpressureStrategy. I don't know if MISSING is okay here: it applies no buffering or dropping and leaves overflow handling to the downstream.

Part10ReactiveToBlocking

public class Part10ReactiveToBlocking {
  User monoToValue(Mono<User> mono) {
    return mono.block();
  }

  Iterable<User> fluxToValues(Flux<User> flux) {
    return flux.toIterable();
  }
}

You can also block with toStream.
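
The three blocking bridges look like this side by side. This is a minimal sketch. BlockingBridgeDemo and its method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the hands-on.
class BlockingBridgeDemo {

  // block(): waits for the Mono to finish and returns its value.
  static String viaBlock() {
    return Mono.just("foo").block();
  }

  // toIterable(): iterating blocks while waiting for the next element.
  static List<Integer> viaIterable() {
    List<Integer> out = new ArrayList<>();
    for (Integer i : Flux.range(1, 3).toIterable()) {
      out.add(i);
    }
    return out;
  }

  // toStream(): same idea, exposed as a java.util.stream.Stream.
  static List<Integer> viaStream() {
    try (Stream<Integer> stream = Flux.range(1, 3).toStream()) {
      return stream.collect(Collectors.toList());
    }
  }

  public static void main(String[] args) {
    System.out.println(viaBlock());    // foo
    System.out.println(viaIterable()); // [1, 2, 3]
    System.out.println(viaStream());   // [1, 2, 3]
  }
}
```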

Part11BlockingToReactive

public class Part11BlockingToReactive {
  Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
  }

  Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
    return flux.publishOn(Schedulers.elastic())
            .doOnNext(u -> repository.save(u))
            .then();
  }
}

By using defer, you can delay the processing until this Flux is subscribed to. If you write it as follows without defer, repository.findAll() is executed as soon as this method is called.

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
  return Flux.fromIterable(repository.findAll()).subscribeOn(Schedulers.elastic());
}
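
The timing difference can be made visible by counting calls. In this sketch, DeferDemo is a made-up class and its findAll method is a hypothetical stand-in for the blocking repository.findAll().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the hands-on.
class DeferDemo {

  // Stand-in for a blocking repository.findAll() that counts its invocations.
  static List<String> findAll(AtomicInteger calls) {
    calls.incrementAndGet();
    return Arrays.asList("a", "b");
  }

  // Without defer: findAll runs while the Flux is being assembled,
  // even though nobody ever subscribes.
  static int callsWithoutDefer() {
    AtomicInteger calls = new AtomicInteger();
    Flux<String> eager = Flux.fromIterable(findAll(calls));
    return calls.get();
  }

  // With defer: findAll runs once per subscription, not at assembly time.
  // Returns {calls before subscribing, calls after two subscriptions}.
  static int[] callsWithDefer() {
    AtomicInteger calls = new AtomicInteger();
    Flux<String> deferred = Flux.defer(() -> Flux.fromIterable(findAll(calls)));
    int beforeSubscribe = calls.get();
    deferred.blockLast();
    deferred.blockLast();
    return new int[] {beforeSubscribe, calls.get()};
  }

  public static void main(String[] args) {
    System.out.println(callsWithoutDefer());               // 1
    System.out.println(Arrays.toString(callsWithDefer())); // [0, 2]
  }
}
```

Without defer, subscribeOn has no effect on where findAll runs, because the call has already happened on the caller's thread before any subscription exists.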

Also, subscribeOn and publishOn apparently let you specify which Scheduler (thread strategy) is used for subscribing and for publishing, respectively. https://kazuhira-r.hatenablog.com/entry/20180107/1515327957
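
One way to see the difference is to capture thread names on either side of each operator. The sketch below is only illustrative. SchedulerThreadsDemo is a made-up class, and it uses Schedulers.boundedElastic(), which assumes Reactor 3.3 or later, rather than the Schedulers.elastic() used in the hands-on code.

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the hands-on.
class SchedulerThreadsDemo {

  static String currentThread() {
    return Thread.currentThread().getName();
  }

  // subscribeOn moves the subscription itself to the scheduler, so the
  // source and the map before it run on a boundedElastic thread.
  static String mapThreadWithSubscribeOn() {
    return Flux.just("a", "b")
        .map(v -> currentThread())
        .subscribeOn(Schedulers.boundedElastic())
        .blockLast();
  }

  // publishOn only switches threads for the operators after it.
  // Returns {thread of the map before publishOn, thread of the map after it}.
  static List<String> threadsAroundPublishOn() {
    return Flux.just("a", "b")
        .map(v -> currentThread())                            // subscribing thread
        .publishOn(Schedulers.parallel())
        .map(before -> Arrays.asList(before, currentThread())) // parallel thread
        .blockLast();
  }

  public static void main(String[] args) {
    System.out.println(mapThreadWithSubscribeOn());
    System.out.println(threadsAroundPublishOn());
  }
}
```

This matches the Part11 code above: subscribeOn is used where the blocking call happens at the source (findAll), and publishOn where the blocking call happens downstream (save).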

In closing

I feel like I understand how to call the API, but I still don't really understand how to put it to use ...
