[JAVA] Je souhaite créer un ExecutorService qui augmente ou diminue le nombre de threads en fonction de l'état de la tâche et définit une limite supérieure sur le nombre de threads.

Dans le monde Java, vous pouvez facilement créer des threads en utilisant la classe Thread. Cependant, il est difficile de gérer correctement le cycle de vie, donc Executor etc. Il est recommandé d'utiliser.

Le point 68 de Effective Java dit "Choisissez un exécuteur et une tâche parmi les threads".

À propos, Executor (En fait [ExecutorService](https: // docs. oracle.com/javase/jp/8/docs/api/java/util/concurrent/ExecutorService.html) et ScheduledExecutorService Si vous avez besoin d'une instance de (je pense que vous utiliserez l'interface à /util/concurrent/ScheduledExecutorService.html), alors Executors Il existe une méthode de fabrique dans /java/util/concurrent/Executors.html), donc c'est facile à utiliser via ceci, et dans la plupart des cas, le simple fait de l'utiliser résoudra le problème.

Ici, nous allons vous expliquer comment créer un «service exécuteur qui augmente ou diminue le nombre de threads en fonction de l'état de la tâche et définit une limite supérieure sur le nombre de threads», qui n'est pas fourni par les exécuteurs. Je vais l'expliquer d'une manière qui compare mes essais et mes erreurs, alors que dois-je faire après tout? Si vous en avez juste besoin, faites défiler jusqu'au dernier chapitre.

Méthode de fabrique ExecutorService

Outre ScheduledExecutorService, il existe les méthodes de fabrique suivantes pour créer une instance d'ExecutorService.

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newCachedThreadPool()
SingleThreadExecutor
Effectuer des tâches dans un seul thread. Utilisé lorsque vous souhaitez exécuter des tâches de manière séquentielle
FixedThreadPool
Exécutez la tâche avec le nombre fixe spécifié de threads. Utilisé pour la parallélisation du traitement arithmétique.
CachedThreadPool
Les threads sont créés selon les besoins, les threads créés sont mis en cache et réutilisés pendant un certain temps, et les threads qui n'ont pas été utilisés depuis un certain temps sont arrêtés. Utilisé pour le traitement IO qui change avec le temps et a un long temps d'attente

Mise en œuvre des méthodes d'usine

Jetons un œil à l'implémentation de ces méthodes d'usine.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Tout d'abord, je suis curieux que SingleThreadExecutor soit enveloppé dans FinalizableDelegatedExecutorService, mais si vous regardez l'implémentation

static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    protected void finalize() {
        super.shutdown();
    }
}

Cela ressemble à une classe wrap qui appelle juste shutdown () avec finalize (). Mais pourquoi seulement SingleThreadExecutor? Ce n'est pas le sujet principal, alors je vais le laisser.

Vous pouvez voir que les deux exécuteurs utilisent la classe ThreadPoolExecutor. Il semble que le comportement change en fonction du paramètre.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

Je souhaite créer un ExecutorService qui augmente ou diminue le nombre de threads en fonction de l'état de la tâche et définit une limite supérieure sur le nombre de threads.

Enfin, si vous souhaitez effectuer de nombreuses tâches qui provoquent essentiellement une attente d'E / S, etc., je pense que vous utiliserez CachedThreadPool. Cependant, en regardant la définition que j'ai écrite plus tôt, le nombre maximum de threads est ʻInteger.MAX_VALUE`. Dans le cas d'un thread en attente d'E / S, il est logique de démarrer un thread séparé car le processeur est libre. Cependant, bien sûr, le coût de la mémoire, etc. augmentera en fonction du nombre de threads, vous pouvez donc éviter de démarrer trop de threads.

Si tel est le cas, s'agit-il d'un pool de threads fixes? Cependant, je veux un certain parallélisme maximal aux heures de pointe, mais quand je n'en ai pas besoin, je veux que vous réduisiez le nombre de threads ... Je pense qu'une telle demande est normale. C'est pourquoi je veux la version de réglage de limite de thread de CachedThreadPool.

Essayez de changer l'argument de ThreadPoolExecutor

Puis-je changer un peu l'argument?

Parce que CachedThreadPool est comme ça ...

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Si vous modifiez ʻInteger.MAX_VALUE`, oui

new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

Puisque les 3ème et 4ème arguments sont le temps de cache du thread, changeons-le ici et testons-le.

tester

@Test
public void test() throws Exception {
    final int poolSize = 4;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
            1L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    final CountDownLatch endLatch = new CountDownLatch(poolSize);
    for (int i = 0; i < poolSize; i++) {
        final CountDownLatch startLatch = new CountDownLatch(1);
        executor.execute(() -> {
            startLatch.countDown();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            endLatch.countDown();
        });
        startLatch.await();
        assertThat(executor.getActiveCount(), is(i + 1));
        assertThat(executor.getPoolSize(), is(i + 1));
    }
    endLatch.await();
    Thread.sleep(100);
    assertThat(executor.getActiveCount(), is(0));
    assertThat(executor.getPoolSize(), is(poolSize));
    Thread.sleep(1000);
    assertThat(executor.getActiveCount(), is(0));
    assertThat(executor.getPoolSize(), is(0));
}

Le nombre de threads augmente en fonction de la quantité de charge, et lorsque la tâche est terminée, les threads inactifs restent et lorsque le temps de cache est dépassé, les threads regroupés se terminent également. Ouais ouais bien

Ensuite, assurons-nous que le degré de parallélisme est dans la limite supérieure.

@Test
public void test2() throws Exception {
    final int poolSize = 4;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
            1L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    final CountDownLatch endLatch = new CountDownLatch(poolSize);
    final CountDownLatch startLatch = new CountDownLatch(poolSize);
    for (int i = 0; i < poolSize + 1; i++) {
        executor.execute(() -> {
            startLatch.countDown();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    startLatch.await();
    assertThat(executor.getActiveCount(), is(poolSize));
    assertThat(executor.getPoolSize(), is(poolSize));
}
java.util.concurrent.RejectedExecutionException: Task example.ThreadTest$$Lambda$1/739498517@311d617d rejected from java.util.concurrent.ThreadPoolExecutor@7c53a9eb[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]

……quoi……

Repensons

En regardant à nouveau FixedThreadPool, le dernier argument est différent.

CachedThreadPool a spécifié SynchronousQueue. Je l'ai utilisé sans le savoir, mais je ne sais pas ce que signifie SynchronousQueue Je l'ai utilisé. Selon l'explication, c'était une file d'attente dont l'offre ne réussit que lorsqu'un thread essayait de lire avec une file d'attente bloquante de capacité nulle.

Eh bien, comme il n'y a pas de limite supérieure sur le nombre de threads, il n'y a pas besoin de capacité dans la file d'attente en premier lieu.

En d'autres termes, si vous utilisez la file d'attente de blocage liée utilisée dans Fixed Thread Pool ...

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
java.lang.AssertionError: 
Expected: is <2>
     but: was <1>
Expected :is <2>
     
Actual   :<1>

orz Cette fois, le nombre de threads n'augmentera pas

N'essayez pas de l'utiliser uniquement par intuition

ThreadPoolExecutor Javadoc a une explication appropriée.

** Noyau et taille maximale de la piscine ** ThreadPoolExecutor ajuste automatiquement la taille du pool (voir getPoolSize ()) en fonction des limites définies par corePoolSize (voir getCorePoolSize ()) et maximumPoolSize (voir getMaximumPoolSize ()). Si une nouvelle tâche est envoyée avec la méthode execute (Runnable) et qu'il y a moins de threads en cours d'exécution que corePoolSize, un nouveau thread sera créé pour gérer la demande, même si d'autres threads de travail sont inactifs. S'il y a plus de threads que corePoolSize et moins que maximumPoolSize en cours d'exécution, de nouveaux threads ne seront créés que si la file d'attente est pleine. La définition de corePoolSize et maximumPoolSize sur la même valeur crée un pool de threads de taille fixe. Vous pouvez stocker n'importe quel nombre de tâches simultanées dans le pool en définissant maximumPoolSize sur une valeur qui est dans un format virtuellement indépendant, tel que Integer.MAX_VALUE. La taille du pool principal et la taille maximale du pool sont le plus souvent définies uniquement au moment de la construction, mais peuvent également être modifiées dynamiquement à l'aide de setCorePoolSize (int) et setMaximumPoolSize (int).

Voyons la mise en œuvre

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

Relation entre la file d'attente ThreadPool Executor et le pool de threads

Pour l'organiser facilement

En d'autres termes, si vous souhaitez spécifier corePoolSize sur 0 et maximumPoolSize comme limite supérieure valide,

Il se trouve que.

Quelle est la disposition des valeurs attendues et de la file d'attente correspondante?

L'exécuteur que vous voulez créer

Si vous utilisez ThreadPoolExecutor, les caractéristiques de file d'attente requises sont

--Offre réussit s'il y a un thread essayant de lire

Autrement dit, il s'agit d'une file d'attente dont le comportement change en fonction de l'état de ThreadPoolExecutor. Quand il s'agit de le faire, on a l'impression qu'il est étroitement couplé, avec Executor et Queue modifiés puis référencés l'un à l'autre.

Cependant, il existe un constructeur ThreadPoolExecutor qui vous permet de spécifier RejectedExecutionHandler comme autre argument.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

Comme son nom l'indique, RejectedExecutionHandler est un gestionnaire qui appelle void failedExecution (Runnable r, ThreadPoolExecutor executor); lors du rejet. Par défaut, un gestionnaire qui lève simplement une exception est défini. Cela semble être disponible. Autrement dit, implémentez les interfaces BlockingQueue <Runnable> et RejectedExecutionHandler et

--Offre réussit s'il y a un thread essayant de lire --Offre échoue s'il n'y a pas de thread essayant de lire --Lorsque failedExecution est appelé, empilez-le dans la file d'attente

Il vous suffit de le faire fonctionner.

ExecutorService qui augmente ou diminue le nombre de threads en fonction de l'état de la tâche et définit une limite supérieure sur le nombre de threads

La classe créée sur la base de la considération jusqu'à présent est la suivante. Je l'ai écrit en Java, mais j'avais besoin d'un gâchis de code de chaudière dans le délégué, alors je l'ai réécrit dans Kotlin afin que je puisse le coller ici.

class ThreadWorkQueue(
        private val delegate: BlockingQueue<Runnable> = LinkedBlockingQueue()
) : BlockingQueue<Runnable> by delegate,
        RejectedExecutionHandler {
    private val idleThreads = AtomicInteger(0)

    override fun offer(runnable: Runnable): Boolean {
        return if (idleThreads.get() == 0) {
            false
        } else delegate.offer(runnable)
    }

    override fun take(): Runnable {
        idleThreads.incrementAndGet()
        try {
            return delegate.take()
        } finally {
            idleThreads.decrementAndGet()
        }
    }

    override fun poll(timeout: Long, unit: TimeUnit): Runnable? {
        idleThreads.incrementAndGet()
        try {
            return delegate.poll(timeout, unit)
        } finally {
            idleThreads.decrementAndGet()
        }
    }

    override fun rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
        if (executor.isShutdown) {
            throw RejectedExecutionException("Task $r rejected from $executor")
        }
        delegate.offer(r)
    }
}

En passant une instance de cette classe à ThreadPoolExecutor en tant que workQueue et gestionnaire, nous avons pu créer un ExecutorService avec le comportement souhaité. La méthode d'usine finale est comme ça. Le nombre maximum de threads est le nombre de processeurs disponibles, mais le minimum est de 2 threads. Je pense qu'il existe différents paramètres ici en fonction des caractéristiques de la tâche que vous souhaitez exécuter.

private static ExecutorService createParallelExecutor() {
    final ThreadWorkQueue queue = new ThreadWorkQueue();
    return new ThreadPoolExecutor(0, calculateMaximumPoolSize(),
            1L, TimeUnit.MINUTES, queue, queue);
}

private static int calculateMaximumPoolSize() {
    return Math.max(2, Runtime.getRuntime().availableProcessors());
}

Ce qui précède est de savoir comment créer "Executor Service où le nombre de threads augmente ou diminue en fonction de l'état de la tâche et la limite supérieure est définie pour le nombre de threads".

Pour être honnête, j'ai pu créer un ExecutorService qui se comporte comme souhaité, mais je n'ai pas été en mesure d'évaluer son efficacité par rapport à la méthode d'usine existante ʻExecutors. La recommandation d'utiliser Executor est que la gestion des threads difficiles est minutieusement testée et que les antécédents sont laissés à la bibliothèque existante, donc si vous jouez mal avec elle, les avantages disparaîtront. À moins que cela ne soit absolument nécessaire, je pense qu'il vaut mieux utiliser la méthode de fabrique de ʻExecutors.

Recommended Posts

Je souhaite créer un ExecutorService qui augmente ou diminue le nombre de threads en fonction de l'état de la tâche et définit une limite supérieure sur le nombre de threads.
Commande pour vérifier le nombre et l'état des threads Java
Je veux appeler une méthode et compter le nombre
L'histoire de Collectors.groupingBy que je veux garder pour la postérité
Je veux limiter l'entrée en réduisant la plage de nombres
Je veux ajouter l'option désactivée à f.radio_button en fonction de la condition
[Administrateur actif] Je souhaite personnaliser le traitement de création et de mise à jour par défaut
[Ruby] Je souhaite extraire uniquement la valeur du hachage et uniquement la clé
Je veux passer l'argument d'Annotation et l'argument de la méthode d'appel à aspect
Je veux voir le contenu de Request sans dire quatre ou cinq
Je veux obtenir récursivement la superclasse et l'interface d'une certaine classe