[JAVA] Commençons par la programmation parallèle

Cet article est le 12ème jour du Calendrier de l'Avent Java 2016.

La veille, c'était la base de données orientée objet (JPA) de leak4mk0, même avec Java SE. Vient ensuite la base de données orientée objet (JPA) de tkxlab même en Java SE.

introduction

Cela fait plusieurs années que nous sommes entrés dans l'ère du multi-core, qui serait la fin du déjeuner gratuit. La raison en est que je crée souvent des applications Web, mais cette année, la vague de programmation parallèle est venue à moi, qui développait avec presque des threads uniques (bien qu'il y ait des cas où je connais des multi-threads tels que serve red). .. C'est pourquoi le calendrier de l'Avent de cette année résume une introduction à la programmation parallèle.

API parallèle standard Java 8

Java a été vanté pour la programmation multithread depuis le début, et c'est un langage qui facilite la programmation parallèle. Il a été régulièrement étendu avec la mise à niveau de la version et dispose actuellement des API suivantes.

Voyons comment l'utiliser un par un.

Classe de fil

"Oublie moi"

C'est une demi-blague, mais dans la plupart des cas, vous ne devriez pas utiliser directement la classe Thread ordinaire.

J'ai également commencé par utiliser le fil simple basé sur d'anciennes connaissances, et lorsque je cherche sur Google avec le multithreading, il atteint toujours le sommet, Il est beaucoup plus facile d'utiliser Future et Parallel Stream, et ce que vous pouvez faire avec Thread est généralement plus intelligent avec Executor.

Executor framework

Une API de traitement parallèle simple basée sur un pool de threads. Le processus est décomposé en une granularité appelée "Tâche", et il est affecté au pool de threads et exécuté. Puisque Task est exprimé par une classe ou une expression lambda qui hérite de l'interface Runnable, il peut être utilisé efficacement comme "compatibilité ascendante de Thread qui prend en charge les pools de threads".

Aussi, généralement, nous utilisons un Executor (le nom est déroutant!) Appelé Executor Service qui peut gérer Future.

public static void main(String[] args) throws Exception {
    ExecutorService es = Executors.newFixedThreadPool(2);
    try {
        es.execute(() -> System.out.println("executor:1, thread-id:" + Thread.currentThread().getId()));
        es.execute(() -> System.out.println("executor:2, thread-id:" + Thread.currentThread().getId()));
        es.execute(() -> System.out.println("executor:3, thread-id:" + Thread.currentThread().getId()));
        es.execute(() -> System.out.println("executor:4, thread-id:" + Thread.currentThread().getId()));
        es.execute(() -> System.out.println("executor:5, thread-id:" + Thread.currentThread().getId()));
    } finally {
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
    }
}

Le résultat de l'exécution est le suivant.

executor:1, thread-id:11
executor:2, thread-id:11
executor:3, thread-id:12
executor:4, thread-id:12
executor:5, thread-id:12

Vous pouvez voir que le thread spécifié dans newFixedThreadPool est réutilisé même si execute est exécuté 5 fois. Executors est une fabrique qui crée ExecutorService et dispose d'une méthode pour créer un pool de threads avec la stratégie suivante.

Nom de la méthode Aperçu
newCachedThreadPool Un pool de threads qui crée de nouveaux threads si nécessaire, mais réutilise les threads précédemment construits lorsqu'ils sont disponibles
newFixedThreadPool Pool de threads qui réutilise un nombre fixe spécifié de threads
newWorkStealingPool Un pool de threads qui contient le nombre maximal de cœurs de processeur ou le nombre spécifié de parallèles. Une file d'attente de tâches est affectée à chaque thread et lorsque la file d'attente devient libre, les tâches sont interceptées à partir d'autres threads.(Work Stealing)Procéder

newWorkStealingPool est une nouvelle méthode apparue dans Java 8. Il allouera les threads efficacement, donc s'il n'y a pas de problème, il sera adopté.

Fork/Join

