La gestion de Java Future est trop terne, je vais donc essayer diverses idées.

Préface

Comme toujours, c'est une réimpression du blog. https://munchkins-diary.hatenablog.com/entry/2019/11/09/221708

Le nom de la ville est Ho Chi Minh District 1.
La ville, qui s'est effondrée et reconstruite du jour au lendemain et est devenue un monde d'amélioration des performances, devient une zone extrêmement tendue à la frontière face au multi-threading.
Ici, la classe standard Java Future meurt d'envie de garder l'équilibre du monde. Cette histoire est un récit de la bataille et de la vie quotidienne du développeur pour défier cet avenir.
(Style avant sanglant)

Donc, dans l'article après un long moment, je voudrais écrire sur les conseils d'implémentation que je fais souvent sur la gestion de Future, qui est un wrapper de valeur de retour pour le traitement asynchrone java.

Le thème cette fois est

Il y en a deux.

1. Traitement asynchrone standard et gestion future

Lors de l'écriture d'un traitement multi-thread en Java, je pense que beaucoup de gens écrivent le traitement suivant.

  1. Créez un pool de threads avec la classe Executors
  2. Créez une sous-classe qui implémente Callable / Runnable (ou utilisez une expression Lambda)
  3. Soumettez le traitement à un autre thread en soumettant ExecutorService
  4. Stockez l'avenir de la valeur de retour dans List, etc.
  5. Synchronisez avec la méthode get de la classe Future dans la liste lorsque tous sont lancés vers un autre thread
  6. Gérer les exceptions de vérification ʻInterruptedException et ʻExecutionException

Le code ressemble à ceci. (Ne vous laissez pas berner par Exception ou utilisez parallelStream.)

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val futureList =
    IntStream.range(0,100)
        .mapToObj(num -> executor.submit(() -> waitAndSaySomething(num)))
        .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.get();
          } catch (Exception e) {
            log.error("Error happened while extracting the thread result.", e);
            // Do something for recover. Here, just return null.
            return null;
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep( num%10 * 1000);
    } catch (Exception e){
      // do nothing since just sample
    }
    if (num%2 ==0)
      throw new RuntimeException("Error if odd");
   return num + ": Something!";
  }
}

C'est un code assez courant, mais il est ennuyeux d'écrire la partie get de Future car il est redondant. De plus, il est très difficile de gérer l'erreur car on ne sait pas quelle entrée a causé l'erreur.

2. Tout d'abord, essayez de gérer correctement l'erreur

Lors de l'implémentation du multithreading en Java, la méthode de transmission de l'entrée au traitement est à peu près l'une des deux méthodes suivantes.

  1. Donnez la classe qui implémente Callable / Runnable comme propriété et passez-la comme argument du constructeur quand new.
  2. Faites référence à la variable finale déclarée en dehors de Lambda à partir de l'expression Lambda.

Cependant, dans les deux cas, la gestion des erreurs est difficile.

Dans le cas de 1, il est délicat de sauvegarder l'instance de thread en premier lieu, et vous devez gérer le mappage entre l'instance Future et Thread quelque part. Dans le cas de 2, il s'est produit une erreur de détermination ou ne s'en tient à aucune entrée car il ne peut pas obtenir l'argument du thread d'origine jeté du Futur.

Par conséquent, j'ai décidé de prendre la méthode suivante pour résoudre ce problème.

Gérez les propriétés et l'avenir ensemble à l'aide de Tuple

Le «Tuple» est simplement un ensemble de valeurs, un concept souvent utilisé en programmation.

Tuple n'est pas fourni en standard en Java, mais il existe diverses méthodes similaires, [Pair] of Common Lang (https://commons.apache.org/proper/commons-lang/javadocs/ api-3.9 / org / apache / commons / lang3 / tuple / Pair.html) et [Triple](https://commons.apache.org/proper/commons-lang/javadocs/api-3.9/org/apache/commons /lang3/tuple/Triple.html) classe, réacteur Tuple classe Ceci peut être réalisé en utilisant la classe Pair de JavaTuple.

(Ce n'est pas une classe compliquée, vous pouvez donc l'implémenter vous-même.)

En utilisant Tuple pour enregistrer l'entrée dans Left et Future in Right, vous pouvez effectuer le traitement des erreurs en utilisant la valeur de la source d'entrée dans le traitement des erreurs. e? Avez-vous beaucoup de propriétés? L'entrée est-elle trop lourde pour provoquer un MOO? ?? Voulez-vous revoir le design?

Maintenant, utilisons Commons Lang's Pair, qui semble être utilisé dans presque tous les projets. Avec Tuple, la classe principale ci-dessus peut être réécrite comme ceci.


@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val futureList =
    IntStream.range(0,100)
        .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
        .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.getRight().get();
          } catch (Exception e) {
            log.error("Input {} was not processed correctly.", future.getLeft(), e);
            // Do something for recover. Here, just return null.
            return String.format("Input %s Failed in process, damn shit!! ", future.getLeft());
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep( num%10 * 1000);
    } catch (Exception e){
      // do nothing since just sample
    }
    if (num%2 ==0) {
      throw new RuntimeException("Error if odd");
    }
   return num + ": Something!";
  }
}

Vous pouvez désormais gérer les erreurs et la journalisation à l'aide des valeurs d'entrée.

Cependant, la partie où Future est déployé est toujours redondante et elle est quelque peu frustrante.

Alors, maintenant, rendons cette partie commune.

3. Rendre le développement futur commun

Tout ce que vous voulez faire est de développer l'avenir, donc si vous le supprimez, il est très facile de normaliser. D'un autre côté, je veux gérer les erreurs relatives à l'entrée, donc je veux concevoir cette pièce pour qu'elle soit flexible.

