Comme toujours, c'est une réimpression du blog. https://munchkins-diary.hatenablog.com/entry/2019/11/09/221708
Le nom de la ville est Ho Chi Minh District 1.
La ville, qui s'est effondrée et reconstruite du jour au lendemain et est devenue un monde d'amélioration des performances, devient une zone extrêmement tendue à la frontière face au multi-threading.
Ici, la classe standard Java Future meurt d'envie de garder l'équilibre du monde. Cette histoire est un récit de la bataille et de la vie quotidienne du développeur pour défier cet avenir.
(Style avant sanglant)
Donc, dans l'article après un long moment, je voudrais écrire sur les conseils d'implémentation que je fais souvent sur la gestion de Future, qui est un wrapper de valeur de retour pour le traitement asynchrone java.
Le thème cette fois est
Il y en a deux.
Lors de l'écriture d'un traitement multi-thread en Java, je pense que beaucoup de gens écrivent le traitement suivant.
et ʻExecutionException
Le code ressemble à ceci. (Ne vous laissez pas berner par Exception ou utilisez parallelStream.)
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
val futureList =
IntStream.range(0,100)
.mapToObj(num -> executor.submit(() -> waitAndSaySomething(num)))
.collect(Collectors.toList());
futureList.parallelStream()
.map(future -> {
try {
return future.get();
} catch (Exception e) {
log.error("Error happened while extracting the thread result.", e);
// Do something for recover. Here, just return null.
return null;
}
}).forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep( num%10 * 1000);
} catch (Exception e){
// do nothing since just sample
}
if (num%2 ==0)
throw new RuntimeException("Error if odd");
return num + ": Something!";
}
}
C'est un code assez courant, mais il est ennuyeux d'écrire la partie get de Future car il est redondant. De plus, il est très difficile de gérer l'erreur car on ne sait pas quelle entrée a causé l'erreur.
Lors de l'implémentation du multithreading en Java, la méthode de transmission de l'entrée au traitement est à peu près l'une des deux méthodes suivantes.
Callable / Runnable
comme propriété et passez-la comme argument du constructeur quand new
.Cependant, dans les deux cas, la gestion des erreurs est difficile.
Dans le cas de 1, il est délicat de sauvegarder l'instance de thread en premier lieu, et vous devez gérer le mappage entre l'instance Future et Thread quelque part. Dans le cas de 2, il s'est produit une erreur de détermination ou ne s'en tient à aucune entrée car il ne peut pas obtenir l'argument du thread d'origine jeté du Futur.
Par conséquent, j'ai décidé de prendre la méthode suivante pour résoudre ce problème.
Le «Tuple» est simplement un ensemble de valeurs, un concept souvent utilisé en programmation.
Tuple n'est pas fourni en standard en Java, mais il existe diverses méthodes similaires, [Pair] of Common Lang
(https://commons.apache.org/proper/commons-lang/javadocs/ api-3.9 / org / apache / commons / lang3 / tuple / Pair.html) et [Triple](https://commons.apache.org/proper/commons-lang/javadocs/api-3.9/org/apache/commons /lang3/tuple/Triple.html) classe, réacteur
Tuple classe Ceci peut être réalisé en utilisant la classe Pair de JavaTuple
.
(Ce n'est pas une classe compliquée, vous pouvez donc l'implémenter vous-même.)
En utilisant Tuple pour enregistrer l'entrée dans Left et Future in Right, vous pouvez effectuer le traitement des erreurs en utilisant la valeur de la source d'entrée dans le traitement des erreurs. e? Avez-vous beaucoup de propriétés? L'entrée est-elle trop lourde pour provoquer un MOO? ?? Voulez-vous revoir le design?
Maintenant, utilisons Commons Lang's Pair, qui semble être utilisé dans presque tous les projets. Avec Tuple, la classe principale ci-dessus peut être réécrite comme ceci.
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
val futureList =
IntStream.range(0,100)
.mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
.collect(Collectors.toList());
futureList.parallelStream()
.map(future -> {
try {
return future.getRight().get();
} catch (Exception e) {
log.error("Input {} was not processed correctly.", future.getLeft(), e);
// Do something for recover. Here, just return null.
return String.format("Input %s Failed in process, damn shit!! ", future.getLeft());
}
}).forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep( num%10 * 1000);
} catch (Exception e){
// do nothing since just sample
}
if (num%2 ==0) {
throw new RuntimeException("Error if odd");
}
return num + ": Something!";
}
}
Vous pouvez désormais gérer les erreurs et la journalisation à l'aide des valeurs d'entrée.
Cependant, la partie où Future est déployé est toujours redondante et elle est quelque peu frustrante.
Alors, maintenant, rendons cette partie commune.
Tout ce que vous voulez faire est de développer l'avenir, donc si vous le supprimez, il est très facile de normaliser. D'un autre côté, je veux gérer les erreurs relatives à l'entrée, donc je veux concevoir cette pièce pour qu'elle soit flexible.
Par conséquent, préparez la classe d'extension Future suivante afin que la partie de gestion des erreurs puisse être traitée par n'importe quelle fonction utilisant Exception et input.
@RequiredArgsConstructor
public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> {
/**
* Callback function to recover when exception such as {@link InterruptedException} or {@link
* java.util.concurrent.ExecutionException}.
*/
private final BiFunction<L, Exception, R> recoveryCallback;
@Override
public R apply(Pair<L, Future<R>> futurePair) {
try {
return futurePair.getRight().get();
} catch (Exception e) {
return recoveryCallback.apply(futurePair.getLeft(), e);
}
}
}
Si vous intégrez cela dans la classe Main plus tôt, ce sera comme suit.
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
BiFunction<Integer,Exception,String> errorHandler =
(in, e) -> {
log.error("Input {} was not processed correctly.", in, e);
return String.format("Input %s Failed in process, damn shit!! ", in);
};
val flattener = new FutureFlattener<Integer, String>(errorHandler);
val futureList =
IntStream.range(0, 100)
.mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
.collect(Collectors.toList());
futureList
.parallelStream()
.map(flattener)
.forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep(num % 10 * 1000);
} catch (Exception e) {
// do nothing since just sample
}
if (num % 2 == 0) {
throw new RuntimeException("Error if odd");
}
return num + ": Something!";
}
}
Cependant, pour être honnête, avoir une fonction en tant que propriété en interne même si vous utilisez l'interface Function est gênant.
Ne sois pas merdique, je veux décider cool. Alors, développons-le un peu plus.
De nombreux autres langages fournissent des classes monadiques avec des méthodes pour lorsqu'une exception est levée, comme onCatch ou thenCatch.
Malheureusement, l'interface de fonction de Java ne peut être qu'une chaîne de méthodes qui suppose le succès de compose
, ʻapply, ʻand Then
.
Alors, implémentons onCatch en héritant de l'interface Function de Java.
public interface CatchableFunction<T, R> extends Function<T, R> {
/**
* by calling this method in advance of calling {@link Function#apply}, any exception thrown in
* the apply method will be handled as defined in the argument onCatch.
*
* @param onCatch callback method to handle the exception. First Type T is the input of the base
* function.
* @return fail-safe function with a callback. This method will generate a new Function instance
* instead of modifying the existing function instance.
*/
default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
return t -> {
try {
return apply(t);
} catch (Exception e) {
return onCatch.apply(t, e);
}
};
}
}
Puisqu'il n'est pas possible d'attraper un paramètre Type en raison de l'utilisation de Java, il est frustrant de devoir le recevoir comme une exception, mais maintenant il peut être écrit de manière assez fonctionnelle.
Si vous implémentez cette classe dans la classe FutureFlattener, ce sera comme suit.
@RequiredArgsConstructor
public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> {
@Override
public R apply(Pair<L, Future<R>> futurePair) {
try {
return futurePair.getRight().get();
} catch (InterruptedException | ExecutionException e) {
throw new FutureExpandException(e);
}
}
// To be caught in the then catch method.
private static class FutureExtractException extends RuntimeException {
FutureExpandException(Throwable cause) {
super(cause);
}
}
Les exceptions vérifiées doivent être gérées dans l'expression Lamdba, elles sont donc encapsulées dans FutureExtractException
.
Cela actualise également la classe Main
.
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
val flattener =
new FutureFlattener<Integer, String>()
.thenCatch(
(in, e) -> {
log.error("Input {} was not processed correctly.", in, e);
return String.format("Input %s Failed in process, damn shit!! ", in);
});
val futureList =
IntStream.range(0, 100)
.mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
.collect(Collectors.toList());
futureList.parallelStream().map(flattener).forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep(num % 10 * 1000);
} catch (Exception e) {
// do nothing since just sample
}
if (num % 2 == 0) {
throw new RuntimeException("Error if odd");
}
return num + ": Something!";
}
}
L'imbrication a été réduite, les déclarations de fonctions ont été actualisées et la source autour de l'extension Future a été actualisée.
Comment était-ce? Il y a des endroits où il peut être implémenté plus facilement en utilisant Functional Java, mais j'étais pressé alors je l'ai implémenté moi-même.
En ce qui concerne le traitement parallèle, de nos jours, il est basique d'utiliser des files d'attente de messages telles que kafka pour créer des fichiers asynchrones et faiblement couplés, mais cela ne signifie pas que vous n'utilisez pas le multithreading.
D'autre part, les déploiements Future redondants non seulement augmentent l'imbrication et réduisent la lisibilité, mais découragent également la gestion des erreurs les plus sensibles.
Cette fois, j'ai pris la solution ci-dessus, mais comment était-ce? Si quelqu'un dit qu'il existe un meilleur moyen, veuillez le laisser dans les commentaires.
Eh bien!
Recommended Posts