Effectuez une programmation parallèle à l'aide de Fork et Join, comme la gestion des processus Unix.

Il s'agit d'une API conçue pour effectuer des traitements lourds tels que la gouvernance fractionnée par récursif. Comme il utilise ForkJoinPool, qui utilise l'algorithme de vol de travail, il est conçu pour effectuer efficacement des tâches plus petites que Executor.

Cependant, contrairement à l'apparition de Java 7, Java 8 a également un nouveau WorkStealing Pool dans Executor, donc je pense que l'efficacité de l'exécution ne changera pas beaucoup. (※ non confirmé)

Je suis doué pour accélérer la récurrence, alors faisons un échantillon pour trouver la séquence familière de Fibonacci.

Tout d'abord, une séquence de nombres de Fibonacci créée par récurrence normale pour comparaison.

public static int fib(int n) {
    if (n == 0) {
        return 0;
    } else if (n == 1) {
        return 1;
    } else {
        return fib(n - 1) + fib(n - 2);
    }
}

public static void main(String[] args) {
    System.out.println(fib(45));
}

Si cela est implémenté avec ForkJoin, ce sera comme suit.

static class FibonacciTask extends RecursiveTask<Integer> {

    private final int n;

    FibonacciTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n == 0) {
            return 0;
        } else if (n == 1) {
            return 1;
        } else {
            FibonacciTask f1 = new FibonacciTask(n - 1);
            FibonacciTask f2 = new FibonacciTask(n - 2);

            f2.fork();
            return f1.compute() + f2.join();
        }
    }
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    System.out.println(pool.invoke(new FibonacciTask(45)));
}

J'écris un processus en créant une FibonacciTask qui hérite de la RecursiveTask.

A l'origine, l'acquisition des valeurs de f1: fib (n-1) et f2: fib (f-2), qui étaient à l'origine obtenues en utilisant récursif, est réalisée par récursive pour f1 et fork / join pour f2. C'est le point. Chaque fois que vous forkez, une nouvelle tâche asynchrone est créée, de sorte que le degré de parallélisme augmente. Puisque f1 est un calcul récursif, le traitement est exécuté de manière régulière sans attendre le résultat de f2. En conséquence, un grand nombre de tâches asynchrones peuvent être effectuées et un calcul parallèle peut être effectué.

La méthode invoke est une méthode qui «renvoie la valeur d'attente de manière synchrone jusqu'à ce que le traitement soit terminé». C'est également une méthode wrapper pour Task.fork.join.

Cependant, peut-être ai-je fait une erreur en écrivant quelque chose ou c'était un problème de surcharge, et dans le code que j'ai écrit cette fois, le code simple exécuté dans un seul thread est presque deux fois plus rapide que fork / join. .. .. Ainsi, lorsque vous l'utilisez réellement, vous devez mesurer s'il sera plus rapide. Cela dépendra du cas d'utilisation.

CompletableFuture

CompletableFuture est une API pour réaliser Future / promise, qui est l'un des modèles de conception de programmation parallèle introduits à partir de Java8. Ma compréhension est qu'il s'agit d'un modèle de conception pour traiter la programmation asynchrone comme la programmation synchrone, et dans le cas de Java 8, l'asynchrone est garantie par les threads.

Je vais l'écrire comme ça.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService es = Executors.newWorkStealingPool();

    //Exécution asynchrone simple
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "heavy process", es);
    System.out.println(future.get());

    //La synthèse est également possible
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "task1", es);
    CompletableFuture<String> f2 = f1.thenApply((x) -> x + ":task2");
    System.out.println(f2.get());

    //Vous pouvez le regrouper dans une liste ou tout traiter en même temps.
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        final int n = i;
        futures.add(CompletableFuture.runAsync(() -> System.out.println("task" + n), es));
    }
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
}

