Der Umgang mit Java Future ist zu langweilig, daher werde ich verschiedene Ideen ausprobieren.

Vorwort

Wie immer ist es ein Nachdruck aus dem Blog. https://munchkins-diary.hatenablog.com/entry/2019/11/09/221708

Der Name der Stadt ist Ho Chi Minh District 1.
Die Stadt, die über Nacht zusammenbrach und rekonstruiert wurde und zu einer Welt der Leistungsverbesserung wurde, wird zu einer extrem angespannten Zone am Grenzpunkt zum Multithreading.
Hier will die Java-Standardklasse Future das Gleichgewicht der Welt halten. Diese Geschichte ist eine Aufzeichnung des Kampfes und des täglichen Lebens des Entwicklers, um diese Zukunft herauszufordern.
(Blutiger Frontstil)

Daher möchte ich in dem Artikel nach langer Zeit über die Implementierungstipps schreiben, die ich häufig zum Umgang mit Future mache, einem Rückgabewert-Wrapper für die asynchrone Java-Verarbeitung.

Das Thema ist diesmal

Es gibt zwei.

1. Standard asynchrone Verarbeitung und zukünftige Behandlung

Beim Schreiben von Multithread-Verarbeitung in Java denke ich, dass viele Leute die folgende Verarbeitung schreiben.

  1. Erstellen Sie einen Thread-Pool mit der Executors-Klasse
  2. Erstellen Sie eine Unterklasse, die Callable / Runnable implementiert (oder verwenden Sie einen Lambda-Ausdruck).
  3. Senden Sie die Verarbeitung an einen anderen Thread, indem Sie ExecutorService senden
  4. Speichern Sie die Zukunft des Rückgabewerts in Liste usw.
  5. Synchronisieren Sie mit der get-Methode der Future-Klasse in der Liste, wenn alle in einen anderen Thread geworfen werden
  6. Behandeln Sie die Prüfausnahmen InterruptedException und ExecutionException

Der Code sieht so aus. (Lassen Sie sich nicht von Exception täuschen oder verwenden Sie parallelStream.)

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val futureList =
    IntStream.range(0,100)
        .mapToObj(num -> executor.submit(() -> waitAndSaySomething(num)))
        .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.get();
          } catch (Exception e) {
            log.error("Error happened while extracting the thread result.", e);
            // Do something for recover. Here, just return null.
            return null;
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep( num%10 * 1000);
    } catch (Exception e){
      // do nothing since just sample
    }
    if (num%2 ==0)
      throw new RuntimeException("Error if odd");
   return num + ": Something!";
  }
}

Es ist ein ziemlich verbreiteter Code, aber es ist langweilig, den get-Teil von Future zu schreiben, weil er redundant ist. Darüber hinaus ist es sehr schwierig, den Fehler zu behandeln, da nicht bekannt ist, welche Eingabe den Fehler verursacht hat.

2. Versuchen Sie zunächst, den Fehler richtig zu behandeln

Bei der Implementierung von Multithreading in Java ist die Methode zum Übergeben von Eingaben an die Verarbeitung ungefähr eine der folgenden beiden Methoden.

  1. Geben Sie die Klasse, die "Callable / Runnable" implementiert, als Eigenschaft an und übergeben Sie sie als Argument des Konstruktors, wenn "new".
  2. Beziehen Sie sich auf die letzte Variable, die außerhalb von Lambda aus dem Lambda-Ausdruck deklariert wurde.

In beiden Fällen ist die Fehlerbehandlung jedoch schwierig.

Im Fall von 1 ist es schwierig, die Thread-Instanz zuerst zu speichern, und Sie müssen die Zuordnung zwischen Future und Thread-Instanz irgendwo verwalten. Im Fall von 2 ist ein Fehler bei der Bestimmung aufgetreten oder es bleibt keine Eingabe erhalten, da das Argument des ursprünglichen Threads aus der Zukunft nicht abgerufen werden kann.

Daher habe ich mich für die folgende Methode entschieden, um dieses Problem zu lösen.

Verwalten Sie Eigenschaften und Zukunft gemeinsam mit Tuple

Einfach ausgedrückt ist "Tupel" eine Reihe von Werten, ein Konzept, das häufig in der Programmierung verwendet wird.

