[JAVA] Execute blocking processing in reactor in multiple threads

I want to use a reactor to perform blocking processing in multiple threads and summarize the results. The specific mounting method is described in the manual. https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking However, if you actually implement it this way, Schedulers.elastic () will generate a lot of threads, which may consume resources.

    public static void main(String[] args) {
        final var webClient = WebClient.builder()
                                       .baseUrl("http://example.com")
                                       .build();
        Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                                                           .retrieve()
                                                           .bodyToMono(String.class)
                                                           .block()) //I dare to block for explanation
                              .subscribeOn(Schedulers.elastic()))
            .blockLast();
    }

Execution log

・
・
02:24:57.014 [elastic-77] DEBUG o.s.w.r.f.client.ExchangeFunctions - [11ca9c18] HTTP GET http://example.com
02:24:57.017 [elastic-15] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ee270fc] HTTP GET http://example.com
02:24:57.017 [elastic-61] DEBUG o.s.w.r.f.client.ExchangeFunctions - [272cc048] HTTP GET http://example.com
02:24:57.015 [elastic-45] DEBUG o.s.w.r.f.client.ExchangeFunctions - [12f4ca28] HTTP GET http://example.com
02:24:57.014 [elastic-17] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3d44ed66] HTTP GET http://example.com
02:24:57.017 [elastic-57] DEBUG o.s.w.r.f.client.ExchangeFunctions - [6b5899a3] HTTP GET http://example.com
02:24:57.017 [elastic-92] DEBUG o.s.w.r.f.client.ExchangeFunctions - [7ec595f3] HTTP GET http://example.com
02:24:57.015 [elastic-94] DEBUG o.s.w.r.f.client.ExchangeFunctions - [70f26d87] HTTP GET http://example.com
・
・
    public static void main(String[] args) {
        final var webClient = WebClient.builder()
                                       .baseUrl("http://example.com")
                                       .build();

        Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                                                           .retrieve()
                                                           .bodyToMono(String.class)
                                                           .block()) //I dare to block for explanation
                              .subscribeOn(Schedulers.elastic())
                    , 10) //Specify 10 for concurrency
            .blockLast();
    }

Execution log

02:28:06.020 [elastic-4] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b6144e3] HTTP GET http://example.com
02:28:06.020 [elastic-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [64d61eb3] HTTP GET http://example.com
02:28:06.020 [elastic-5] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b00ce18] HTTP GET http://example.com
02:28:06.020 [elastic-2] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] HTTP GET http://example.com
02:28:06.021 [elastic-3] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] HTTP GET http://example.com
02:28:06.021 [elastic-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ace12f2] HTTP GET http://example.com
02:28:06.021 [elastic-10] DEBUG o.s.w.r.f.client.ExchangeFunctions - [4135ca0a] HTTP GET http://example.com
02:28:06.021 [elastic-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [badf622] HTTP GET http://example.com
02:28:06.020 [elastic-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] HTTP GET http://example.com
02:28:06.021 [elastic-11] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] HTTP GET http://example.com
02:28:06.673 [reactor-http-nio-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] Response 200 OK
02:28:06.673 [reactor-http-nio-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] Response 200 OK
02:28:06.687 [reactor-http-nio-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] Response 200 OK
02:28:06.757 [reactor-http-nio-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] Response 200 OK
・
・

The number of threads in the elastic scheduler is limited to 10.

In fact, even when calling flatMap (), concurrency is specified internally. However, because the value is Large It looked like a log with no restrictions.

Also, since Executor can be specified for subscribeOn, it is possible to perform the same processing using FixedThreadPool or WorkStealingPool (ForkJoinPool), although there are minor algorithm differences. Specify .subscribeOn (Schedulers.fromExecutor (executorService))) instead of.subscribeOn (Schedulers.elastic ()). Also, if you want to surely shut down these worker threads when the application is stopped, it seems effective to use ExecutorService.

Recommended Posts

Execute blocking processing in reactor in multiple threads
How to execute multiple commands in docker-compose.yml
Try calling synchronized methods from multiple threads in Java
Various threads in java
Use MouseListener in Processing