Dans cet exemple, ExecutorService est passé comme argument. En faisant cela, même si vous contrôlez des threads au milieu comme JavaEE, si vous avez un ExecutorService qui le prend en charge, vous pouvez l'utiliser sans conflit. Il semble que ForkJoinPool.commonPool () soit utilisé par défaut.

Tout d'abord, il s'agit d'une exécution asynchrone simple, mais c'est une forme d'exécution avec une expression lambda comme argument avec supplyAsync ou runAsync. En exécutant get, il attend la fin de Future et reçoit la valeur de retour.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "heavy process", es);
System.out.println(future.get());

La chose pratique à propos de CompletableFuture est que vous pouvez gérer le processus asynchrone lui-même comme une variable sous la forme de Future, de sorte que vous pouvez simplement écrire un processus qui prend le résultat d'un certain processus asynchrone comme argument. Par exemple, si vous récupérez une page Web, la traitez et l'enregistrez dans la base de données, vous passez généralement une fonction de rappel comme argument. Cela conduit facilement à l'enfer des rappels.

Dans le cas de CompletableFuture, vous pouvez utiliser thenApply etc. pour exprimer le post-traitement d'un tel traitement asynchrone sans utiliser le rappel comme indiqué ci-dessous.

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "task1", es);
CompletableFuture<String> f2 = f1.thenApply((x) -> x + ":task2");
System.out.println(f2.get());

Ce type de traitement s'appelle "synthèse", donc si vous n'y êtes pas habitué, vous vous sentirez probablement comme "qu'est-ce que la composition?", Mais j'espère que vous comprendrez que ce n'est pas si grave.

De plus, comme CompletableFuture n'est qu'une valeur, elle peut être compressée dans une liste, ou elle peut être exécutée simultanément en utilisant un tableau (argument de longueur variable en termes de méthode) en utilisant allOf.

List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    final int n = i;
    futures.add(CompletableFuture.runAsync(() -> System.out.println("task" + n), es));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));

Il existe de nombreuses autres fonctions de CompletableFuture, mais je ne peux pas les écrire un peu, donc je vais les omettre ici.

ParallelStream

En parlant de parallélisme introduit dans Java8, beaucoup de gens y pensent. ParallelStream pour les opérations de collecte parallèles.

Les collections parallèles ont longtemps été utilisées dans les langages déclaratifs et sans côtés tels que les langages fonctionnels, mais avec Java 8, ils sont enfin introduits à Java. C'est un goût très différent qu'avant, et c'est une méthode qui gère automatiquement le parallélisme côté système au lieu de traiter directement le parallélisme et l'asynchrone.

Contrairement aux opérations parallèles explicites, cela limite ce que vous pouvez réaliser, mais cela rend votre code beaucoup plus simple. Il peut également couvrir de nombreux autres cas d'utilisation pour les exigences de haute vitesse en raison de la parallélisation.

Regardons d'abord le code.

public static void main(String[] args){
    IntStream.range(1, 10).boxed()
            .parallel()
            .map(x -> "task" + x)
            .forEach(System.out::println);
}

C'est presque impossible à distinguer d'un Stream normal, non? La différence est que .parallel est ajouté. Le traitement parallèle peut être assemblé avec juste cela, et comme il s'agit d'un Stream, il est naturel d'utiliser la valeur de retour du traitement précédent dans le traitement suivant, donc une histoire comme "Synthèse future" est simple.

C'est une API très puissante et concise, mais il y a quelques mises en garde. N'oubliez pas qu'il s'agit de programmation parallèle.

Notez que si vous utilisez une variable ou une ressource externe autre qu'un environnement qui prend en charge le parallélisme tel que l'exclusion, elle sera facilement cassée.

Bibliothèque non standard

Outre la bibliothèque Java standard, il existe d'autres bibliothèques qui prennent en charge le parallélisme. La plupart du temps, je ne l'ai pas encore essayé, mais je vais simplement le mentionner.

Apache Spark

http://spark.apache.org/

Il s'agit d'une plateforme de traitement distribuée pour l'exécution de lots à grande échelle. En ce qui concerne le modèle de programmation, la collecte parallèle est adoptée comme dans Parallel Stream.