Tupel wird in Java nicht standardmäßig bereitgestellt, es gibt jedoch verschiedene ähnliche Methoden, [Paar] von Common Lang (https://commons.apache.org/proper/commons-lang/javadocs/). api-3.9 / org / apache / commons / lang3 / tuple / Pair.html) und Triple Klasse /lang3/tuple/Triple.html), Klasse reactor Tuple Dies kann mithilfe der Klasse Pair von JavaTuple erreicht werden.

(Es ist keine komplizierte Klasse, daher können Sie sie selbst implementieren.)

Wenn Sie Tupel verwenden, um Eingaben in Links und Zukunft in Rechts zu speichern, können Sie die Fehlerverarbeitung mit dem Wert der Eingabequelle in der Fehlerverarbeitung durchführen. e? Haben Sie viele Eigenschaften? Ist der Eingang zu schwer, um OOM zu verursachen? ?? Möchten Sie das Design überprüfen?

Verwenden wir nun Commons Langs Paar, das in fast allen Projekten verwendet zu werden scheint. Mit Tuple kann die obige Hauptklasse so umgeschrieben werden.


@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val futureList =
    IntStream.range(0,100)
        .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
        .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.getRight().get();
          } catch (Exception e) {
            log.error("Input {} was not processed correctly.", future.getLeft(), e);
            // Do something for recover. Here, just return null.
            return String.format("Input %s Failed in process, damn shit!! ", future.getLeft());
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep( num%10 * 1000);
    } catch (Exception e){
      // do nothing since just sample
    }
    if (num%2 ==0) {
      throw new RuntimeException("Error if odd");
    }
   return num + ": Something!";
  }
}

Jetzt können Sie Fehler und Protokollierung mit den Eingabewerten behandeln.

Der Teil, in dem Future bereitgestellt wird, ist jedoch immer noch redundant und etwas frustrierend.

Lassen Sie uns als nächstes diesen Teil gemeinsam machen.

3. Machen Sie den zukünftigen Entwicklungsteil gemeinsam

Alles, was Sie tun möchten, ist, die Zukunft zu entwickeln. Wenn Sie sie also ausschneiden, ist es sehr einfach, sie zu standardisieren. Auf der anderen Seite möchte ich Fehler in Bezug auf die Eingabe behandeln, also möchte ich dieses Teil so gestalten, dass es flexibel ist.

Bereiten Sie daher die folgende zukünftige Erweiterungsklasse vor, damit der Fehlerbehandlungsteil von jeder Funktion mithilfe von Ausnahme und Eingabe verarbeitet werden kann.

@RequiredArgsConstructor
public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> {
  /**
   * Callback function to recover when exception such as {@link InterruptedException} or {@link
   * java.util.concurrent.ExecutionException}.
   */
  private final BiFunction<L, Exception, R> recoveryCallback;

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (Exception e) {
      return recoveryCallback.apply(futurePair.getLeft(), e);
    }
  }
}

Wenn Sie dies früher in die Hauptklasse aufnehmen, sieht es wie folgt aus.

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    BiFunction<Integer,Exception,String> errorHandler =
        (in, e) -> {
          log.error("Input {} was not processed correctly.", in, e);
          return String.format("Input %s Failed in process, damn shit!! ", in);
        };
    val flattener = new FutureFlattener<Integer, String>(errorHandler);
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList
        .parallelStream()
        .map(flattener)
        .forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since just sample
    }
    if (num % 2 == 0) {
      throw new RuntimeException("Error if odd");
    }
    return num + ": Something!";
  }
}

Um ehrlich zu sein, ist es jedoch umständlich, eine Funktion intern als Eigenschaft zu haben, obwohl Sie die Funktionsschnittstelle verwenden.

Sei kein Mist, ich möchte mich cool entscheiden. Lassen Sie es uns also etwas weiter ausbauen.

4. Erben Sie die Java-Funktionsschnittstelle, um die Fehlerbehandlung hinzuzufügen

Viele andere Sprachen bieten monadischen Klassen Methoden für das Auslösen einer Ausnahme, z. B. onCatch oder thenCatch. Leider kann die Funktionsschnittstelle von Java nur eine Methodenkette sein, die den Erfolg von "compose", "apply" und "andThen" voraussetzt.

Implementieren wir also onCatch, indem wir die Funktionsschnittstelle von Java erben.

public interface CatchableFunction<T, R> extends Function<T, R> {

