CompletableFuture Getting Started 2 (Try to make CompletableFuture) Continued
Suppose we have a list of Strings like the one below, and each element is used as an argument to an asynchronous computation.
private static final List<String> argsList
= Arrays.asList("test1", "test2", "test3", "test4");
The asynchronous processing is performed using a Stream.
public static List<String> getDoubles() {
List<CompletableFuture<String>> doubleFutures = argsList.stream()
// Create each CompletableFuture with the supplyAsync factory method
.map(arg -> CompletableFuture.supplyAsync(
() -> String.format("value: %f", doSomeLongComputation(arg))
))
.collect(Collectors.toList());
List<String> strs = doubleFutures.stream()
// Use join to wait for and collect each CompletableFuture result
.map(CompletableFuture::join)
.collect(Collectors.toList());
return strs;
}
There are two points to note.
First, as described in CompletableFuture Getting Started 2, the list of CompletableFuture is created with the CompletableFuture.supplyAsync factory method.
Second, the results are retrieved from that list of CompletableFuture with the CompletableFuture.join method.
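To try the code above end to end, here is a minimal self-contained sketch. doSomeLongComputation is not shown in this article, so the version here is a hypothetical stand-in (it sleeps briefly and returns the length of its argument), and the class name SupplyAsyncSketch is also just an assumption for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class SupplyAsyncSketch {
    private static final List<String> ARGS =
            Arrays.asList("test1", "test2", "test3", "test4");

    // Hypothetical stand-in for doSomeLongComputation:
    // sleeps briefly, then returns the argument's length as a double.
    static Double doSomeLongComputation(String arg) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return (double) arg.length();
    }

    static List<String> getDoubles() {
        // First pipeline: submit every task and collect the futures.
        List<CompletableFuture<String>> futures = ARGS.stream()
                .map(arg -> CompletableFuture.supplyAsync(
                        () -> "value: " + doSomeLongComputation(arg)))
                .collect(Collectors.toList());
        // Second pipeline: wait for each result, in list order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        getDoubles().forEach(System.out::println);
    }
}
```

Collecting the futures into a list before calling join matters here: streams are lazy, so chaining map(CompletableFuture::join) directly onto the first pipeline would submit and wait for each task one at a time.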
The difference between the get method and the join method is explained in detail on the following page: completablefuture join vs get. Simply put, join does not declare any checked exceptions, so you do not have to write an explicit try-catch around it. (It can still throw, though: a failed computation surfaces as an unchecked CompletionException, so care is required.)
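The difference can be seen in a small sketch. The class and method names below (JoinVsGet, failing, viaGet, viaJoin) are made up for illustration; only get, join, ExecutionException, and CompletionException come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class JoinVsGet {
    // A future that always fails, used only for illustration.
    static CompletableFuture<String> failing() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // get() declares checked exceptions, so try-catch (or throws) is mandatory.
    static String viaGet() {
        try {
            return failing().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getMessage();
        }
    }

    // join() compiles without try-catch, but it still throws at run time:
    // the failure arrives wrapped in an unchecked CompletionException.
    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "join: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
    }
}
```

In both cases the original IllegalStateException is available through getCause(); the only difference is whether the compiler forces you to handle the wrapper.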
public static List<Double> getDoubleByTimes() {
List<CompletableFuture<Double>> futures = argsList.stream()
.map(arg -> CompletableFuture.supplyAsync(
() -> doSomeLongComputation(arg)
)) // produces a Stream<CompletableFuture<Double>>
// Pass the result of the first CompletableFuture to the second CompletableFuture
.map(future -> future.thenCompose(value ->
CompletableFuture.supplyAsync(
() -> timeLongComputation(value))))
.collect(toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
The processing flow is as follows:
List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<Double>> // CompletableFuture.thenCompose
↓
List<CompletableFuture<Double>> // toList()
By using thenCompose, two different CompletableFutures can be chained so that the second one runs on the result of the first.
thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function. thenCompose@Oracle
Calling thenCompose on the first CompletableFuture passes its result to the given function.
That function takes the value produced by the first CompletableFuture as its argument and returns a second CompletableFuture, which computes its own value from that argument.
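As a minimal sketch of this hand-off, assume two hypothetical steps: firstStep parses the trailing digit of a string such as "test3", and secondStep multiplies that value by 10. Neither method appears in the article; they stand in for doSomeLongComputation and timeLongComputation.

```java
import java.util.concurrent.CompletableFuture;

final class ThenComposeSketch {
    // Hypothetical first step: parse the trailing digit of e.g. "test3".
    static CompletableFuture<Double> firstStep(String arg) {
        return CompletableFuture.supplyAsync(
                () -> Double.valueOf(arg.substring(arg.length() - 1)));
    }

    // Hypothetical second step: needs the value produced by the first step.
    static CompletableFuture<Double> secondStep(Double value) {
        return CompletableFuture.supplyAsync(() -> value * 10);
    }

    static Double run(String arg) {
        // thenCompose feeds the first result into secondStep and
        // exposes the second future's result as a flat CompletableFuture<Double>.
        CompletableFuture<Double> chained =
                firstStep(arg).thenCompose(ThenComposeSketch::secondStep);
        return chained.join();
    }

    public static void main(String[] args) {
        System.out.println(run("test3"));
    }
}
```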
A similar method is thenApply.
thenApply(Function<? super T,? extends U> fn)
Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. thenApply@Oracle
CompletableFuture | thenApply vs thenCompose
The Stack Overflow discussion above explains the difference in detail. The following answer in particular was very helpful.
thenApply() returned the nested futures as they were, but thenCompose() flattened the nested CompletableFutures so that it is easier to chain more method calls to it.
If you use thenApply instead of thenCompose in the code above, the return type becomes CompletableFuture<CompletableFuture<Double>>: the CompletableFutures end up nested.
List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<CompletableFuture<Double>>> // CompletableFuture.thenApply
↓
List<CompletableFuture<CompletableFuture<Double>>> // toList()
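The nesting is easiest to see in the declared types. The sketch below assumes a hypothetical timesTwo method that returns a CompletableFuture; passing it to thenApply wraps that future in another one, while thenCompose flattens it.

```java
import java.util.concurrent.CompletableFuture;

final class NestedVsFlat {
    // Hypothetical async step that itself returns a CompletableFuture.
    static CompletableFuture<Double> timesTwo(Double v) {
        return CompletableFuture.supplyAsync(() -> v * 2);
    }

    static Double viaThenApply() {
        CompletableFuture<Double> first = CompletableFuture.supplyAsync(() -> 1.5);
        // thenApply keeps the function's return value as-is, so the type nests.
        CompletableFuture<CompletableFuture<Double>> nested =
                first.thenApply(NestedVsFlat::timesTwo);
        // Two joins are needed to reach the value.
        return nested.join().join();
    }

    static Double viaThenCompose() {
        CompletableFuture<Double> first = CompletableFuture.supplyAsync(() -> 1.5);
        // thenCompose flattens the inner future, so one join is enough.
        CompletableFuture<Double> flat =
                first.thenCompose(NestedVsFlat::timesTwo);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenApply());
        System.out.println(viaThenCompose());
    }
}
```

Both methods compute the same value; only the shape of the intermediate type differs.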
The Java 9 documentation describes these methods as analogous to the following Stream operations:
CompletableFuture.thenApply ⇄ Stream.map
CompletableFuture.thenCompose ⇄ Stream.flatMap
I wrote a short summary of map and flatMap in "Toward understanding map and flatmap in Stream (1)".
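For comparison, the same nesting-versus-flattening behaviour on the Stream side can be sketched as follows. The word list and class name are arbitrary choices for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class MapVsFlatMap {
    static final List<String> WORDS = Arrays.asList("ab", "cd");

    // map produces one output per input, so each word becomes its own inner list.
    static List<List<String>> viaMap() {
        return WORDS.stream()
                .map(w -> Arrays.asList(w.split("")))
                .collect(Collectors.toList());
    }

    // flatMap merges the inner streams into a single flat stream of letters.
    static List<String> viaFlatMap() {
        return WORDS.stream()
                .flatMap(w -> Arrays.stream(w.split("")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaMap());
        System.out.println(viaFlatMap());
    }
}
```

Here List<List<String>> plays the role of CompletableFuture<CompletableFuture<Double>>, and the flat List<String> corresponds to what thenCompose returns.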
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
Returns a new CompletionStage that, when both this stage and the other given stage complete normally, is executed with the two results as arguments to the supplied function.
List<CompletableFuture<Double>> futures = argsList.stream()
.map(arg -> CompletableFuture.supplyAsync(
() -> doSomeLongComputation(arg)
))
.map(future -> future.thenCombine(
CompletableFuture.supplyAsync(
() -> doSomeLongComputation("test")),
(resultFromFirstFuture, resultFromSecondFuture) -> resultFromFirstFuture * resultFromSecondFuture
))
.collect(toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(toList());
Notes: thenCombine / thenCombineAsync
With the Async suffix, the second argument, the BiFunction, is submitted to the thread pool and executed asynchronously as a separate task.
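A small sketch can make the note concrete. It assumes a single-threaded executor whose thread is named "combine-pool" (a name chosen only for this example) and uses the three-argument overload thenCombineAsync(other, fn, executor), so the thread that runs the BiFunction can be observed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThenCombineSketch {
    // Plain thenCombine: multiplies the two results once both are available.
    static Double product() {
        CompletableFuture<Double> left = CompletableFuture.supplyAsync(() -> 2.0);
        CompletableFuture<Double> right = CompletableFuture.supplyAsync(() -> 3.0);
        CompletableFuture<Double> combined = left.thenCombine(right, (a, b) -> a * b);
        return combined.join();
    }

    // thenCombineAsync with an explicit executor: the BiFunction is submitted
    // to that executor as its own task, so it reports the pool thread's name.
    static String asyncCombinerThread() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "combine-pool"));
        try {
            CompletableFuture<Double> left = CompletableFuture.supplyAsync(() -> 2.0);
            CompletableFuture<Double> right = CompletableFuture.supplyAsync(() -> 3.0);
            CompletableFuture<String> where = left.thenCombineAsync(
                    right, (a, b) -> Thread.currentThread().getName(), pool);
            return where.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(product());
        System.out.println(asyncCombinerThread());
    }
}
```

With the non-Async thenCombine, which thread runs the BiFunction depends on timing (it may be the thread that completes the last input, or the calling thread), which is why the sketch only inspects the thread for the Async variant.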
Reference: Java 8 in Action