Je l'ai utilisé différemment des autres bibliothèques, veuillez donc vous référer à l'article écrit ci-dessous.

Akka

http://doc.akka.io/docs/akka/snapshot/intro/getting-started.html

C'est une bibliothèque / middleware pour réaliser le modèle d'acteur. Il est dérivé de Scala, mais il peut également être utilisé en Java. Bien qu'il s'agisse d'une bibliothèque de programmation parallèle pour gérer des acteurs plus légers que les threads, il s'agit d'une reconnaissance qui est plus couramment utilisée comme infrastructure de système distribué.

JCSP

https://www.cs.kent.ac.uk/projects/ofa/jcsp/

Cela semble être une bibliothèque pour faire de l'algèbre de processus en Java. L'algèbre de processus est une méthode mathématique pour les calculs parallèles, j'aimerais donc l'essayer la prochaine fois.

Résumé

Eh bien, c'est facile, mais j'ai résumé la programmation parallèle en Java. Je pense que beaucoup de gens ont une image forte du multithreading simple, mais Java a beaucoup changé.

Fondamentalement, Parallel Stream, et dans les cas difficiles, Completable Future est susceptible de devenir le courant dominant à l'avenir. Je pense qu'il est plus facile d'écrire du code sécurisé car ils n'utilisent pas nécessairement des variables communes pour passer des valeurs entre les threads. Je ne l'ai pas présenté cette fois, mais il y a des cas où vous pouvez utiliser des collections non bloquantes même si vous avez absolument besoin de variables communes, donc je pense que vous les utiliserez au besoin.

De plus, récemment, la parallélisation à grande échelle par des systèmes distribués est devenue familière grâce au cloud, au-delà de la programmation parallèle sur une seule machine. Je veux aussi étudier dur ici.

Alors l'année prochaine sera Happy Hacking!

référence

Recommended Posts

Commençons par la programmation parallèle
Commencez avec Gradle
Commençons par Java - Créez un environnement de développement ②
Commençons par Java - Créez un environnement de développement ①
Peut-être que ça marche! Commençons avec Docker!
Démarrez avec Spring Boot
Présentation de «Introduction à la programmation pratique de la rouille» (jour 3)
Comment démarrer avec Slim
J'ai essayé de démarrer avec Web Assembly
[Note] Comment démarrer avec Rspec
Présentation de «Introduction à la programmation pratique de Rust» (jour 4) Appel de Rust depuis Ruby
Comment démarrer avec Eclipse Micro Profile
Les débutants de Rails ont essayé de se lancer avec RSpec
J'ai commencé un journal de programmation
Les utilisateurs Java expérimentés se lancent dans le développement d'applications Android
[Mon mémo] Entendons-nous bien avec Pry / DB avec Rails
Premiers pas avec DBUnit
J'ai essayé de démarrer avec Spring Data JPA
Premiers pas avec Ruby
Premiers pas avec Swift
Raclons avec Java! !!
Premiers pas avec Doma-Transactions
Commencez avec Java sans serveur avec le framework léger Micronaut!
Un développeur Java de première année chez udemy a essayé de se lancer avec PHP
Il est maintenant temps de commencer avec l'API Stream
Comment démarrer avec JDBC en utilisant PostgresSQL sur MacOS
J'ai essayé de démarrer avec Swagger en utilisant Spring Boot
Premiers pas avec le traitement Doma-Annotation
Premiers pas avec Java Collection
Expérimentons l'expansion en ligne Java
Programmation Java incroyable (arrêtons-nous)
[Form_with] Unifions le formulaire avec form_with.
Exploitons Excel avec Java! !!
Premiers pas avec JSP et servlet
Premiers pas avec les bases de Java
Premiers pas avec Spring Boot
Premiers pas avec les modules Ruby
Revue de code facile pour démarrer avec Jenkins / SonarQube: analyse statique