As always, this is a reprint from my blog: https://munchkins-diary.hatenablog.com/entry/2019/11/09/221708
The name of the city is District 1 of Ho Chi Minh City.
The city, which collapsed and was rebuilt overnight to become a concession for performance improvement, is now an extremely tense zone on the boundary of multithreading.
Here, the Java standard class Future struggles desperately to keep the world in balance. This story is a daily record of a developer's battle with this Future.
(Blood Blockade Battlefront)
So, for my first article in a long while, I'd like to write about some implementation tips I often use for handling Future, the return-value wrapper for asynchronous processing in Java.
This time there are two themes:
- **I want to handle errors properly even in multithreaded, asynchronous processing**
- **I want to tidy up how a Future's return value is unwrapped**
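When writing multithreaded processing in Java, I think many people write something like the following: create a thread pool with Executors, submit the tasks and collect the returned Futures in a list, then call get() on each Future while handling InterruptedException and ExecutionException.
The code looks like this. (Please don't nitpick about catching Exception or using parallelStream.)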
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

@Slf4j
public class Main {
  public static void main(String[] args) {
    // Run the tasks in parallel on a pool of 100 threads.
    val executor = Executors.newFixedThreadPool(100);
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> executor.submit(() -> waitAndSaySomething(num)))
            .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.get();
          } catch (Exception e) {
            log.error("Error happened while extracting the thread result.", e);
            // Do something to recover. Here, just return null.
            return null;
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since this is just a sample
    }
    // Even inputs fail so that there is something to handle.
    if (num % 2 == 0) {
      throw new RuntimeException("Error if even");
    }
    return num + ": Something!";
  }
}
This is fairly common code, but the Future.get() part is redundant and tedious to write. Worse, error handling is very difficult because you can't tell which input caused the error.
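To make the problem concrete, here is a minimal sketch using only the JDK. The class name `LostInputDemo` and the methods `task` and `describeFailure` are made up for illustration. It shows that a failed Future only gives you an ExecutionException whose cause is the exception thrown inside the task; the input is not attached anywhere.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LostInputDemo {
  /** Hypothetical task: fails for even inputs, like the sample above. */
  public static String task(int num) {
    if (num % 2 == 0) {
      throw new IllegalStateException("boom");
    }
    return num + ": ok";
  }

  /** Returns the result, or a description of everything a failed Future can tell us. */
  public static String describeFailure(Future<String> future) {
    try {
      return future.get();
    } catch (ExecutionException e) {
      // getCause() is the exception thrown inside the task.
      // Nothing here says which input the task was started with.
      Throwable cause = e.getCause();
      return "cause=" + cause.getClass().getSimpleName() + ", message=" + cause.getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<String> failed = executor.submit(() -> task(2));
      System.out.println(describeFailure(failed)); // prints "cause=IllegalStateException, message=boom"
    } finally {
      executor.shutdown();
    }
  }
}
```

Unless the task itself writes the input into the exception message, the caller has no way to recover it from the Future.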
When implementing multithreading in Java, there are roughly two ways to pass input to the processing:
1. Write a class that implements Callable / Runnable, hold the input as a property, and pass it as a constructor argument when instantiating the class.
2. Capture the input from the enclosing scope in a lambda expression, as the sample above does.

However, in either case, error handling is difficult.
In case 1, keeping the task instances around is awkward in the first place, and you have to manage the mapping between each Future and its task instance somewhere. In case 2, the Future gives no access to the arguments the task was started with, so you can't determine which input an error came from.
Therefore, I decided to solve this problem by holding the input and its Future together in a Tuple.
A Tuple is simply a set of values, a concept often used in programming.
Java doesn't provide a Tuple as standard, but there are various ways to get something similar: Commons Lang's [Pair](https://commons.apache.org/proper/commons-lang/javadocs/api-3.9/org/apache/commons/lang3/tuple/Pair.html) and [Triple](https://commons.apache.org/proper/commons-lang/javadocs/api-3.9/org/apache/commons/lang3/tuple/Triple.html) classes, Reactor's Tuple classes, or the Pair class of JavaTuples.
(It's not a complicated class, so you can also implement it yourself.)
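If you do roll your own, a minimal sketch might look like the following. The name `SimplePair` is hypothetical; only the `of` / `getLeft` / `getRight` naming is borrowed from Commons Lang's Pair, so the rest of this article reads the same either way.

```java
import java.util.Objects;

/** Hypothetical minimal immutable pair, for illustration only. */
public final class SimplePair<L, R> {
  private final L left;
  private final R right;

  private SimplePair(L left, R right) {
    this.left = left;
    this.right = right;
  }

  /** Static factory, so the type arguments can be inferred at the call site. */
  public static <L, R> SimplePair<L, R> of(L left, R right) {
    return new SimplePair<>(left, right);
  }

  public L getLeft() {
    return left;
  }

  public R getRight() {
    return right;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SimplePair)) {
      return false;
    }
    SimplePair<?, ?> other = (SimplePair<?, ?>) o;
    return Objects.equals(left, other.left) && Objects.equals(right, other.right);
  }

  @Override
  public int hashCode() {
    return Objects.hash(left, right);
  }

  @Override
  public String toString() {
    return "(" + left + "," + right + ")";
  }
}
```

For the purpose of this article only the two getters matter; equals, hashCode and toString are there so the class behaves sensibly in collections and logs.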
By using a Tuple that holds the input in Left and the Future in Right, you can use the original input value when handling errors. Eh? Your input has lots of properties? It's so heavy that holding on to it causes an OOM? ...Maybe review the design?
Now, let's use Commons Lang's Pair, which seems to be used in almost every project. With a Tuple, the Main class above can be rewritten like this.
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.tuple.Pair;

@Slf4j
public class Main {
  public static void main(String[] args) {
    // Run the tasks in parallel on a pool of 100 threads.
    val executor = Executors.newFixedThreadPool(100);
    // Left: the input, Right: the Future produced from that input.
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.getRight().get();
          } catch (Exception e) {
            log.error("Input {} was not processed correctly.", future.getLeft(), e);
            // Do something to recover. Here, just return a message built from the input.
            return String.format("Input %s Failed in process, damn shit!! ", future.getLeft());
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since this is just a sample
    }
    // Even inputs fail so that there is something to handle.
    if (num % 2 == 0) {
      throw new RuntimeException("Error if even");
    }
    return num + ": Something!";
  }
}
Now you can handle errors and write logs using the input values.
However, the part that unwraps the Future is still redundant, which is somewhat frustrating.
So next, let's factor this part out.
All we want to do is unwrap the Future, so once that is cut out it is very easy to make common. On the other hand, errors should be handled in terms of the input, so that part needs to be designed flexibly.
Therefore, prepare the following Future-unwrapping class so that error handling can be delegated to any function that takes the input and the Exception.
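import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;

@RequiredArgsConstructor
public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> {
  /**
   * Callback function used to recover when an exception such as {@link InterruptedException} or
   * {@link java.util.concurrent.ExecutionException} is thrown.
   */
  private final BiFunction<L, Exception, R> recoveryCallback;

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (Exception e) {
      // Hand both the original input and the exception to the caller-supplied recovery logic.
      return recoveryCallback.apply(futurePair.getLeft(), e);
    }
  }
}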
If you incorporate this into the earlier Main class, it looks like this.
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.tuple.Pair;

@Slf4j
public class Main {
  public static void main(String[] args) {
    // Run the tasks in parallel on a pool of 100 threads.
    val executor = Executors.newFixedThreadPool(100);
    // Recovery logic: receives the original input and the exception.
    BiFunction<Integer, Exception, String> errorHandler =
        (in, e) -> {
          log.error("Input {} was not processed correctly.", in, e);
          return String.format("Input %s Failed in process, damn shit!! ", in);
        };
    val flattener = new FutureFlattener<Integer, String>(errorHandler);
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList
        .parallelStream()
        .map(flattener)
        .forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since this is just a sample
    }
    // Even inputs fail so that there is something to handle.
    if (num % 2 == 0) {
      throw new RuntimeException("Error if even");
    }
    return num + ": Something!";
  }
}
However, even though it implements the Function interface, holding a function as a property internally is honestly clumsy.
I don't want clumsy; I want it to look cool. So let's extend it a little more.
Many other languages provide monadic classes with methods such as onCatch or thenCatch for when an exception is thrown.
Unfortunately, Java's Function interface only offers compose, apply, and andThen, so it can only build method chains that assume success.
So, let's implement thenCatch by extending Java's Function interface.
import java.util.function.BiFunction;
import java.util.function.Function;

public interface CatchableFunction<T, R> extends Function<T, R> {
  /**
   * By calling this method before calling {@link Function#apply}, any exception thrown in the
   * apply method will be handled as defined in the argument onCatch.
   *
   * @param onCatch callback method to handle the exception. The first type T is the input of the
   *     base function.
   * @return fail-safe function with a callback. This method generates a new Function instance
   *     instead of modifying the existing function instance.
   */
  default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
    return t -> {
      try {
        return apply(t);
      } catch (Exception e) {
        return onCatch.apply(t, e);
      }
    };
  }
}
Java's language specification doesn't allow catching a type parameter, so it's frustrating that the callback has to receive a plain Exception, but this can now be written in a fairly functional style.
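To see thenCatch in isolation, without any Future involved, here is a small self-contained sketch. `ThenCatchDemo` and `describeNumber` are hypothetical names, and the interface is repeated inside the class only so that the example compiles on its own.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

public class ThenCatchDemo {
  /** Same shape as the CatchableFunction above, repeated so this sketch stands alone. */
  @FunctionalInterface
  public interface CatchableFunction<T, R> extends Function<T, R> {
    default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
      return t -> {
        try {
          return apply(t);
        } catch (Exception e) {
          return onCatch.apply(t, e);
        }
      };
    }
  }

  /** Builds a function that parses a number and falls back to a message that names the input. */
  public static Function<String, String> describeNumber() {
    CatchableFunction<String, String> parse = s -> "parsed " + Integer.parseInt(s);
    // The callback receives the original input as well as the exception.
    return parse.thenCatch((input, e) -> "could not parse '" + input + "'");
  }

  public static void main(String[] args) {
    Function<String, String> safe = describeNumber();
    System.out.println(safe.apply("42"));   // prints "parsed 42"
    System.out.println(safe.apply("oops")); // prints "could not parse 'oops'"
  }
}
```

Because CatchableFunction still has exactly one abstract method, a lambda can be assigned to it directly, and the recovery step is attached afterwards as part of the chain.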
If the FutureFlattener class implements this interface, it looks like this.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.Pair;

public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> {
  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (InterruptedException | ExecutionException e) {
      throw new FutureExtractException(e);
    }
  }

  // Unchecked wrapper, to be caught in the thenCatch method.
  private static class FutureExtractException extends RuntimeException {
    FutureExtractException(Throwable cause) {
      super(cause);
    }
  }
}
Function#apply can't throw checked exceptions (they would have to be handled inside the lambda expression), so InterruptedException and ExecutionException are wrapped in the unchecked FutureExtractException.
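One practical consequence of this wrapping: the exception handed to the thenCatch callback is the wrapper, whose cause is the ExecutionException, whose cause in turn is whatever the task threw. If the callback wants the original failure, it has to walk the cause chain. The helper below is a hypothetical sketch (`RootCauseDemo` and `rootCause` are not part of the article's code), and main simulates the chain with plain JDK exceptions.

```java
import java.util.concurrent.ExecutionException;

public class RootCauseDemo {
  /** Walks the cause chain and returns the innermost exception. */
  public static Throwable rootCause(Throwable t) {
    Throwable current = t;
    while (current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  public static void main(String[] args) {
    // Simulates the chain: unchecked wrapper -> ExecutionException -> failure inside the task.
    Exception taskFailure = new IllegalStateException("boom");
    Exception wrapped = new RuntimeException(new ExecutionException(taskFailure));
    System.out.println(rootCause(wrapped).getMessage()); // prints "boom"
  }
}
```

Note that this goes all the way down, so if the task's own exception carries a cause, that deeper cause is what gets returned.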
This also tidies up the Main class.
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.tuple.Pair;

@Slf4j
public class Main {
  public static void main(String[] args) {
    // Run the tasks in parallel on a pool of 100 threads.
    val executor = Executors.newFixedThreadPool(100);
    // Unwrap the Future, and recover with the original input when it fails.
    val flattener =
        new FutureFlattener<Integer, String>()
            .thenCatch(
                (in, e) -> {
                  log.error("Input {} was not processed correctly.", in, e);
                  return String.format("Input %s Failed in process, damn shit!! ", in);
                });
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList.parallelStream().map(flattener).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since this is just a sample
    }
    // Even inputs fail so that there is something to handle.
    if (num % 2 == 0) {
      throw new RuntimeException("Error if even");
    }
    return num + ": Something!";
  }
}
Nesting is reduced, the function declarations are cleaner, and the code around unwrapping the Future is much tidier.
How was it? Some of this could be implemented more easily with Functional Java, but I was in a hurry, so I implemented it myself.
For parallel processing these days, the standard approach is to use a message queue such as Kafka to make things asynchronous and loosely coupled, but that doesn't mean multithreading is never used.
Redundant Future unwrapping, on the other hand, not only increases nesting and hurts readability, but also discourages careful error handling, which is the most delicate part.
This is the solution I went with this time. If you know a better way, please leave a comment.
See you next time!