[JAVA] Exécuter le traitement de blocage avec le réacteur dans plusieurs threads

Je souhaite utiliser Reactor pour effectuer un traitement de blocage dans plusieurs threads et résumer les résultats. La méthode de montage spécifique est décrite dans le manuel. https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking Cependant, si vous l'implémentez réellement de cette façon, Schedulers.elastic () générera beaucoup de threads, qui peuvent consommer des ressources.

    public static void main(String[] args) {
        final var webClient = WebClient.builder()
                                       .baseUrl("http://example.com")
                                       .build();
        Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                                                           .retrieve()
                                                           .bodyToMono(String.class)
                                                           .block()) //J'ose bloquer pour l'explication
                              .subscribeOn(Schedulers.elastic()))
            .blockLast();
    }

Journal d'exécution

・
・
02:24:57.014 [elastic-77] DEBUG o.s.w.r.f.client.ExchangeFunctions - [11ca9c18] HTTP GET http://example.com
02:24:57.017 [elastic-15] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ee270fc] HTTP GET http://example.com
02:24:57.017 [elastic-61] DEBUG o.s.w.r.f.client.ExchangeFunctions - [272cc048] HTTP GET http://example.com
02:24:57.015 [elastic-45] DEBUG o.s.w.r.f.client.ExchangeFunctions - [12f4ca28] HTTP GET http://example.com
02:24:57.014 [elastic-17] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3d44ed66] HTTP GET http://example.com
02:24:57.017 [elastic-57] DEBUG o.s.w.r.f.client.ExchangeFunctions - [6b5899a3] HTTP GET http://example.com
02:24:57.017 [elastic-92] DEBUG o.s.w.r.f.client.ExchangeFunctions - [7ec595f3] HTTP GET http://example.com
02:24:57.015 [elastic-94] DEBUG o.s.w.r.f.client.ExchangeFunctions - [70f26d87] HTTP GET http://example.com
・
・
    public static void main(String[] args) {
        final var webClient = WebClient.builder()
                                       .baseUrl("http://example.com")
                                       .build();

        Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                                                           .retrieve()
                                                           .bodyToMono(String.class)
                                                           .block()) //J'ose bloquer pour l'explication
                              .subscribeOn(Schedulers.elastic())
                    , 10) //Spécifiez 10 pour la concurrence
            .blockLast();
    }

Journal d'exécution

02:28:06.020 [elastic-4] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b6144e3] HTTP GET http://example.com
02:28:06.020 [elastic-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [64d61eb3] HTTP GET http://example.com
02:28:06.020 [elastic-5] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b00ce18] HTTP GET http://example.com
02:28:06.020 [elastic-2] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] HTTP GET http://example.com
02:28:06.021 [elastic-3] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] HTTP GET http://example.com
02:28:06.021 [elastic-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ace12f2] HTTP GET http://example.com
02:28:06.021 [elastic-10] DEBUG o.s.w.r.f.client.ExchangeFunctions - [4135ca0a] HTTP GET http://example.com
02:28:06.021 [elastic-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [badf622] HTTP GET http://example.com
02:28:06.020 [elastic-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] HTTP GET http://example.com
02:28:06.021 [elastic-11] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] HTTP GET http://example.com
02:28:06.673 [reactor-http-nio-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] Response 200 OK
02:28:06.673 [reactor-http-nio-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] Response 200 OK
02:28:06.687 [reactor-http-nio-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] Response 200 OK
02:28:06.757 [reactor-http-nio-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] Response 200 OK
・
・

Le nombre de threads dans le planificateur élastique est limité à 10.

En fait, même en appelant flatMap (), la concurrence est spécifiée en interne. Cependant, parce que la valeur est Large Cela ressemblait à un journal sans restrictions.

De plus, comme Executor peut être spécifié pour subscribeOn, il est possible d'effectuer le même traitement en utilisant FixedThreadPool ou WorkStealingPool (ForkJoinPool), bien qu'il y ait des différences mineures dans l'algorithme. Vous pouvez spécifier .subscribeOn (Schedulers.fromExecutor (executorService))) au lieu de.subscribeOn (Schedulers.elastic ()). En outre, si vous souhaitez arrêter sûrement ces threads de travail lorsque l'application est arrêtée, il semble efficace d'utiliser ExecutorService.

Recommended Posts

Exécuter le traitement de blocage avec le réacteur dans plusieurs threads
Essayez d'appeler des méthodes synchronisées à partir de plusieurs threads en Java
Divers threads en java
Utiliser MouseListener avec le traitement