[JAVA] Lite Rx API Hands-on Ich habe es versucht

Ich habe versucht, mit Reactor zu beginnen.

https://github.com/reactor/lite-rx-api-hands-on

Part01Flux

public class Part01Flux {
  Flux<String> emptyFlux() {
    return Flux.empty();
  }

  Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
  }

  Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
  }

  Flux<String> errorFlux() {
    return Flux.error(new IllegalStateException());
  }

  Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100)).take(10);
  }
}

Flux.empty () kann Flux.just () sein. Der letzte kann auch wie folgt mit "zipwith" durchgeführt werden.

Flux<Long> counter() {
  return Flux.interval(Duration.ofMillis(100))
          .zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
          .map(Tuple2::getT2);
}

Part02Mono

public class Part02Mono {

  Mono<String> emptyMono() {
    return Mono.empty();
  }

  Mono<String> monoWithNoSignal() {
    return Mono.never();
  }

  Mono<String> fooMono() {
    return Mono.just("foo");
  }

  Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
  }
}

Ich weiß nicht, wie man "Mono.never ()" benutzt.

Part03StepVerifier

public class Part03StepVerifier {

  void expectFooBarComplete(Flux<String> flux) {
    StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .verifyComplete();
  }

  void expectFooBarError(Flux<String> flux) {
    StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .expectError(RuntimeException.class)
            .verify();
  }

  void expectSkylerJesseComplete(Flux<User> flux) {
    StepVerifier.create(flux)
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
            .verifyComplete();
  }

  void expect10Elements(Flux<Long> flux) {
    Duration d = StepVerifier.create(flux)
            .expectNextCount(10)
            .verifyComplete();
    LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
  }

  void expect3600Elements(Supplier<Flux<Long>> supplier) {
    Duration d = StepVerifier.withVirtualTime(supplier)
            .thenAwait(Duration.ofSeconds(3600))
            .expectNextCount(3600)
            .verifyComplete();
    LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
  }

  private void fail() {
    throw new AssertionError("workshop not implemented");
  }
}

Selbst wenn der Zeitplan so ist, dass er 100 Stunden dauert, scheint es, dass der Test in wenigen Sekunden ausgeführt werden kann, indem die Zeit mithilfe von "withVirtualTime" virtuell verlängert wird. https://mike-neck.hatenadiary.com/entry/2018/02/21/080000

Part04Transform

public class Part04Transform {

  Mono<User> capitalizeOne(Mono<User> mono) {
    return mono.map(this::capitalizeUser);
  }

  Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(this::capitalizeUser);
  }

  private User capitalizeUser(User user) {
    return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
  }

  Flux<User> asyncCapitalizeMany(Flux<User> flux) {
    return flux.flatMap(this::asyncCapitalizeUser);
  }

  Mono<User> asyncCapitalizeUser(User u) {
    return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
    }
}

Der Unterschied zwischen "flatMap" und "map" besteht darin, ob der als Argument übergebene Rückgabewert von "Function" "Flux" oder "Mono" ist oder nicht. Im Fall von "faltMap" wird es asynchron verarbeitet, sodass es möglicherweise außerhalb der ursprünglichen Reihenfolge liegt.

Flux<String> flat() {
  Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
  return flux.flatMap(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}

Die Reihenfolge der endgültigen Ausgabe, die durch die obige Verarbeitung erhalten wird, ist wie folgt.

hoge0
fuga0
piyo0
hoge1
fuga1
piyo1
hoge2
fuga2
piyo2

Wenn Sie "flatMapSequentail" verwenden, bleibt die ursprüngliche Reihenfolge erhalten.

Flux<String> flat() {
  Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
  return flux.flatMapSequential(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
hoge0
hoge1
hoge2
fuga0
fuga1
fuga2
piyo0
piyo1
piyo2

Part05Merge

public class Part05Merge {
  Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
    return flux1.mergeWith(flux2);
  }

  Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
    return flux1.concatWith(flux2);
  }

  Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
    return mono1.concatWith(mono2);
  }
}

mergeWith wird in der Reihenfolge der Ankunft zusammengeführt. concatWith behält die ursprüngliche Reihenfolge bei und wird zusammengeführt. (Was als Argument angegeben wird, steckt dahinter) Im obigen Testfall ist "Flux1" verzögert. Für "mergeWith" lautet die Reihenfolge "flow2" -> "flow1" und für "concatWith" lautet die Reihenfolge "flow1" -> "flow2".

Part06Request

public class Part06Request {

  ReactiveRepository<User> repository = new ReactiveUserRepository();

  StepVerifier requestAllExpectFour(Flux<User> flux) {
    return StepVerifier.create(flux)
            .expectNextCount(4)
            .expectComplete();
  }

  StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
    return StepVerifier.create(flux)
            .thenRequest(1)
            .expectNext(User.SKYLER)
            .thenRequest(1)
            .expectNext(User.JESSE)
            .thenCancel();
  }

  Flux<User> fluxWithLog() {
    return repository.findAll().log();
  }

  Flux<User> fluxWithDoOnPrintln() {
    Logger logger = LoggerFactory.getLogger(Part06Request.class);
    return repository.findAll()
            .doOnSubscribe(s -> logger.info("Starring:"))
            .doOnNext(u -> logger.info("{} {}", u.getFirstname(), u.getLastname()))
            .doOnComplete(() -> logger.info("The end!"));
  }
}

Eine Methode namens "doOn ~~" kann einen Prozess für eine bestimmte Aktion von "Flux" oder "Mono" definieren. Ich denke, es ist wie ein Event-Listener.

Part07Errors

