Je souhaite utiliser Reactor pour effectuer un traitement de blocage dans plusieurs threads et résumer les résultats. La méthode de montage spécifique est décrite dans le manuel.
https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
Cependant, si vous l'implémentez réellement de cette façon, Schedulers.elastic ()
générera beaucoup de threads, qui peuvent consommer des ressources.
public static void main(String[] args) {
final var webClient = WebClient.builder()
.baseUrl("http://example.com")
.build();
Flux.fromStream(IntStream.range(1, 100).boxed())
.flatMap(i -> Mono.fromCallable(() -> webClient.get()
.retrieve()
.bodyToMono(String.class)
.block()) //J'ose bloquer pour l'explication
.subscribeOn(Schedulers.elastic()))
.blockLast();
}
Journal d'exécution
・
・
02:24:57.014 [elastic-77] DEBUG o.s.w.r.f.client.ExchangeFunctions - [11ca9c18] HTTP GET http://example.com
02:24:57.017 [elastic-15] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ee270fc] HTTP GET http://example.com
02:24:57.017 [elastic-61] DEBUG o.s.w.r.f.client.ExchangeFunctions - [272cc048] HTTP GET http://example.com
02:24:57.015 [elastic-45] DEBUG o.s.w.r.f.client.ExchangeFunctions - [12f4ca28] HTTP GET http://example.com
02:24:57.014 [elastic-17] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3d44ed66] HTTP GET http://example.com
02:24:57.017 [elastic-57] DEBUG o.s.w.r.f.client.ExchangeFunctions - [6b5899a3] HTTP GET http://example.com
02:24:57.017 [elastic-92] DEBUG o.s.w.r.f.client.ExchangeFunctions - [7ec595f3] HTTP GET http://example.com
02:24:57.015 [elastic-94] DEBUG o.s.w.r.f.client.ExchangeFunctions - [70f26d87] HTTP GET http://example.com
・
・
public static void main(String[] args) {
final var webClient = WebClient.builder()
.baseUrl("http://example.com")
.build();
Flux.fromStream(IntStream.range(1, 100).boxed())
.flatMap(i -> Mono.fromCallable(() -> webClient.get()
.retrieve()
.bodyToMono(String.class)
.block()) //J'ose bloquer pour l'explication
.subscribeOn(Schedulers.elastic())
, 10) //Spécifiez 10 pour la concurrence
.blockLast();
}
Journal d'exécution
02:28:06.020 [elastic-4] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b6144e3] HTTP GET http://example.com
02:28:06.020 [elastic-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [64d61eb3] HTTP GET http://example.com
02:28:06.020 [elastic-5] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b00ce18] HTTP GET http://example.com
02:28:06.020 [elastic-2] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] HTTP GET http://example.com
02:28:06.021 [elastic-3] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] HTTP GET http://example.com
02:28:06.021 [elastic-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ace12f2] HTTP GET http://example.com
02:28:06.021 [elastic-10] DEBUG o.s.w.r.f.client.ExchangeFunctions - [4135ca0a] HTTP GET http://example.com
02:28:06.021 [elastic-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [badf622] HTTP GET http://example.com
02:28:06.020 [elastic-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] HTTP GET http://example.com
02:28:06.021 [elastic-11] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] HTTP GET http://example.com
02:28:06.673 [reactor-http-nio-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] Response 200 OK
02:28:06.673 [reactor-http-nio-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] Response 200 OK
02:28:06.687 [reactor-http-nio-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] Response 200 OK
02:28:06.757 [reactor-http-nio-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] Response 200 OK
・
・
Le nombre de threads dans le planificateur élastique est limité à 10.
En fait, même en appelant flatMap ()
, la concurrence est spécifiée en interne. Cependant, parce que la valeur est Large Cela ressemblait à un journal sans restrictions.
De plus, comme Executor peut être spécifié pour subscribeOn
, il est possible d'effectuer le même traitement en utilisant FixedThreadPool ou WorkStealingPool (ForkJoinPool), bien qu'il y ait des différences mineures dans l'algorithme. Vous pouvez spécifier .subscribeOn (Schedulers.fromExecutor (executorService)))
au lieu de.subscribeOn (Schedulers.elastic ())
.
En outre, si vous souhaitez arrêter sûrement ces threads de travail lorsque l'application est arrêtée, il semble efficace d'utiliser ExecutorService.