  /**
   * by calling this method in advance of calling {@link Function#apply}, any exception thrown in
   * the apply method will be handled as defined in the argument onCatch.
   *
   * @param onCatch callback method to handle the exception. First Type T is the input of the base
   *     function.
   * @return fail-safe function with a callback. This method will generate a new Function instance
   *     instead of modifying the existing function instance.
   */
  default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
    return t -> {
      try {
        return apply(t);
      } catch (Exception e) {
        return onCatch.apply(t, e);
      }
    };
  }
}

Da es aufgrund der Verwendung von Java nicht möglich ist, einen Type-Parameter abzufangen, ist es frustrierend, ihn als Ausnahme empfangen zu müssen, aber jetzt kann er recht funktional geschrieben werden.

Wenn Sie diese Klasse in der FutureFlattener-Klasse implementieren, sieht sie wie folgt aus.



@RequiredArgsConstructor
public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> {

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (InterruptedException | ExecutionException e) {
      throw new FutureExpandException(e);
    }
  }

  // To be caught in the then catch method.
  private static class FutureExtractException extends RuntimeException {
    FutureExpandException(Throwable cause) {
      super(cause);
    }
  }

Überprüfte Ausnahmen müssen im Lamdba-Ausdruck behandelt werden, damit sie in "FutureExtractException" eingeschlossen werden. Dies aktualisiert auch die Hauptklasse.

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val flattener =
        new FutureFlattener<Integer, String>()
            .thenCatch(
                (in, e) -> {
                  log.error("Input {} was not processed correctly.", in, e);
                  return String.format("Input %s Failed in process, damn shit!! ", in);
                });
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList.parallelStream().map(flattener).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since just sample
    }
    if (num % 2 == 0) {
      throw new RuntimeException("Error if odd");
    }
    return num + ": Something!";
  }
}

Die Verschachtelung wurde reduziert, Funktionsdeklarationen wurden aktualisiert und die Quelle für zukünftige Erweiterungen wurde aktualisiert.

Am Ende

Wie war es? Es gibt Stellen, an denen es mit Functional Java einfacher implementiert werden kann, aber ich hatte es eilig und habe es selbst implementiert.

Wenn es um die parallele Verarbeitung geht, ist es heutzutage einfach, Nachrichtenwarteschlangen wie kafka zu verwenden, um asynchron und lose gekoppelt zu erstellen. Dies bedeutet jedoch nicht, dass Sie kein Multithreading verwenden.

Auf der anderen Seite erhöhen redundante zukünftige Bereitstellungen nicht nur die Verschachtelung und verringern die Lesbarkeit, sondern entmutigen auch die empfindlichste Fehlerbehandlung.

Diesmal habe ich die obige Lösung gewählt, aber wie war es? Wenn jemand sagt, dass es einen besseren Weg gibt, hinterlassen Sie ihn bitte in den Kommentaren.

Na dann!

Recommended Posts

Der Umgang mit Java Future ist zu langweilig, daher werde ich verschiedene Ideen ausprobieren.
Probieren Sie Progate Free Edition [Java I]
Ich habe den Java-Test Level 2 bestanden und werde eine Notiz hinterlassen
Seit ich Oracle Java Bronze bestanden habe, habe ich die Umrisse des Tests zusammengefasst.
[Tag: 5] Ich habe die Grundlagen von Java zusammengefasst
Ich bin auf die Java-Version in Android Studio gestoßen, daher werde ich sie zusammenfassen
[Java] Behandlung von Java Beans in der Methodenkette
Die Reihenfolge der Java-Methodenmodifikatoren ist festgelegt
Probieren Sie Progate Free Edition [Java II]
[Java] [javaSliver] Die Schnittstelle und die abstrakten Klassen sind kompliziert, daher werde ich sie zusammenfassen.
Ist drainTo von LinkedBlockingQueue sicher? Ich bin der Quelle gefolgt
Der Vergleich von enum ist == und gleich ist gut [Java]
Was ist besser, Kotlin oder zukünftiges Java?
Den aktuellen Status von Java organisieren und die Zukunft betrachten
Ich habe die Typen und Grundlagen von Java-Ausnahmen zusammengefasst
Ich möchte herausfinden, welche Java-Version die JAR-Datei hat, die ich habe
Ich habe mir die Rosen von Versailles angesehen und versucht, das Schlusslied in Java zu reproduzieren
Ich habe festgestellt, dass ich eine Java-FW im Mono-Repo entwickle, daher werde ich die Operation in Betracht ziehen.