[JAVA] Lite Rx API Hands-on Je l'ai essayé

J'ai essayé de démarrer avec Reactor.

https://github.com/reactor/lite-rx-api-hands-on

Part01Flux

public class Part01Flux {
  Flux<String> emptyFlux() {
    return Flux.empty();
  }

  Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
  }

  Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
  }

  Flux<String> errorFlux() {
    return Flux.error(new IllegalStateException());
  }

  Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100)).take(10);
  }
}

Flux.empty () peut être Flux.just (). Le dernier peut également être fait comme suit en utilisant zipwith.

Flux<Long> counter() {
  return Flux.interval(Duration.ofMillis(100))
          .zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
          .map(Tuple2::getT2);
}

Part02Mono

public class Part02Mono {

  Mono<String> emptyMono() {
    return Mono.empty();
  }

  Mono<String> monoWithNoSignal() {
    return Mono.never();
  }

  Mono<String> fooMono() {
    return Mono.just("foo");
  }

  Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
  }
}

Je ne sais pas comment utiliser Mono.never ().

Part03StepVerifier

public class Part03StepVerifier {

  void expectFooBarComplete(Flux<String> flux) {
    StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .verifyComplete();
  }

  void expectFooBarError(Flux<String> flux) {
    StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .expectError(RuntimeException.class)
            .verify();
  }

  void expectSkylerJesseComplete(Flux<User> flux) {
    StepVerifier.create(flux)
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
            .verifyComplete();
  }

  void expect10Elements(Flux<Long> flux) {
    Duration d = StepVerifier.create(flux)
            .expectNextCount(10)
            .verifyComplete();
    LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
  }

  void expect3600Elements(Supplier<Flux<Long>> supplier) {
    Duration d = StepVerifier.withVirtualTime(supplier)
            .thenAwait(Duration.ofSeconds(3600))
            .expectNextCount(3600)
            .verifyComplete();
    LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
  }

  private void fail() {
    throw new AssertionError("workshop not implemented");
  }
}

Même si le programme est défini pour prendre 100 heures, il semble que le test puisse être exécuté en quelques secondes en avançant virtuellement l'heure en utilisant withVirtualTime. https://mike-neck.hatenadiary.com/entry/2018/02/21/080000

Part04Transform

public class Part04Transform {

  Mono<User> capitalizeOne(Mono<User> mono) {
    return mono.map(this::capitalizeUser);
  }

  Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(this::capitalizeUser);
  }

  private User capitalizeUser(User user) {
    return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
  }

  Flux<User> asyncCapitalizeMany(Flux<User> flux) {
    return flux.flatMap(this::asyncCapitalizeUser);
  }

  Mono<User> asyncCapitalizeUser(User u) {
    return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
    }
}

La différence entre «flatMap» et «map» est de savoir si la valeur de retour de «Function» passée comme argument est «Flux» ou «Mono», ou non. Dans le cas de faltMap, il est traité de manière asynchrone, il peut donc ne pas être dans l'ordre d'origine.

Flux<String> flat() {
  Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
  return flux.flatMap(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}

L'ordre de la sortie finale obtenue par le traitement ci-dessus est le suivant.

hoge0
fuga0
piyo0
hoge1
fuga1
piyo1
hoge2
fuga2
piyo2

Si vous utilisez flatMapSequentail, l'ordre d'origine est conservé.

Flux<String> flat() {
  Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
  return flux.flatMapSequential(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
hoge0
hoge1
hoge2
fuga0
fuga1
fuga2
piyo0
piyo1
piyo2

Part05Merge

public class Part05Merge {
  Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
    return flux1.mergeWith(flux2);
  }

  Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
    return flux1.concatWith(flux2);
  }

  Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
    return mono1.concatWith(mono2);
  }
}

mergeWith fusionne par ordre d'arrivée. concatWith conserve l'ordre d'origine et fusionne. (Ce qui est donné comme argument est derrière) Dans le cas de test ci-dessus, "flux1" est retardé. Pour mergeWith, l'ordre est flux2-> flux1, et pour concatWith, l'ordre est flux1-> flux2.

Part06Request

public class Part06Request {

  ReactiveRepository<User> repository = new ReactiveUserRepository();

  StepVerifier requestAllExpectFour(Flux<User> flux) {
    return StepVerifier.create(flux)
            .expectNextCount(4)
            .expectComplete();
  }

  StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
    return StepVerifier.create(flux)
            .thenRequest(1)
            .expectNext(User.SKYLER)
            .thenRequest(1)
            .expectNext(User.JESSE)
            .thenCancel();
  }

  Flux<User> fluxWithLog() {
    return repository.findAll().log();
  }

  Flux<User> fluxWithDoOnPrintln() {
    Logger logger = LoggerFactory.getLogger(Part06Request.class);
    return repository.findAll()
            .doOnSubscribe(s -> logger.info("Starring:"))
            .doOnNext(u -> logger.info("{} {}", u.getFirstname(), u.getLastname()))
            .doOnComplete(() -> logger.info("The end!"));
  }
}

Une méthode appelée «doOn ~~» peut définir un processus pour une action spécifique de «Flux» ou «Mono». Je pense que c'est comme un auditeur d'événements.

Part07Errors

