Ich möchte den Reaktor verwenden, um die Blockierungsverarbeitung in mehreren Threads durchzuführen und die Ergebnisse zusammenzufassen. Die spezifische Montagemethode ist im Handbuch beschrieben.
https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
Wenn Sie es jedoch tatsächlich auf diese Weise implementieren, generiert Schedulers.elastic ()
viele Threads, die möglicherweise Ressourcen verbrauchen.
public static void main(String[] args) {
final var webClient = WebClient.builder()
.baseUrl("http://example.com")
.build();
Flux.fromStream(IntStream.range(1, 100).boxed())
.flatMap(i -> Mono.fromCallable(() -> webClient.get()
.retrieve()
.bodyToMono(String.class)
.block()) //Ich wage es, zur Erklärung zu blockieren
.subscribeOn(Schedulers.elastic()))
.blockLast();
}
Ausführungsprotokoll
・
・
02:24:57.014 [elastic-77] DEBUG o.s.w.r.f.client.ExchangeFunctions - [11ca9c18] HTTP GET http://example.com
02:24:57.017 [elastic-15] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ee270fc] HTTP GET http://example.com
02:24:57.017 [elastic-61] DEBUG o.s.w.r.f.client.ExchangeFunctions - [272cc048] HTTP GET http://example.com
02:24:57.015 [elastic-45] DEBUG o.s.w.r.f.client.ExchangeFunctions - [12f4ca28] HTTP GET http://example.com
02:24:57.014 [elastic-17] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3d44ed66] HTTP GET http://example.com
02:24:57.017 [elastic-57] DEBUG o.s.w.r.f.client.ExchangeFunctions - [6b5899a3] HTTP GET http://example.com
02:24:57.017 [elastic-92] DEBUG o.s.w.r.f.client.ExchangeFunctions - [7ec595f3] HTTP GET http://example.com
02:24:57.015 [elastic-94] DEBUG o.s.w.r.f.client.ExchangeFunctions - [70f26d87] HTTP GET http://example.com
・
・
public static void main(String[] args) {
final var webClient = WebClient.builder()
.baseUrl("http://example.com")
.build();
Flux.fromStream(IntStream.range(1, 100).boxed())
.flatMap(i -> Mono.fromCallable(() -> webClient.get()
.retrieve()
.bodyToMono(String.class)
.block()) //Ich wage es, zur Erklärung zu blockieren
.subscribeOn(Schedulers.elastic())
, 10) //Geben Sie 10 für die Parallelität an
.blockLast();
}
Ausführungsprotokoll
02:28:06.020 [elastic-4] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b6144e3] HTTP GET http://example.com
02:28:06.020 [elastic-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [64d61eb3] HTTP GET http://example.com
02:28:06.020 [elastic-5] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b00ce18] HTTP GET http://example.com
02:28:06.020 [elastic-2] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] HTTP GET http://example.com
02:28:06.021 [elastic-3] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] HTTP GET http://example.com
02:28:06.021 [elastic-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ace12f2] HTTP GET http://example.com
02:28:06.021 [elastic-10] DEBUG o.s.w.r.f.client.ExchangeFunctions - [4135ca0a] HTTP GET http://example.com
02:28:06.021 [elastic-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [badf622] HTTP GET http://example.com
02:28:06.020 [elastic-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] HTTP GET http://example.com
02:28:06.021 [elastic-11] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] HTTP GET http://example.com
02:28:06.673 [reactor-http-nio-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] Response 200 OK
02:28:06.673 [reactor-http-nio-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] Response 200 OK
02:28:06.687 [reactor-http-nio-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] Response 200 OK
02:28:06.757 [reactor-http-nio-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] Response 200 OK
・
・
Die Anzahl der Threads im elastischen Scheduler ist auf 10 begrenzt.
Selbst beim Aufrufen von "flatMap ()" wird die Parallelität intern angegeben. Da der Wert jedoch [Large] ist (https://github.com/reactor/reactor-core/blob/628258f644bdddd095e72d1fc81c5d6ac0e3fa12/reactor-core/src/main/java/reactor/util/concurrent/Queues.java#L89) Es sah aus wie ein Protokoll ohne Einschränkungen.
Da Executor für "subscribeOn" angegeben werden kann, ist es auch möglich, dieselbe Verarbeitung mit FixedThreadPool oder WorkStealingPool (ForkJoinPool) durchzuführen, obwohl es geringfügige Unterschiede im Algorithmus gibt. Sie können ".subscribeOn (Schedulers.fromExecutor (executorService))" anstelle von ".subscribeOn (Schedulers.elastic ())" angeben. Wenn Sie diese Arbeitsthreads beim Stoppen der Anwendung sicher herunterfahren möchten, ist die Verwendung von ExecutorService anscheinend effektiv.