Dans le monde Java, vous pouvez facilement créer des threads en utilisant la classe Thread. Cependant, il est difficile de gérer correctement le cycle de vie, donc Executor etc. Il est recommandé d'utiliser.
Le point 68 de Effective Java dit "Choisissez un exécuteur et une tâche parmi les threads".
À propos, Executor (En fait [ExecutorService](https: // docs. oracle.com/javase/jp/8/docs/api/java/util/concurrent/ExecutorService.html) et ScheduledExecutorService Si vous avez besoin d'une instance de (je pense que vous utiliserez l'interface à /util/concurrent/ScheduledExecutorService.html), alors Executors Il existe une méthode de fabrique dans /java/util/concurrent/Executors.html), donc c'est facile à utiliser via ceci, et dans la plupart des cas, le simple fait de l'utiliser résoudra le problème.
Ici, nous allons vous expliquer comment créer un «service exécuteur qui augmente ou diminue le nombre de threads en fonction de l'état de la tâche et définit une limite supérieure sur le nombre de threads», qui n'est pas fourni par les exécuteurs. Je vais l'expliquer d'une manière qui compare mes essais et mes erreurs, alors que dois-je faire après tout? Si vous en avez juste besoin, faites défiler jusqu'au dernier chapitre.
Outre ScheduledExecutorService, il existe les méthodes de fabrique suivantes pour créer une instance d'ExecutorService.
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newCachedThreadPool()
Jetons un œil à l'implémentation de ces méthodes d'usine.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Tout d'abord, je suis curieux que SingleThreadExecutor soit enveloppé dans FinalizableDelegatedExecutorService, mais si vous regardez l'implémentation
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
Cela ressemble à une classe wrap qui appelle juste shutdown ()
avec finalize ()
.
Mais pourquoi seulement SingleThreadExecutor? Ce n'est pas le sujet principal, alors je vais le laisser.
Vous pouvez voir que les deux exécuteurs utilisent la classe ThreadPoolExecutor. Il semble que le comportement change en fonction du paramètre.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
Enfin, si vous souhaitez effectuer de nombreuses tâches qui provoquent essentiellement une attente d'E / S, etc., je pense que vous utiliserez CachedThreadPool. Cependant, en regardant la définition que j'ai écrite plus tôt, le nombre maximum de threads est ʻInteger.MAX_VALUE`. Dans le cas d'un thread en attente d'E / S, il est logique de démarrer un thread séparé car le processeur est libre. Cependant, bien sûr, le coût de la mémoire, etc. augmentera en fonction du nombre de threads, vous pouvez donc éviter de démarrer trop de threads.
Si tel est le cas, s'agit-il d'un pool de threads fixes? Cependant, je veux un certain parallélisme maximal aux heures de pointe, mais quand je n'en ai pas besoin, je veux que vous réduisiez le nombre de threads ... Je pense qu'une telle demande est normale. C'est pourquoi je veux la version de réglage de limite de thread de CachedThreadPool.
Puis-je changer un peu l'argument?
Parce que CachedThreadPool est comme ça ...
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Si vous modifiez ʻInteger.MAX_VALUE`, oui
new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
Puisque les 3ème et 4ème arguments sont le temps de cache du thread, changeons-le ici et testons-le.
@Test
public void test() throws Exception {
final int poolSize = 4;
final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
1L, TimeUnit.SECONDS,
new SynchronousQueue<>());
final CountDownLatch endLatch = new CountDownLatch(poolSize);
for (int i = 0; i < poolSize; i++) {
final CountDownLatch startLatch = new CountDownLatch(1);
executor.execute(() -> {
startLatch.countDown();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
endLatch.countDown();
});
startLatch.await();
assertThat(executor.getActiveCount(), is(i + 1));
assertThat(executor.getPoolSize(), is(i + 1));
}
endLatch.await();
Thread.sleep(100);
assertThat(executor.getActiveCount(), is(0));
assertThat(executor.getPoolSize(), is(poolSize));
Thread.sleep(1000);
assertThat(executor.getActiveCount(), is(0));
assertThat(executor.getPoolSize(), is(0));
}
Le nombre de threads augmente en fonction de la quantité de charge, et lorsque la tâche est terminée, les threads inactifs restent et lorsque le temps de cache est dépassé, les threads regroupés se terminent également. Ouais ouais bien
Ensuite, assurons-nous que le degré de parallélisme est dans la limite supérieure.
@Test
public void test2() throws Exception {
final int poolSize = 4;
final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
1L, TimeUnit.SECONDS,
new SynchronousQueue<>());
final CountDownLatch endLatch = new CountDownLatch(poolSize);
final CountDownLatch startLatch = new CountDownLatch(poolSize);
for (int i = 0; i < poolSize + 1; i++) {
executor.execute(() -> {
startLatch.countDown();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
startLatch.await();
assertThat(executor.getActiveCount(), is(poolSize));
assertThat(executor.getPoolSize(), is(poolSize));
}
java.util.concurrent.RejectedExecutionException: Task example.ThreadTest$$Lambda$1/739498517@311d617d rejected from java.util.concurrent.ThreadPoolExecutor@7c53a9eb[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
……quoi……
En regardant à nouveau FixedThreadPool, le dernier argument est différent.
CachedThreadPool a spécifié SynchronousQueue. Je l'ai utilisé sans le savoir, mais je ne sais pas ce que signifie SynchronousQueue Je l'ai utilisé. Selon l'explication, c'était une file d'attente dont l'offre ne réussit que lorsqu'un thread essayait de lire avec une file d'attente bloquante de capacité nulle.
Eh bien, comme il n'y a pas de limite supérieure sur le nombre de threads, il n'y a pas besoin de capacité dans la file d'attente en premier lieu.
En d'autres termes, si vous utilisez la file d'attente de blocage liée utilisée dans Fixed Thread Pool ...
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
}
java.lang.AssertionError:
Expected: is <2>
but: was <1>
Expected :is <2>
Actual :<1>
orz Cette fois, le nombre de threads n'augmentera pas
ThreadPoolExecutor Javadoc a une explication appropriée.
** Noyau et taille maximale de la piscine ** ThreadPoolExecutor ajuste automatiquement la taille du pool (voir getPoolSize ()) en fonction des limites définies par corePoolSize (voir getCorePoolSize ()) et maximumPoolSize (voir getMaximumPoolSize ()). Si une nouvelle tâche est envoyée avec la méthode execute (Runnable) et qu'il y a moins de threads en cours d'exécution que corePoolSize, un nouveau thread sera créé pour gérer la demande, même si d'autres threads de travail sont inactifs. S'il y a plus de threads que corePoolSize et moins que maximumPoolSize en cours d'exécution, de nouveaux threads ne seront créés que si la file d'attente est pleine. La définition de corePoolSize et maximumPoolSize sur la même valeur crée un pool de threads de taille fixe. Vous pouvez stocker n'importe quel nombre de tâches simultanées dans le pool en définissant maximumPoolSize sur une valeur qui est dans un format virtuellement indépendant, tel que Integer.MAX_VALUE. La taille du pool principal et la taille maximale du pool sont le plus souvent définies uniquement au moment de la construction, mais peuvent également être modifiées dynamiquement à l'aide de setCorePoolSize (int) et setMaximumPoolSize (int).
Voyons la mise en œuvre
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
Pour l'organiser facilement
En d'autres termes, si vous souhaitez spécifier corePoolSize sur 0 et maximumPoolSize comme limite supérieure valide,
Il se trouve que.
L'exécuteur que vous voulez créer
Si vous utilisez ThreadPoolExecutor, les caractéristiques de file d'attente requises sont
--Offre réussit s'il y a un thread essayant de lire
Autrement dit, il s'agit d'une file d'attente dont le comportement change en fonction de l'état de ThreadPoolExecutor. Quand il s'agit de le faire, on a l'impression qu'il est étroitement couplé, avec Executor et Queue modifiés puis référencés l'un à l'autre.
Cependant, il existe un constructeur ThreadPoolExecutor qui vous permet de spécifier RejectedExecutionHandler
comme autre argument.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
Comme son nom l'indique, RejectedExecutionHandler
est un gestionnaire qui appelle void failedExecution (Runnable r, ThreadPoolExecutor executor);
lors du rejet.
Par défaut, un gestionnaire qui lève simplement une exception est défini.
Cela semble être disponible.
Autrement dit, implémentez les interfaces BlockingQueue <Runnable>
et RejectedExecutionHandler
et
--Offre réussit s'il y a un thread essayant de lire
--Offre échoue s'il n'y a pas de thread essayant de lire
--Lorsque failedExecution
est appelé, empilez-le dans la file d'attente
Il vous suffit de le faire fonctionner.
La classe créée sur la base de la considération jusqu'à présent est la suivante. Je l'ai écrit en Java, mais j'avais besoin d'un gâchis de code de chaudière dans le délégué, alors je l'ai réécrit dans Kotlin afin que je puisse le coller ici.
class ThreadWorkQueue(
private val delegate: BlockingQueue<Runnable> = LinkedBlockingQueue()
) : BlockingQueue<Runnable> by delegate,
RejectedExecutionHandler {
private val idleThreads = AtomicInteger(0)
override fun offer(runnable: Runnable): Boolean {
return if (idleThreads.get() == 0) {
false
} else delegate.offer(runnable)
}
override fun take(): Runnable {
idleThreads.incrementAndGet()
try {
return delegate.take()
} finally {
idleThreads.decrementAndGet()
}
}
override fun poll(timeout: Long, unit: TimeUnit): Runnable? {
idleThreads.incrementAndGet()
try {
return delegate.poll(timeout, unit)
} finally {
idleThreads.decrementAndGet()
}
}
override fun rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
if (executor.isShutdown) {
throw RejectedExecutionException("Task $r rejected from $executor")
}
delegate.offer(r)
}
}
En passant une instance de cette classe à ThreadPoolExecutor
en tant que workQueue et gestionnaire, nous avons pu créer un ExecutorService avec le comportement souhaité.
La méthode d'usine finale est comme ça. Le nombre maximum de threads est le nombre de processeurs disponibles, mais le minimum est de 2 threads.
Je pense qu'il existe différents paramètres ici en fonction des caractéristiques de la tâche que vous souhaitez exécuter.
private static ExecutorService createParallelExecutor() {
final ThreadWorkQueue queue = new ThreadWorkQueue();
return new ThreadPoolExecutor(0, calculateMaximumPoolSize(),
1L, TimeUnit.MINUTES, queue, queue);
}
private static int calculateMaximumPoolSize() {
return Math.max(2, Runtime.getRuntime().availableProcessors());
}
Ce qui précède est de savoir comment créer "Executor Service où le nombre de threads augmente ou diminue en fonction de l'état de la tâche et la limite supérieure est définie pour le nombre de threads".
Pour être honnête, j'ai pu créer un ExecutorService qui se comporte comme souhaité, mais je n'ai pas été en mesure d'évaluer son efficacité par rapport à la méthode d'usine existante ʻExecutors. La recommandation d'utiliser Executor est que la gestion des threads difficiles est minutieusement testée et que les antécédents sont laissés à la bibliothèque existante, donc si vous jouez mal avec elle, les avantages disparaîtront. À moins que cela ne soit absolument nécessaire, je pense qu'il vaut mieux utiliser la méthode de fabrique de ʻExecutors
.
Recommended Posts