public class Part07Errors {

  Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
    return mono.onErrorReturn(User.SAUL);
  }

  Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
    return flux.onErrorResume(th -> Flux.just(User.SAUL, User.JESSE));
  }

  Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(u -> {
      try {
        return capitalizeUser(u);
      } catch (GetOutOfHereException e) {
        throw Exceptions.propagate(e);
      }
    });
  }

  User capitalizeUser(User user) throws GetOutOfHereException {
    if (user.equals(User.SAUL)) {
      throw new GetOutOfHereException();
    }
    return new User(user.getUsername(), user.getFirstname(), user.getLastname());
  }

  protected final class GetOutOfHereException extends Exception {
    private static final long serialVersionUID = 0L;
  }
}

Avec ʻonErrorReturn, des données statiques peuvent être renvoyées lorsqu'une erreur se produit. Cependant, même avec "Flux", il n'y a qu'une seule donnée qui peut être incluse. Avec ʻonErrorResume, vous pouvez définir le traitement de secours, ainsi vous pouvez générer Flux avec plusieurs valeurs définies ici. Il y a aussi ʻonErrorMap, mais ce n'est pas disponible dans ce cas car il définit le processus de conversion du Throwable généré en un autre Throwable`.

Part08OtherOperations

public class Part08OtherOperations {

  Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
    return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux).map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
  }

  Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
    return Mono.first(mono1, mono2);
  }

  Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
    return Flux.first(flux1, flux2);
  }

  Mono<Void> fluxCompletion(Flux<User> flux) {
    return flux.then();
  }

  Mono<User> nullAwareUserToMono(User user) {
    return Mono.justOrEmpty(user);
  }

  Mono<User> emptyToSkyler(Mono<User> mono) {
    return mono.switchIfEmpty(Mono.just(User.SKYLER));
  }
}

Part09Adapt

public class Part09Adapt {

  Flowable<User> fromFluxToFlowable(Flux<User> flux) {
    return Flowable.fromPublisher(flux);
  }

  Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
    return Flux.from(flowable);
  }

  Observable<User> fromFluxToObservable(Flux<User> flux) {
    return Observable.fromPublisher(flux);
  }

  Flux<User> fromObservableToFlux(Observable<User> observable) {
    return Flux.from(observable.toFlowable(BackpressureStrategy.MISSING));
  }

  Single<User> fromMonoToSingle(Mono<User> mono) {
    return Single.fromPublisher(mono);
  }

  Mono<User> fromSingleToMono(Single<User> single) {
    return Mono.from(single.toFlowable());
  }

  CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
    return mono.toFuture();
  }

  Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
    return Mono.fromFuture(future);
  }
}

Reactor et RxJava ont des méthodes qui reçoivent et génèrent "Publisher", vous pouvez donc l'utiliser. Cependant, ʻObservableetSingle ne sont pas des classes d'implémentation de Publisher, ils doivent donc être convertis en Flowable. ʻSi vous voulez convertir Observable en Flowable, vous devez spécifier le paramètre Backpressure. Je ne sais pas si "MANQUANT" est correct.

Part10ReactiveToBlocking

public class Part10ReactiveToBlocking {
  User monoToValue(Mono<User> mono) {
    return mono.block();
  }

  Iterable<User> fluxToValues(Flux<User> flux) {
    return flux.toIterable();
  }
}

Vous pouvez également bloquer avec toStream.

Part11BlockingToReactive

public class Part11BlockingToReactive {
  Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
  }

  Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
    return flux.publishOn(Schedulers.elastic())
            .doOnNext(u -> repository.save(u))
            .then();
  }
}

En utilisant defer, vous pouvez retarder le processus jusqu'à ce que ce Flux soit subscribe. Si vous faites ce qui suit sans utiliser defer,repository.findAll ()sera exécuté au moment de ce processus.

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
  return Flux.fromIterable(repository.findAll()).subscribeOn(Schedulers.elastic());
}

De plus, il semble que vous puissiez spécifier la stratégie du fil à utiliser lors de l'abonnement et de la publication avec subscribeOn et publishOn. https://kazuhira-r.hatenablog.com/entry/20180107/1515327957

à la fin

J'ai l'impression de comprendre comment utiliser l'API, mais je ne comprends pas vraiment comment l'utiliser ...

Recommended Posts

Lite Rx API Hands-on Je l'ai essayé
J'ai essayé d'utiliser l'API Java8 Stream
J'ai essayé d'utiliser l'API Elasticsearch en Java
J'ai essayé de résumer l'API Stream
J'ai essayé Spring.
[API] J'ai essayé d'utiliser l'API de recherche par code postal
J'ai essayé de mettre Tomcat
J'ai essayé de refactoriser ①
J'ai essayé FizzBuzz.
J'ai essayé JHipster 5.1
Essayé l'API Toot et Streaming de Mastodon en Java
J'ai essayé de dessiner une animation avec l'API Blazor + canvas
[Java] J'ai essayé de mettre en œuvre la recherche de produits de l'API Yahoo
[J'ai essayé] Tutoriel de printemps
J'ai essayé d'exécuter Autoware
J'ai essayé QUARKUS immédiatement
J'ai essayé d'utiliser TestNG
J'ai essayé Spring Batch
J'ai essayé d'utiliser Galasa
J'ai essayé node-jt400 (exécuter)
J'ai essayé node-jt400 (Transactions)
Java SE 13 (JSR388) est sorti alors je l'ai essayé
J'ai essayé de lier le chat avec le serveur de Minecraft avec l'API Discord