Par conséquent, préparez la classe d'extension Future suivante afin que la partie de gestion des erreurs puisse être traitée par n'importe quelle fonction utilisant Exception et input.

@RequiredArgsConstructor
public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> {
  /**
   * Callback function to recover when exception such as {@link InterruptedException} or {@link
   * java.util.concurrent.ExecutionException}.
   */
  private final BiFunction<L, Exception, R> recoveryCallback;

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (Exception e) {
      return recoveryCallback.apply(futurePair.getLeft(), e);
    }
  }
}

Si vous intégrez cela dans la classe Main plus tôt, ce sera comme suit.

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    BiFunction<Integer,Exception,String> errorHandler =
        (in, e) -> {
          log.error("Input {} was not processed correctly.", in, e);
          return String.format("Input %s Failed in process, damn shit!! ", in);
        };
    val flattener = new FutureFlattener<Integer, String>(errorHandler);
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList
        .parallelStream()
        .map(flattener)
        .forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since just sample
    }
    if (num % 2 == 0) {
      throw new RuntimeException("Error if odd");
    }
    return num + ": Something!";
  }
}

Cependant, pour être honnête, avoir une fonction en tant que propriété en interne même si vous utilisez l'interface Function est gênant.

Ne sois pas merdique, je veux décider cool. Alors, développons-le un peu plus.

4. Hériter de l'interface Function de Java pour ajouter la gestion des erreurs

De nombreux autres langages fournissent des classes monadiques avec des méthodes pour lorsqu'une exception est levée, comme onCatch ou thenCatch. Malheureusement, l'interface de fonction de Java ne peut être qu'une chaîne de méthodes qui suppose le succès de compose, ʻapply, ʻand Then.

Alors, implémentons onCatch en héritant de l'interface Function de Java.

public interface CatchableFunction<T, R> extends Function<T, R> {

  /**
   * by calling this method in advance of calling {@link Function#apply}, any exception thrown in
   * the apply method will be handled as defined in the argument onCatch.
   *
   * @param onCatch callback method to handle the exception. First Type T is the input of the base
   *     function.
   * @return fail-safe function with a callback. This method will generate a new Function instance
   *     instead of modifying the existing function instance.
   */
  default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
    return t -> {
      try {
        return apply(t);
      } catch (Exception e) {
        return onCatch.apply(t, e);
      }
    };
  }
}

Puisqu'il n'est pas possible d'attraper un paramètre Type en raison de l'utilisation de Java, il est frustrant de devoir le recevoir comme une exception, mais maintenant il peut être écrit de manière assez fonctionnelle.

Si vous implémentez cette classe dans la classe FutureFlattener, ce sera comme suit.



@RequiredArgsConstructor
public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> {

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (InterruptedException | ExecutionException e) {
      throw new FutureExpandException(e);
    }
  }

  // To be caught in the then catch method.
  private static class FutureExtractException extends RuntimeException {
    FutureExpandException(Throwable cause) {
      super(cause);
    }
  }

Les exceptions vérifiées doivent être gérées dans l'expression Lamdba, elles sont donc encapsulées dans FutureExtractException. Cela actualise également la classe Main.

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val flattener =
        new FutureFlattener<Integer, String>()
            .thenCatch(
                (in, e) -> {
                  log.error("Input {} was not processed correctly.", in, e);
                  return String.format("Input %s Failed in process, damn shit!! ", in);
                });
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList.parallelStream().map(flattener).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since just sample
    }
    if (num % 2 == 0) {
      throw new RuntimeException("Error if odd");
    }
    return num + ": Something!";
  }
}

L'imbrication a été réduite, les déclarations de fonctions ont été actualisées et la source autour de l'extension Future a été actualisée.

À la fin

Comment était-ce? Il y a des endroits où il peut être implémenté plus facilement en utilisant Functional Java, mais j'étais pressé alors je l'ai implémenté moi-même.

En ce qui concerne le traitement parallèle, de nos jours, il est basique d'utiliser des files d'attente de messages telles que kafka pour créer des fichiers asynchrones et faiblement couplés, mais cela ne signifie pas que vous n'utilisez pas le multithreading.

D'autre part, les déploiements Future redondants non seulement augmentent l'imbrication et réduisent la lisibilité, mais découragent également la gestion des erreurs les plus sensibles.

Cette fois, j'ai pris la solution ci-dessus, mais comment était-ce? Si quelqu'un dit qu'il existe un meilleur moyen, veuillez le laisser dans les commentaires.

Eh bien!

Recommended Posts

La gestion de Java Future est trop terne, je vais donc essayer diverses idées.
Essayez Progate Free Edition [Java I]
J'ai réussi le test Java niveau 2, je vais donc laisser une note
Depuis que j'ai réussi l'Oracle Java Bronze, j'ai résumé les grandes lignes du test.
[jour: 5] J'ai résumé les bases de Java
Je suis tombé sur la version Java dans Android Studio, je vais donc le résumer
[Java] Gestion des Java Beans dans la chaîne de méthodes
L'ordre des modificateurs de méthode Java est fixe
Essayez Progate Free Edition [Java II]
[Java] [javaSliver] L'interface et les classes abstraites sont compliquées, je vais donc les résumer.
DrainTo de LinkedBlockingQueue est-il sûr? J'ai suivi la source
La comparaison d'énumération est ==, et equals est bonne [Java]
Quel est le meilleur, Kotlin ou futur Java?
Organiser l'état actuel de Java et envisager l'avenir
J'ai résumé les types et les bases des exceptions Java
Je veux savoir quelle version de java le fichier jar que j'ai est disponible
J'ai fini de regarder les roses de Versailles, alors j'ai essayé de reproduire la chanson de fin en Java
J'ai remarqué que je développe un FW Java dans le référentiel mono, donc je vais considérer l'opération.