Ich habe versucht, mit Reactor zu beginnen.
https://github.com/reactor/lite-rx-api-hands-on
Part01Flux
public class Part01Flux {
Flux<String> emptyFlux() {
return Flux.empty();
}
Flux<String> fooBarFluxFromValues() {
return Flux.just("foo", "bar");
}
Flux<String> fooBarFluxFromList() {
return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
Flux<String> errorFlux() {
return Flux.error(new IllegalStateException());
}
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100)).take(10);
}
}
Flux.empty ()
kann Flux.just ()
sein.
Der letzte kann auch wie folgt mit "zipwith" durchgeführt werden.
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100))
.zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
.map(Tuple2::getT2);
}
Part02Mono
public class Part02Mono {
Mono<String> emptyMono() {
return Mono.empty();
}
Mono<String> monoWithNoSignal() {
return Mono.never();
}
Mono<String> fooMono() {
return Mono.just("foo");
}
Mono<String> errorMono() {
return Mono.error(new IllegalStateException());
}
}
Ich weiß nicht, wie man "Mono.never ()" benutzt.
Part03StepVerifier
public class Part03StepVerifier {
void expectFooBarComplete(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.verifyComplete();
}
void expectFooBarError(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.expectError(RuntimeException.class)
.verify();
}
void expectSkylerJesseComplete(Flux<User> flux) {
StepVerifier.create(flux)
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
.verifyComplete();
}
void expect10Elements(Flux<Long> flux) {
Duration d = StepVerifier.create(flux)
.expectNextCount(10)
.verifyComplete();
LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
}
void expect3600Elements(Supplier<Flux<Long>> supplier) {
Duration d = StepVerifier.withVirtualTime(supplier)
.thenAwait(Duration.ofSeconds(3600))
.expectNextCount(3600)
.verifyComplete();
LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
}
private void fail() {
throw new AssertionError("workshop not implemented");
}
}
Selbst wenn der Zeitplan so ist, dass er 100 Stunden dauert, scheint es, dass der Test in wenigen Sekunden ausgeführt werden kann, indem die Zeit mithilfe von "withVirtualTime" virtuell verlängert wird. https://mike-neck.hatenadiary.com/entry/2018/02/21/080000
Part04Transform
public class Part04Transform {
Mono<User> capitalizeOne(Mono<User> mono) {
return mono.map(this::capitalizeUser);
}
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(this::capitalizeUser);
}
private User capitalizeUser(User user) {
return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
}
Flux<User> asyncCapitalizeMany(Flux<User> flux) {
return flux.flatMap(this::asyncCapitalizeUser);
}
Mono<User> asyncCapitalizeUser(User u) {
return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
}
}
Der Unterschied zwischen "flatMap" und "map" besteht darin, ob der als Argument übergebene Rückgabewert von "Function" "Flux" oder "Mono" ist oder nicht. Im Fall von "faltMap" wird es asynchron verarbeitet, sodass es möglicherweise außerhalb der ursprünglichen Reihenfolge liegt.
Flux<String> flat() {
Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
return flux.flatMap(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
Die Reihenfolge der endgültigen Ausgabe, die durch die obige Verarbeitung erhalten wird, ist wie folgt.
hoge0
fuga0
piyo0
hoge1
fuga1
piyo1
hoge2
fuga2
piyo2
Wenn Sie "flatMapSequentail" verwenden, bleibt die ursprüngliche Reihenfolge erhalten.
Flux<String> flat() {
Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
return flux.flatMapSequential(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
hoge0
hoge1
hoge2
fuga0
fuga1
fuga2
piyo0
piyo1
piyo2
Part05Merge
public class Part05Merge {
Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.mergeWith(flux2);
}
Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.concatWith(flux2);
}
Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
return mono1.concatWith(mono2);
}
}
mergeWith
wird in der Reihenfolge der Ankunft zusammengeführt. concatWith
behält die ursprüngliche Reihenfolge bei und wird zusammengeführt. (Was als Argument angegeben wird, steckt dahinter)
Im obigen Testfall ist "Flux1" verzögert.
Für "mergeWith" lautet die Reihenfolge "flow2" -> "flow1" und für "concatWith" lautet die Reihenfolge "flow1" -> "flow2".
Part06Request
public class Part06Request {
ReactiveRepository<User> repository = new ReactiveUserRepository();
StepVerifier requestAllExpectFour(Flux<User> flux) {
return StepVerifier.create(flux)
.expectNextCount(4)
.expectComplete();
}
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
return StepVerifier.create(flux)
.thenRequest(1)
.expectNext(User.SKYLER)
.thenRequest(1)
.expectNext(User.JESSE)
.thenCancel();
}
Flux<User> fluxWithLog() {
return repository.findAll().log();
}
Flux<User> fluxWithDoOnPrintln() {
Logger logger = LoggerFactory.getLogger(Part06Request.class);
return repository.findAll()
.doOnSubscribe(s -> logger.info("Starring:"))
.doOnNext(u -> logger.info("{} {}", u.getFirstname(), u.getLastname()))
.doOnComplete(() -> logger.info("The end!"));
}
}
Eine Methode namens "doOn ~~" kann einen Prozess für eine bestimmte Aktion von "Flux" oder "Mono" definieren. Ich denke, es ist wie ein Event-Listener.
Part07Errors
public class Part07Errors {
Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
return mono.onErrorReturn(User.SAUL);
}
Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
return flux.onErrorResume(th -> Flux.just(User.SAUL, User.JESSE));
}
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(u -> {
try {
return capitalizeUser(u);
} catch (GetOutOfHereException e) {
throw Exceptions.propagate(e);
}
});
}
User capitalizeUser(User user) throws GetOutOfHereException {
if (user.equals(User.SAUL)) {
throw new GetOutOfHereException();
}
return new User(user.getUsername(), user.getFirstname(), user.getLastname());
}
protected final class GetOutOfHereException extends Exception {
private static final long serialVersionUID = 0L;
}
}
Mit onErrorReturn
können statische Daten zurückgegeben werden, wenn ein Fehler auftritt.
Selbst mit "Flux" kann jedoch nur ein Datenelement eingeschlossen werden.
Mit "onErrorResume" können Sie die Fallback-Verarbeitung definieren, sodass Sie "Flux" mit mehreren dort definierten Werten generieren können.
Es gibt auch "onErrorMap", dies ist jedoch in diesem Fall nicht verfügbar, da es den Prozess zum Konvertieren des generierten "Throwable" in ein anderes "Throwable" definiert.
Part08OtherOperations
public class Part08OtherOperations {
Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux).map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
}
Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
return Mono.first(mono1, mono2);
}
Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
return Flux.first(flux1, flux2);
}
Mono<Void> fluxCompletion(Flux<User> flux) {
return flux.then();
}
Mono<User> nullAwareUserToMono(User user) {
return Mono.justOrEmpty(user);
}
Mono<User> emptyToSkyler(Mono<User> mono) {
return mono.switchIfEmpty(Mono.just(User.SKYLER));
}
}
Part09Adapt
public class Part09Adapt {
Flowable<User> fromFluxToFlowable(Flux<User> flux) {
return Flowable.fromPublisher(flux);
}
Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
return Flux.from(flowable);
}
Observable<User> fromFluxToObservable(Flux<User> flux) {
return Observable.fromPublisher(flux);
}
Flux<User> fromObservableToFlux(Observable<User> observable) {
return Flux.from(observable.toFlowable(BackpressureStrategy.MISSING));
}
Single<User> fromMonoToSingle(Mono<User> mono) {
return Single.fromPublisher(mono);
}
Mono<User> fromSingleToMono(Single<User> single) {
return Mono.from(single.toFlowable());
}
CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
return mono.toFuture();
}
Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
return Mono.fromFuture(future);
}
}
Sowohl Reactor als auch RxJava verfügen über Methoden, die "Publisher" empfangen und generieren, sodass Sie diese verwenden können. "Observable" und "Single" sind jedoch keine Implementierungsklassen von "Publisher", daher müssen sie in "Flowable" konvertiert werden. Wenn Sie "Observable" in "Flowable" konvertieren möchten, müssen Sie die Einstellung "Gegendruck" angeben. Ich weiß nicht, ob "MISSING" in Ordnung ist.
Part10ReactiveToBlocking
public class Part10ReactiveToBlocking {
User monoToValue(Mono<User> mono) {
return mono.block();
}
Iterable<User> fluxToValues(Flux<User> flux) {
return flux.toIterable();
}
}
Sie können auch mit toStream
blockieren.
Part11BlockingToReactive
public class Part11BlockingToReactive {
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
}
Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
return flux.publishOn(Schedulers.elastic())
.doOnNext(u -> repository.save(u))
.then();
}
}
Mit "defer" können Sie den Vorgang verzögern, bis dieser "Flux" "subscribe" ist. Wenn Sie die folgenden Schritte ausführen, ohne "defer" zu verwenden, wird "repository.findAll ()" zum Zeitpunkt dieses Vorgangs ausgeführt.
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.fromIterable(repository.findAll()).subscribeOn(Schedulers.elastic());
}
Es scheint auch, dass Sie die Strategie des Threads angeben können, der beim Abonnieren und Veröffentlichen mit "subscribeOn" und "PublishOn" verwendet werden soll. https://kazuhira-r.hatenablog.com/entry/20180107/1515327957
Ich habe das Gefühl, dass ich verstehe, wie man die API benutzt, aber ich verstehe nicht wirklich, wie man sie benutzt ...
Recommended Posts