The handling of Java Future is too dull, so I will try various ideas.

Preface

As always, it's a reprint from the blog. https://munchkins-diary.hatenablog.com/entry/2019/11/09/221708

The name of the city is District 1 of Ho Chi Minh City.
The city, which collapsed and reconstructed overnight and became a concession for improving performance, becomes an extremely tense zone at the boundary of multithreading.
Here, the Java standard class Future is dying to keep the world in balance. This story is a daily record of the developer's battle to challenge this Future.
(Blood Blockade Battlefront)

So, in the article after a long time, I would like to write about the implementation tips that I often do about the handling of Future, which is the return value wrapper of java asynchronous processing.

The theme this time is

-** I want to handle errors even with multi-threaded and asynchronous processing ** -** I want to refresh the return value expansion of Future **

There are two.

1. Standard asynchronous processing and Future handling

When writing multithreaded processing in Java, I think that many people write the following processing.

  1. Create a thread pool with the Executors class
  2. Create a subclass that implements Callable / Runnable (or use a Lambda expression)
  3. Submit the process to another thread by submit of ExecutorService
  4. Store the Future of the return value in List etc.
  5. Synchronize with get method of Future class in list when all are thrown to another thread
  6. Handle check exceptions ʻInterruptedException and ʻExecutionException

The code looks like this. (Don't be fooled by Exception or use parallelStream.)

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val futureList =
    IntStream.range(0,100)
        .mapToObj(num -> executor.submit(() -> waitAndSaySomething(num)))
        .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.get();
          } catch (Exception e) {
            log.error("Error happened while extracting the thread result.", e);
            // Do something for recover. Here, just return null.
            return null;
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep( num%10 * 1000);
    } catch (Exception e){
      // do nothing since just sample
    }
    if (num%2 ==0)
      throw new RuntimeException("Error if odd");
   return num + ": Something!";
  }
}

It's a fairly common code, but the get part of Future is redundant and it's dull to write. Moreover, it is very difficult to handle the error because it is not known which input caused the error.

2. First, try to handle errors properly

When implementing multithreading in Java, there are roughly two ways to pass input to processing.

  1. Give the class that implements Callable / Runnable as a property and pass it as an argument of the constructor when new.
  2. Refer to the final variable declared outside Lambda from the Lambda expression.

However, in either case, error handling is difficult.

In case of 1, it is delicate to save the thread instance in the first place, and you have to manage the mapping between Future and Thread instance somewhere. In the case of 2, it occurred error of the determination or does not stick to any input because it can not get the argument of the original thread threw from the Future.

Therefore, I decided to take the following method to solve this problem.

Manage properties and Futures together using Tuple

The Tuple is simply a set of values, a concept often used in programming.

Tuple is not provided as standard in Java, but there are various similar methods, [Pair] of Common Lang (https://commons.apache.org/proper/commons-lang/javadocs/ api-3.9 / org / apache / commons / lang3 / tuple / Pair.html) and [Triple](https://commons.apache.org/proper/commons-lang/javadocs/api-3.9/org/apache/commons /lang3/tuple/Triple.html) class, reactor Tuple class It can be achieved by using the Pair class of JavaTuple.

(It's not a complicated class, so you can implement it yourself.)

By using Tuple to save input in Left and Future in Right, you can perform error processing using the value of the input source in Error processing. e? Do you have a lot of properties? Is the input too heavy to cause OOM? ?? Do you want to review the design?

Now, let's use Commons Lang's Pair, which seems to be used in almost all projects. With Tuple, the above main class can be rewritten like this.


@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val futureList =
    IntStream.range(0,100)
        .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
        .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.getRight().get();
          } catch (Exception e) {
            log.error("Input {} was not processed correctly.", future.getLeft(), e);
            // Do something for recover. Here, just return null.
            return String.format("Input %s Failed in process, damn shit!! ", future.getLeft());
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep( num%10 * 1000);
    } catch (Exception e){
      // do nothing since just sample
    }
    if (num%2 ==0) {
      throw new RuntimeException("Error if odd");
    }
   return num + ": Something!";
  }
}

Now you can handle errors and logging using the input values.

However, the part where Future is deployed is still redundant, and it is somewhat frustrating.

So, next, let's make this part common.

3. Make the future development part common

All you want to do is develop the future, so if you cut it out, it's very easy to standardize. On the other hand, I want to handle errors relative to input, so I want to design that part to be flexible.

Therefore, prepare the following Future expansion class so that the error handling part can be handled by any Function using Exception and input.

