[JAVA] Führen Sie die Blockierungsverarbeitung mit dem Reaktor in mehreren Threads aus

Ich möchte den Reaktor verwenden, um die Blockierungsverarbeitung in mehreren Threads durchzuführen und die Ergebnisse zusammenzufassen. Die spezifische Montagemethode ist im Handbuch beschrieben. https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking Wenn Sie es jedoch tatsächlich auf diese Weise implementieren, generiert Schedulers.elastic () viele Threads, die möglicherweise Ressourcen verbrauchen.

    public static void main(String[] args) {
        final var webClient = WebClient.builder()
                                       .baseUrl("http://example.com")
                                       .build();
        Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                                                           .retrieve()
                                                           .bodyToMono(String.class)
                                                           .block()) //Ich wage es, zur Erklärung zu blockieren
                              .subscribeOn(Schedulers.elastic()))
            .blockLast();
    }

Ausführungsprotokoll

・
・
02:24:57.014 [elastic-77] DEBUG o.s.w.r.f.client.ExchangeFunctions - [11ca9c18] HTTP GET http://example.com
02:24:57.017 [elastic-15] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ee270fc] HTTP GET http://example.com
02:24:57.017 [elastic-61] DEBUG o.s.w.r.f.client.ExchangeFunctions - [272cc048] HTTP GET http://example.com
02:24:57.015 [elastic-45] DEBUG o.s.w.r.f.client.ExchangeFunctions - [12f4ca28] HTTP GET http://example.com
02:24:57.014 [elastic-17] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3d44ed66] HTTP GET http://example.com
02:24:57.017 [elastic-57] DEBUG o.s.w.r.f.client.ExchangeFunctions - [6b5899a3] HTTP GET http://example.com
02:24:57.017 [elastic-92] DEBUG o.s.w.r.f.client.ExchangeFunctions - [7ec595f3] HTTP GET http://example.com
02:24:57.015 [elastic-94] DEBUG o.s.w.r.f.client.ExchangeFunctions - [70f26d87] HTTP GET http://example.com
・
・
    public static void main(String[] args) {
        final var webClient = WebClient.builder()
                                       .baseUrl("http://example.com")
                                       .build();

        Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                                                           .retrieve()
                                                           .bodyToMono(String.class)
                                                           .block()) //Ich wage es, zur Erklärung zu blockieren
                              .subscribeOn(Schedulers.elastic())
                    , 10) //Geben Sie 10 für die Parallelität an
            .blockLast();
    }

Ausführungsprotokoll

02:28:06.020 [elastic-4] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b6144e3] HTTP GET http://example.com
02:28:06.020 [elastic-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [64d61eb3] HTTP GET http://example.com
02:28:06.020 [elastic-5] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b00ce18] HTTP GET http://example.com
02:28:06.020 [elastic-2] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] HTTP GET http://example.com
02:28:06.021 [elastic-3] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] HTTP GET http://example.com
02:28:06.021 [elastic-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ace12f2] HTTP GET http://example.com
02:28:06.021 [elastic-10] DEBUG o.s.w.r.f.client.ExchangeFunctions - [4135ca0a] HTTP GET http://example.com
02:28:06.021 [elastic-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [badf622] HTTP GET http://example.com
02:28:06.020 [elastic-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] HTTP GET http://example.com
02:28:06.021 [elastic-11] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] HTTP GET http://example.com
02:28:06.673 [reactor-http-nio-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] Response 200 OK
02:28:06.673 [reactor-http-nio-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] Response 200 OK
02:28:06.687 [reactor-http-nio-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] Response 200 OK
02:28:06.757 [reactor-http-nio-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] Response 200 OK
・
・

Die Anzahl der Threads im elastischen Scheduler ist auf 10 begrenzt.

Selbst beim Aufrufen von "flatMap ()" wird die Parallelität intern angegeben. Da der Wert jedoch [Large] ist (https://github.com/reactor/reactor-core/blob/628258f644bdddd095e72d1fc81c5d6ac0e3fa12/reactor-core/src/main/java/reactor/util/concurrent/Queues.java#L89) Es sah aus wie ein Protokoll ohne Einschränkungen.

Da Executor für "subscribeOn" angegeben werden kann, ist es auch möglich, dieselbe Verarbeitung mit FixedThreadPool oder WorkStealingPool (ForkJoinPool) durchzuführen, obwohl es geringfügige Unterschiede im Algorithmus gibt. Sie können ".subscribeOn (Schedulers.fromExecutor (executorService))" anstelle von ".subscribeOn (Schedulers.elastic ())" angeben. Wenn Sie diese Arbeitsthreads beim Stoppen der Anwendung sicher herunterfahren möchten, ist die Verwendung von ExecutorService anscheinend effektiv.

Recommended Posts

Führen Sie die Blockierungsverarbeitung mit dem Reaktor in mehreren Threads aus
Versuchen Sie, synchronisierte Methoden aus mehreren Threads in Java aufzurufen
Verschiedene Threads in Java
Verwenden Sie MouseListener für die Verarbeitung