[JAVA] CompletableFuture Getting Started 3 (Essayez d'émettre une requête asynchrone)

CompletableFuture Getting Started 2 (Try to make CompletableFuture) A continué

Émettre plusieurs demandes de traitement asynchrone

Il existe une liste de «String» comme ci-dessous, et chacune d'elles est utilisée comme argument pour le traitement asynchrone.

private static final List<String> argsList 
= Arrays.asList("test1", "test2", "test3", "test4");

Le traitement asynchrone est effectué à l'aide de «Stream».

public static List<String> getDoubles() {
    List<CompletableFuture<String>> doubleFutures = argsList.stream()
            //Créer un avenir complet en utilisant une méthode d'usine
            .map(arg -> CompletableFuture.supplyAsync(
                    () -> String.format("value: %f", doSomeLongComputation(arg))
            ))
            .collect(Collectors.toList());

    List<String> strs = doubleFutures.stream()
            //Utilisez join pour obtenir le résultat de Completable Future
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    return strs;
}

Il y a deux points à noter. Décrit dans Completable Future Getting Started 2, En utilisant la méthode d'usine CompletableFuture.supplyAsync, Création d'une liste de CompletableFuture.

De plus, lors de l'obtention du résultat du traitement à partir de la liste CompletableFuture créée, Vous utilisez la méthode CompletableFuture.join.

La différence entre la méthode get et la méthode join est expliquée en détail dans la page suivante. completablefuture join vs get En termes simples, lorsque vous utilisez la méthode join, vous n'avez pas à écrire explicitement une instruction try-catch pour l'exception. (Cependant, ce n'est pas sans exceptions, il faut donc faire attention.)

Description du traitement asynchrone continu

public static List<Double> getDoubleByTimes() {
    List<CompletableFuture<Double>> futures = argsList.stream() 
            .map(arg -> CompletableFuture.supplyAsync(
                    () -> doSomeLongComputation(arg)
            )) // Stream<CompletableFuture>Générer un
            //Passer le résultat du premier CompletableFuture au second CompletableFuture
            .map(future -> future.thenCompose(value ->
                    CompletableFuture.supplyAsync(
                            () -> timeLongComputation(value)))) 
            .collect(toList());
    return futures.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}

Le flux de traitement est le suivant

List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<Double>> // CompletableFuture.thenCompose
↓
List<CompletableFuture<Double>> // toList()

En utilisant thenCompose Deux Futures Completables différents peuvent être utilisés en cascade.

thenCompose(Function<? super T,? extends CompletionStage> fn)

Une fois cette étape terminée avec succès Renvoie un nouveau CompletionStage qui sera exécuté avec cette étape définie comme argument de la fonction spécifiée. thenCompose@Oracle

Appelez le "then Compose" du premier "Completable Future" et Passez le résultat à la fonction. Cette fonction prend la valeur retournée par le premier CompletableFuture comme argument La valeur calculée dans le deuxième CompletableFuture à l'aide de cet argument est renvoyée comme valeur de retour.

notes (différence entre puis Apply et ensuite Compose)

Une fonction similaire est «thenApply».

thenApply(Function<? super T,? extends U> fn) Renvoie un nouveau CompletionStage qui sera exécuté lorsque cette étape se termine avec succès, avec le résultat de cette étape défini comme argument de la fonction spécifiée. thenApply@Oracle

CompletableFuture | thenApply vs thenCompose

L'explication de StackOverFlow ci-dessus est détaillée. En particulier, les réponses suivantes ont été très utiles.

thenApply() returned the nested futures as they were, but thenCompose() flattened the nested CompletableFutures so that it is easier to chain more method calls to it.

Si vous utilisez thenApply dans le code ci-dessus, la valeur de retour sera, Cela ressemble à CompletableFuture <CompletableFuture <Double >> Completable Future sera désormais imbriqué.

List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<CompletableFuture<Double>>> // CompletableFuture.thenApply
↓
List<CompletableFuture<CompletableFuture<Double>>> // toList()

La documentation Java 9 cite les exemples suivants comme concepts similaires: CompletableFuture.thenApply ⇄ Stream.map CompletableFuture.thenCompose ⇄ Stream.flatMap

Pour map et flatMap, Pour comprendre la carte et la flatmap dans Stream (1) Alors, je l'ai résumé un peu.

Lors de l'utilisation des résultats de deux contrats à terme complets indépendants

thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

Renvoie un nouveau CompletionStage qui sera exécuté lorsque cette étape et les autres étapes spécifiées se terminent avec succès (Au moment de l'exécution, deux résultats sont utilisés comme arguments pour la fonction spécifiée).

List<CompletableFuture<Double>> futures = argsList.stream()
        .map(arg -> CompletableFuture.supplyAsync(
                () -> doSomeLongComputation(arg)
        ))
        .map(future -> future.thenCombine(
                CompletableFuture.supplyAsync(
                        () -> doSomeLongComputation("test")),
                (resultFromFirstFuture, resultFromSecondsFuture) -> resultFromFirstFuture * resultFromSecondsFuture
        ))
        .collect(toList());

return futures.stream()
        .map(CompletableFuture::join)
        .collect(toList());

notes: thenCombine / thenCombineAsync S'il existe un suffixe Async, Le deuxième argument, BiFunction, est passé au pool de threads et Il sera exécuté de manière asynchrone comme une tâche différente.

reference Java8 In Action

Recommended Posts

CompletableFuture Getting Started 3 (Essayez d'émettre une requête asynchrone)
CompletableFuture Getting Started 2 (Essayez de faire CompletableFuture)