@RequiredArgsConstructor
public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> {
  /**
   * Callback function to recover when exception such as {@link InterruptedException} or {@link
   * java.util.concurrent.ExecutionException}.
   */
  private final BiFunction<L, Exception, R> recoveryCallback;

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (Exception e) {
      return recoveryCallback.apply(futurePair.getLeft(), e);
    }
  }
}

If you incorporate this into the Main class earlier, it will be as follows.

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    BiFunction<Integer,Exception,String> errorHandler =
        (in, e) -> {
          log.error("Input {} was not processed correctly.", in, e);
          return String.format("Input %s Failed in process, damn shit!! ", in);
        };
    val flattener = new FutureFlattener<Integer, String>(errorHandler);
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList
        .parallelStream()
        .map(flattener)
        .forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since just sample
    }
    if (num % 2 == 0) {
      throw new RuntimeException("Error if odd");
    }
    return num + ": Something!";
  }
}

However, even though I'm using the Function interface, having a function as a property internally is honestly awkward.

Don't be crap, I want to decide cool. So, let's expand it a little more.

4. Inherit Java's Function interface to add error handling

Many other languages provide monadic classes with methods for when an Exception is thrown, such as onCatch or thenCatch. Unfortunately, Java's Function Interface can only be a method chain that assumes the success of compose, ʻapply, ʻand Then.

So, let's implement onCatch by inheriting Java's Function interface.

public interface CatchableFunction<T, R> extends Function<T, R> {

  /**
   * by calling this method in advance of calling {@link Function#apply}, any exception thrown in
   * the apply method will be handled as defined in the argument onCatch.
   *
   * @param onCatch callback method to handle the exception. First Type T is the input of the base
   *     function.
   * @return fail-safe function with a callback. This method will generate a new Function instance
   *     instead of modifying the existing function instance.
   */
  default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
    return t -> {
      try {
        return apply(t);
      } catch (Exception e) {
        return onCatch.apply(t, e);
      }
    };
  }
}

Since it is not possible to catch a Type parameter due to Java usage, it is frustrating to have to receive it as an Exception, but now I can write it quite functionally.

If this class is implemented in the FutureFlattener class, it will be as follows.



@RequiredArgsConstructor
public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> {

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (InterruptedException | ExecutionException e) {
      throw new FutureExpandException(e);
    }
  }

  // To be caught in the then catch method.
  private static class FutureExtractException extends RuntimeException {
    FutureExpandException(Throwable cause) {
      super(cause);
    }
  }

Checked exceptions have to be handled in the Lamdba expression, so they are wrapped in FutureExtractException. This also refreshes the Main class.

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val flattener =
        new FutureFlattener<Integer, String>()
            .thenCatch(
                (in, e) -> {
                  log.error("Input {} was not processed correctly.", in, e);
                  return String.format("Input %s Failed in process, damn shit!! ", in);
                });
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList.parallelStream().map(flattener).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since just sample
    }
    if (num % 2 == 0) {
      throw new RuntimeException("Error if odd");
    }
    return num + ": Something!";
  }
}

Nesting is reduced, function declarations are cleaner, and the source around Future expansion is cleaner.

At the end

How was it? There are places where it can be implemented more easily using Functional Java, but I was in a hurry so I implemented it myself.

When it comes to parallel processing, these days it's basic to use message queues such as kafka to create asynchronous and coarsely coupled, but that doesn't mean that you don't use multithreading.

Redundant Future deployments, on the other hand, not only increase nesting and reduce readability, but also discourage the most sensitive error handling.

This time I took the above solution, but how was it? If anyone says there is a better way, please leave it in the comments.

Well then!

Recommended Posts

The handling of Java Future is too dull, so I will try various ideas.
Try the free version of Progate [Java I]
I passed the Java test level 2 so I will leave a note
I passed the Oracle Java Bronze, so I summarized the outline of the exam.
[day: 5] I summarized the basics of Java
I stumbled on the Java version in Android Studio, so I will summarize it
[Java] Handling of JavaBeans in the method chain
The order of Java method modifiers is fixed
Try the free version of Progate [Java II]
I touched on the new features of Java 15
Apple Silicon compatible version of Docker Desktop (Preview) has been released to the public, so I will try it
[Java] [javaSliver] The interface and abstract classes are complicated, so I will summarize them.
Is drainTo of LinkedBlockingQueue safe? I followed the source
The comparison of enums is ==, and equals is good [Java]
Which is better, Kotlin or Java in the future?
Organizing the current state of Java and considering the future
I summarized the types and basics of Java exceptions
I want to find out which version of java the jar file I have is available
I finished watching The Rose of Versailles, so I tried to reproduce the ending song in Java
I noticed that I am developing Java FW in the mono repo, so I will consider the operation.