public class Part07Errors {

  Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
    return mono.onErrorReturn(User.SAUL);
  }

  Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
    return flux.onErrorResume(th -> Flux.just(User.SAUL, User.JESSE));
  }

  Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(u -> {
      try {
        return capitalizeUser(u);
      } catch (GetOutOfHereException e) {
        throw Exceptions.propagate(e);
      }
    });
  }

  User capitalizeUser(User user) throws GetOutOfHereException {
    if (user.equals(User.SAUL)) {
      throw new GetOutOfHereException();
    }
    return new User(user.getUsername(), user.getFirstname(), user.getLastname());
  }

  protected final class GetOutOfHereException extends Exception {
    private static final long serialVersionUID = 0L;
  }
}

Mit onErrorReturn können statische Daten zurückgegeben werden, wenn ein Fehler auftritt. Selbst mit "Flux" kann jedoch nur ein Datenelement eingeschlossen werden. Mit "onErrorResume" können Sie die Fallback-Verarbeitung definieren, sodass Sie "Flux" mit mehreren dort definierten Werten generieren können. Es gibt auch "onErrorMap", dies ist jedoch in diesem Fall nicht verfügbar, da es den Prozess zum Konvertieren des generierten "Throwable" in ein anderes "Throwable" definiert.

Part08OtherOperations

public class Part08OtherOperations {

  Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
    return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux).map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
  }

  Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
    return Mono.first(mono1, mono2);
  }

  Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
    return Flux.first(flux1, flux2);
  }

  Mono<Void> fluxCompletion(Flux<User> flux) {
    return flux.then();
  }

  Mono<User> nullAwareUserToMono(User user) {
    return Mono.justOrEmpty(user);
  }

  Mono<User> emptyToSkyler(Mono<User> mono) {
    return mono.switchIfEmpty(Mono.just(User.SKYLER));
  }
}

Part09Adapt

public class Part09Adapt {

  Flowable<User> fromFluxToFlowable(Flux<User> flux) {
    return Flowable.fromPublisher(flux);
  }

  Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
    return Flux.from(flowable);
  }

  Observable<User> fromFluxToObservable(Flux<User> flux) {
    return Observable.fromPublisher(flux);
  }

  Flux<User> fromObservableToFlux(Observable<User> observable) {
    return Flux.from(observable.toFlowable(BackpressureStrategy.MISSING));
  }

  Single<User> fromMonoToSingle(Mono<User> mono) {
    return Single.fromPublisher(mono);
  }

  Mono<User> fromSingleToMono(Single<User> single) {
    return Mono.from(single.toFlowable());
  }

  CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
    return mono.toFuture();
  }

  Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
    return Mono.fromFuture(future);
  }
}

Sowohl Reactor als auch RxJava verfügen über Methoden, die "Publisher" empfangen und generieren, sodass Sie diese verwenden können. "Observable" und "Single" sind jedoch keine Implementierungsklassen von "Publisher", daher müssen sie in "Flowable" konvertiert werden. Wenn Sie "Observable" in "Flowable" konvertieren möchten, müssen Sie die Einstellung "Gegendruck" angeben. Ich weiß nicht, ob "MISSING" in Ordnung ist.

Part10ReactiveToBlocking

public class Part10ReactiveToBlocking {
  User monoToValue(Mono<User> mono) {
    return mono.block();
  }

  Iterable<User> fluxToValues(Flux<User> flux) {
    return flux.toIterable();
  }
}

Sie können auch mit toStream blockieren.

Part11BlockingToReactive

public class Part11BlockingToReactive {
  Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
  }

  Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
    return flux.publishOn(Schedulers.elastic())
            .doOnNext(u -> repository.save(u))
            .then();
  }
}

Mit "defer" können Sie den Vorgang verzögern, bis dieser "Flux" "subscribe" ist. Wenn Sie die folgenden Schritte ausführen, ohne "defer" zu verwenden, wird "repository.findAll ()" zum Zeitpunkt dieses Vorgangs ausgeführt.

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
  return Flux.fromIterable(repository.findAll()).subscribeOn(Schedulers.elastic());
}

Es scheint auch, dass Sie die Strategie des Threads angeben können, der beim Abonnieren und Veröffentlichen mit "subscribeOn" und "PublishOn" verwendet werden soll. https://kazuhira-r.hatenablog.com/entry/20180107/1515327957

schließlich

Ich habe das Gefühl, dass ich verstehe, wie man die API benutzt, aber ich verstehe nicht wirklich, wie man sie benutzt ...

Recommended Posts

Lite Rx API Hands-on Ich habe es versucht
Ich habe versucht, die Java8 Stream API zu verwenden
Ich habe versucht, die Elasticsearch-API in Java zu verwenden
Ich habe versucht, die Stream-API zusammenzufassen
Ich habe es mit Spring versucht.
[API] Ich habe versucht, die Postleitzahlensuch-API zu verwenden
Ich habe versucht, Tomcat zu setzen
Ich habe versucht, ① umzugestalten
Ich habe FizzBuzz ausprobiert.
Ich habe JHipster 5.1 ausprobiert
Versuchte Mastodons Toot- und Streaming-API in Java
Ich habe versucht, Animationen mit der Blazor + Canvas-API zu zeichnen
[Java] Ich habe versucht, die Yahoo API-Produktsuche zu implementieren
[Ich habe es versucht] Spring Tutorial
Ich habe versucht, Autoware auszuführen
Ich habe sofort QUARKUS ausprobiert
Ich habe versucht, TestNG zu verwenden
Ich habe Spring Batch ausprobiert
Ich habe versucht, Galasa zu benutzen
Ich habe versucht, node-jt400 (ausführen)
Ich habe versucht, node-jt400 (Transaktionen)
Java SE 13 (JSR388) wurde veröffentlicht, also habe ich es versucht
Ich habe versucht, den Chat mit dem Minecraft-Server mit der Discord-API zu verknüpfen