Wie immer ist es ein Nachdruck aus dem Blog. https://munchkins-diary.hatenablog.com/entry/2019/11/09/221708
Der Name der Stadt ist Ho Chi Minh District 1.
Die Stadt, die über Nacht zusammenbrach und rekonstruiert wurde und zu einer Welt der Leistungsverbesserung wurde, wird zu einer extrem angespannten Zone am Grenzpunkt zum Multithreading.
Hier will die Java-Standardklasse Future das Gleichgewicht der Welt halten. Diese Geschichte ist eine Aufzeichnung des Kampfes und des täglichen Lebens des Entwicklers, um diese Zukunft herauszufordern.
(Blutiger Frontstil)
Daher möchte ich in dem Artikel nach langer Zeit über die Implementierungstipps schreiben, die ich häufig zum Umgang mit Future mache, einem Rückgabewert-Wrapper für die asynchrone Java-Verarbeitung.
Das Thema ist diesmal
Es gibt zwei.
Beim Schreiben von Multithread-Verarbeitung in Java denke ich, dass viele Leute die folgende Verarbeitung schreiben.
InterruptedException
und ExecutionException
Der Code sieht so aus. (Lassen Sie sich nicht von Exception täuschen oder verwenden Sie parallelStream.)
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
val futureList =
IntStream.range(0,100)
.mapToObj(num -> executor.submit(() -> waitAndSaySomething(num)))
.collect(Collectors.toList());
futureList.parallelStream()
.map(future -> {
try {
return future.get();
} catch (Exception e) {
log.error("Error happened while extracting the thread result.", e);
// Do something for recover. Here, just return null.
return null;
}
}).forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep( num%10 * 1000);
} catch (Exception e){
// do nothing since just sample
}
if (num%2 ==0)
throw new RuntimeException("Error if odd");
return num + ": Something!";
}
}
Es ist ein ziemlich verbreiteter Code, aber es ist langweilig, den get-Teil von Future zu schreiben, weil er redundant ist. Darüber hinaus ist es sehr schwierig, den Fehler zu behandeln, da nicht bekannt ist, welche Eingabe den Fehler verursacht hat.
Bei der Implementierung von Multithreading in Java ist die Methode zum Übergeben von Eingaben an die Verarbeitung ungefähr eine der folgenden beiden Methoden.
In beiden Fällen ist die Fehlerbehandlung jedoch schwierig.
Im Fall von 1 ist es schwierig, die Thread-Instanz zuerst zu speichern, und Sie müssen die Zuordnung zwischen Future und Thread-Instanz irgendwo verwalten. Im Fall von 2 ist ein Fehler bei der Bestimmung aufgetreten oder es bleibt keine Eingabe erhalten, da das Argument des ursprünglichen Threads aus der Zukunft nicht abgerufen werden kann.
Daher habe ich mich für die folgende Methode entschieden, um dieses Problem zu lösen.
Einfach ausgedrückt ist "Tupel" eine Reihe von Werten, ein Konzept, das häufig in der Programmierung verwendet wird.
Tupel wird in Java nicht standardmäßig bereitgestellt, es gibt jedoch verschiedene ähnliche Methoden, [Paar] von Common Lang
(https://commons.apache.org/proper/commons-lang/javadocs/). api-3.9 / org / apache / commons / lang3 / tuple / Pair.html) und Triple Klasse /lang3/tuple/Triple.html), Klasse reactor
Tuple Dies kann mithilfe der Klasse Pair von JavaTuple
erreicht werden.
(Es ist keine komplizierte Klasse, daher können Sie sie selbst implementieren.)
Wenn Sie Tupel verwenden, um Eingaben in Links und Zukunft in Rechts zu speichern, können Sie die Fehlerverarbeitung mit dem Wert der Eingabequelle in der Fehlerverarbeitung durchführen. e? Haben Sie viele Eigenschaften? Ist der Eingang zu schwer, um OOM zu verursachen? ?? Möchten Sie das Design überprüfen?
Verwenden wir nun Commons Langs Paar, das in fast allen Projekten verwendet zu werden scheint. Mit Tuple kann die obige Hauptklasse so umgeschrieben werden.
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
val futureList =
IntStream.range(0,100)
.mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
.collect(Collectors.toList());
futureList.parallelStream()
.map(future -> {
try {
return future.getRight().get();
} catch (Exception e) {
log.error("Input {} was not processed correctly.", future.getLeft(), e);
// Do something for recover. Here, just return null.
return String.format("Input %s Failed in process, damn shit!! ", future.getLeft());
}
}).forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep( num%10 * 1000);
} catch (Exception e){
// do nothing since just sample
}
if (num%2 ==0) {
throw new RuntimeException("Error if odd");
}
return num + ": Something!";
}
}
Jetzt können Sie Fehler und Protokollierung mit den Eingabewerten behandeln.
Der Teil, in dem Future bereitgestellt wird, ist jedoch immer noch redundant und etwas frustrierend.
Lassen Sie uns als nächstes diesen Teil gemeinsam machen.
Alles, was Sie tun möchten, ist, die Zukunft zu entwickeln. Wenn Sie sie also ausschneiden, ist es sehr einfach, sie zu standardisieren. Auf der anderen Seite möchte ich Fehler in Bezug auf die Eingabe behandeln, also möchte ich dieses Teil so gestalten, dass es flexibel ist.
Bereiten Sie daher die folgende zukünftige Erweiterungsklasse vor, damit der Fehlerbehandlungsteil von jeder Funktion mithilfe von Ausnahme und Eingabe verarbeitet werden kann.
@RequiredArgsConstructor
public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> {
/**
* Callback function to recover when exception such as {@link InterruptedException} or {@link
* java.util.concurrent.ExecutionException}.
*/
private final BiFunction<L, Exception, R> recoveryCallback;
@Override
public R apply(Pair<L, Future<R>> futurePair) {
try {
return futurePair.getRight().get();
} catch (Exception e) {
return recoveryCallback.apply(futurePair.getLeft(), e);
}
}
}
Wenn Sie dies früher in die Hauptklasse aufnehmen, sieht es wie folgt aus.
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
BiFunction<Integer,Exception,String> errorHandler =
(in, e) -> {
log.error("Input {} was not processed correctly.", in, e);
return String.format("Input %s Failed in process, damn shit!! ", in);
};
val flattener = new FutureFlattener<Integer, String>(errorHandler);
val futureList =
IntStream.range(0, 100)
.mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
.collect(Collectors.toList());
futureList
.parallelStream()
.map(flattener)
.forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep(num % 10 * 1000);
} catch (Exception e) {
// do nothing since just sample
}
if (num % 2 == 0) {
throw new RuntimeException("Error if odd");
}
return num + ": Something!";
}
}
Um ehrlich zu sein, ist es jedoch umständlich, eine Funktion intern als Eigenschaft zu haben, obwohl Sie die Funktionsschnittstelle verwenden.
Sei kein Mist, ich möchte mich cool entscheiden. Lassen Sie es uns also etwas weiter ausbauen.
Viele andere Sprachen bieten monadischen Klassen Methoden für das Auslösen einer Ausnahme, z. B. onCatch oder thenCatch. Leider kann die Funktionsschnittstelle von Java nur eine Methodenkette sein, die den Erfolg von "compose", "apply" und "andThen" voraussetzt.
Implementieren wir also onCatch, indem wir die Funktionsschnittstelle von Java erben.
public interface CatchableFunction<T, R> extends Function<T, R> {
/**
* by calling this method in advance of calling {@link Function#apply}, any exception thrown in
* the apply method will be handled as defined in the argument onCatch.
*
* @param onCatch callback method to handle the exception. First Type T is the input of the base
* function.
* @return fail-safe function with a callback. This method will generate a new Function instance
* instead of modifying the existing function instance.
*/
default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
return t -> {
try {
return apply(t);
} catch (Exception e) {
return onCatch.apply(t, e);
}
};
}
}
Da es aufgrund der Verwendung von Java nicht möglich ist, einen Type-Parameter abzufangen, ist es frustrierend, ihn als Ausnahme empfangen zu müssen, aber jetzt kann er recht funktional geschrieben werden.
Wenn Sie diese Klasse in der FutureFlattener-Klasse implementieren, sieht sie wie folgt aus.
@RequiredArgsConstructor
public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> {
@Override
public R apply(Pair<L, Future<R>> futurePair) {
try {
return futurePair.getRight().get();
} catch (InterruptedException | ExecutionException e) {
throw new FutureExpandException(e);
}
}
// To be caught in the then catch method.
private static class FutureExtractException extends RuntimeException {
FutureExpandException(Throwable cause) {
super(cause);
}
}
Überprüfte Ausnahmen müssen im Lamdba-Ausdruck behandelt werden, damit sie in "FutureExtractException" eingeschlossen werden. Dies aktualisiert auch die Hauptklasse.
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
val flattener =
new FutureFlattener<Integer, String>()
.thenCatch(
(in, e) -> {
log.error("Input {} was not processed correctly.", in, e);
return String.format("Input %s Failed in process, damn shit!! ", in);
});
val futureList =
IntStream.range(0, 100)
.mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
.collect(Collectors.toList());
futureList.parallelStream().map(flattener).forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep(num % 10 * 1000);
} catch (Exception e) {
// do nothing since just sample
}
if (num % 2 == 0) {
throw new RuntimeException("Error if odd");
}
return num + ": Something!";
}
}
Die Verschachtelung wurde reduziert, Funktionsdeklarationen wurden aktualisiert und die Quelle für zukünftige Erweiterungen wurde aktualisiert.
Wie war es? Es gibt Stellen, an denen es mit Functional Java einfacher implementiert werden kann, aber ich hatte es eilig und habe es selbst implementiert.
Wenn es um die parallele Verarbeitung geht, ist es heutzutage einfach, Nachrichtenwarteschlangen wie kafka zu verwenden, um asynchron und lose gekoppelt zu erstellen. Dies bedeutet jedoch nicht, dass Sie kein Multithreading verwenden.
Auf der anderen Seite erhöhen redundante zukünftige Bereitstellungen nicht nur die Verschachtelung und verringern die Lesbarkeit, sondern entmutigen auch die empfindlichste Fehlerbehandlung.
Diesmal habe ich die obige Lösung gewählt, aber wie war es? Wenn jemand sagt, dass es einen besseren Weg gibt, hinterlassen Sie ihn bitte in den Kommentaren.
Na dann!
Recommended Posts