J'ai essayé de démarrer avec Reactor.
https://github.com/reactor/lite-rx-api-hands-on
Part01Flux
public class Part01Flux {
Flux<String> emptyFlux() {
return Flux.empty();
}
Flux<String> fooBarFluxFromValues() {
return Flux.just("foo", "bar");
}
Flux<String> fooBarFluxFromList() {
return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
Flux<String> errorFlux() {
return Flux.error(new IllegalStateException());
}
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100)).take(10);
}
}
Flux.empty ()
peut être Flux.just ()
.
Le dernier peut également être fait comme suit en utilisant zipwith
.
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100))
.zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
.map(Tuple2::getT2);
}
Part02Mono
public class Part02Mono {
Mono<String> emptyMono() {
return Mono.empty();
}
Mono<String> monoWithNoSignal() {
return Mono.never();
}
Mono<String> fooMono() {
return Mono.just("foo");
}
Mono<String> errorMono() {
return Mono.error(new IllegalStateException());
}
}
Je ne sais pas comment utiliser Mono.never ()
.
Part03StepVerifier
public class Part03StepVerifier {
void expectFooBarComplete(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.verifyComplete();
}
void expectFooBarError(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.expectError(RuntimeException.class)
.verify();
}
void expectSkylerJesseComplete(Flux<User> flux) {
StepVerifier.create(flux)
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
.verifyComplete();
}
void expect10Elements(Flux<Long> flux) {
Duration d = StepVerifier.create(flux)
.expectNextCount(10)
.verifyComplete();
LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
}
void expect3600Elements(Supplier<Flux<Long>> supplier) {
Duration d = StepVerifier.withVirtualTime(supplier)
.thenAwait(Duration.ofSeconds(3600))
.expectNextCount(3600)
.verifyComplete();
LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
}
private void fail() {
throw new AssertionError("workshop not implemented");
}
}
Même si le programme est défini pour prendre 100 heures, il semble que le test puisse être exécuté en quelques secondes en avançant virtuellement l'heure en utilisant withVirtualTime
.
https://mike-neck.hatenadiary.com/entry/2018/02/21/080000
Part04Transform
public class Part04Transform {
Mono<User> capitalizeOne(Mono<User> mono) {
return mono.map(this::capitalizeUser);
}
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(this::capitalizeUser);
}
private User capitalizeUser(User user) {
return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
}
Flux<User> asyncCapitalizeMany(Flux<User> flux) {
return flux.flatMap(this::asyncCapitalizeUser);
}
Mono<User> asyncCapitalizeUser(User u) {
return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
}
}
La différence entre «flatMap» et «map» est de savoir si la valeur de retour de «Function» passée comme argument est «Flux» ou «Mono», ou non.
Dans le cas de faltMap
, il est traité de manière asynchrone, il peut donc ne pas être dans l'ordre d'origine.
Flux<String> flat() {
Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
return flux.flatMap(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
L'ordre de la sortie finale obtenue par le traitement ci-dessus est le suivant.
hoge0
fuga0
piyo0
hoge1
fuga1
piyo1
hoge2
fuga2
piyo2
Si vous utilisez flatMapSequentail
, l'ordre d'origine est conservé.
Flux<String> flat() {
Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
return flux.flatMapSequential(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
hoge0
hoge1
hoge2
fuga0
fuga1
fuga2
piyo0
piyo1
piyo2
Part05Merge
public class Part05Merge {
Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.mergeWith(flux2);
}
Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.concatWith(flux2);
}
Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
return mono1.concatWith(mono2);
}
}
mergeWith
fusionne par ordre d'arrivée. concatWith
conserve l'ordre d'origine et fusionne. (Ce qui est donné comme argument est derrière)
Dans le cas de test ci-dessus, "flux1" est retardé.
Pour mergeWith
, l'ordre est flux2
-> flux1
, et pour concatWith
, l'ordre est flux1
-> flux2
.
Part06Request
public class Part06Request {
ReactiveRepository<User> repository = new ReactiveUserRepository();
StepVerifier requestAllExpectFour(Flux<User> flux) {
return StepVerifier.create(flux)
.expectNextCount(4)
.expectComplete();
}
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
return StepVerifier.create(flux)
.thenRequest(1)
.expectNext(User.SKYLER)
.thenRequest(1)
.expectNext(User.JESSE)
.thenCancel();
}
Flux<User> fluxWithLog() {
return repository.findAll().log();
}
Flux<User> fluxWithDoOnPrintln() {
Logger logger = LoggerFactory.getLogger(Part06Request.class);
return repository.findAll()
.doOnSubscribe(s -> logger.info("Starring:"))
.doOnNext(u -> logger.info("{} {}", u.getFirstname(), u.getLastname()))
.doOnComplete(() -> logger.info("The end!"));
}
}
Une méthode appelée «doOn ~~» peut définir un processus pour une action spécifique de «Flux» ou «Mono». Je pense que c'est comme un auditeur d'événements.
Part07Errors
public class Part07Errors {
Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
return mono.onErrorReturn(User.SAUL);
}
Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
return flux.onErrorResume(th -> Flux.just(User.SAUL, User.JESSE));
}
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(u -> {
try {
return capitalizeUser(u);
} catch (GetOutOfHereException e) {
throw Exceptions.propagate(e);
}
});
}
User capitalizeUser(User user) throws GetOutOfHereException {
if (user.equals(User.SAUL)) {
throw new GetOutOfHereException();
}
return new User(user.getUsername(), user.getFirstname(), user.getLastname());
}
protected final class GetOutOfHereException extends Exception {
private static final long serialVersionUID = 0L;
}
}
Avec ʻonErrorReturn, des données statiques peuvent être renvoyées lorsqu'une erreur se produit. Cependant, même avec "Flux", il n'y a qu'une seule donnée qui peut être incluse. Avec ʻonErrorResume
, vous pouvez définir le traitement de secours, ainsi vous pouvez générer Flux
avec plusieurs valeurs définies ici.
Il y a aussi ʻonErrorMap, mais ce n'est pas disponible dans ce cas car il définit le processus de conversion du
Throwable généré en un autre
Throwable`.
Part08OtherOperations
public class Part08OtherOperations {
Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux).map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
}
Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
return Mono.first(mono1, mono2);
}
Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
return Flux.first(flux1, flux2);
}
Mono<Void> fluxCompletion(Flux<User> flux) {
return flux.then();
}
Mono<User> nullAwareUserToMono(User user) {
return Mono.justOrEmpty(user);
}
Mono<User> emptyToSkyler(Mono<User> mono) {
return mono.switchIfEmpty(Mono.just(User.SKYLER));
}
}
Part09Adapt
public class Part09Adapt {
Flowable<User> fromFluxToFlowable(Flux<User> flux) {
return Flowable.fromPublisher(flux);
}
Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
return Flux.from(flowable);
}
Observable<User> fromFluxToObservable(Flux<User> flux) {
return Observable.fromPublisher(flux);
}
Flux<User> fromObservableToFlux(Observable<User> observable) {
return Flux.from(observable.toFlowable(BackpressureStrategy.MISSING));
}
Single<User> fromMonoToSingle(Mono<User> mono) {
return Single.fromPublisher(mono);
}
Mono<User> fromSingleToMono(Single<User> single) {
return Mono.from(single.toFlowable());
}
CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
return mono.toFuture();
}
Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
return Mono.fromFuture(future);
}
}
Reactor et RxJava ont des méthodes qui reçoivent et génèrent "Publisher", vous pouvez donc l'utiliser.
Cependant, ʻObservableet
Single ne sont pas des classes d'implémentation de
Publisher, ils doivent donc être convertis en
Flowable. ʻSi vous voulez convertir Observable
en Flowable
, vous devez spécifier le paramètre Backpressure. Je ne sais pas si "MANQUANT" est correct.
Part10ReactiveToBlocking
public class Part10ReactiveToBlocking {
User monoToValue(Mono<User> mono) {
return mono.block();
}
Iterable<User> fluxToValues(Flux<User> flux) {
return flux.toIterable();
}
}
Vous pouvez également bloquer avec toStream
.
Part11BlockingToReactive
public class Part11BlockingToReactive {
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
}
Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
return flux.publishOn(Schedulers.elastic())
.doOnNext(u -> repository.save(u))
.then();
}
}
En utilisant defer
, vous pouvez retarder le processus jusqu'à ce que ce Flux
soit subscribe
.
Si vous faites ce qui suit sans utiliser defer
,repository.findAll ()
sera exécuté au moment de ce processus.
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.fromIterable(repository.findAll()).subscribeOn(Schedulers.elastic());
}
De plus, il semble que vous puissiez spécifier la stratégie du fil à utiliser lors de l'abonnement et de la publication avec subscribeOn
et publishOn
.
https://kazuhira-r.hatenablog.com/entry/20180107/1515327957
J'ai l'impression de comprendre comment utiliser l'API, mais je ne comprends pas vraiment comment l'utiliser ...
Recommended Posts