[JAVA] Ich möchte einen ExecutorService erstellen, der die Anzahl der Threads abhängig vom Aufgabenstatus erhöht oder verringert und eine Obergrenze für die Anzahl der Threads festlegt.

In der Java-Welt können Sie mithilfe der Thread-Klasse problemlos Threads erstellen. Es ist jedoch schwierig, den Lebenszyklus ordnungsgemäß zu verwalten, daher Executor usw. Wird zur Verwendung empfohlen.

In Punkt 68 von Effective Java heißt es: "Wählen Sie einen Executor und eine Aufgabe aus den Threads aus."

Übrigens Executor (Eigentlich [ExecutorService](https: // docs. oracle.com/javase/jp/8/docs/api/java/util/concurrent/ExecutorService.html) und ScheduledExecutorService Wenn Sie eine Instanz von benötigen (ich denke, Sie verwenden die Schnittstelle unter /util/concurrent/ScheduledExecutorService.html), dann Executors Es gibt eine Factory-Methode in /java/util/concurrent/Executors.html), daher ist es einfach, sie über diese Methode zu verwenden, und für die meisten Zwecke wird das Problem durch die Verwendung dieser Methode gelöst.

Hier wird erläutert, wie Sie einen "Executor Service" erstellen, der die Anzahl der Threads abhängig vom Aufgabenstatus erhöht oder verringert und eine Obergrenze für die Anzahl der Threads festlegt, der nicht von Executors bereitgestellt wird. Ich werde es auf eine Weise erklären, die meinen Versuch und Irrtum vergleicht. Was soll ich also tun? Wenn Sie es nur brauchen, scrollen Sie zum letzten Kapitel.

ExecutorService-Factory-Methode

Neben ScheduledExecutorService gibt es die folgenden Factory-Methoden zum Erstellen einer Instanz von ExecutorService.

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newCachedThreadPool()
SingleThreadExecutor
Führen Sie Aufgaben in einem einzelnen Thread aus. Wird verwendet, wenn Sie Aufgaben nacheinander ausführen möchten
FixedThreadPool
Führen Sie die Aufgabe mit der angegebenen festen Anzahl von Threads aus. Wird zur Parallelisierung der arithmetischen Verarbeitung verwendet.
CachedThreadPool
Threads werden nach Bedarf erstellt, die erstellten Threads werden zwischengespeichert und für einen bestimmten Zeitraum wiederverwendet, und Threads, die eine Weile nicht verwendet wurden, werden beendet. Wird für die E / A-Verarbeitung verwendet, die sich mit der Zeit ändert und eine lange Wartezeit hat

Implementierung von Factory-Methoden

Werfen wir einen Blick auf die Implementierung dieser Factory-Methoden.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Zunächst bin ich neugierig, dass SingleThreadExecutor in FinalizableDelegatedExecutorService eingeschlossen ist, aber wenn Sie sich die Implementierung ansehen

static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    protected void finalize() {
        super.shutdown();
    }
}

Es klingt wie eine Wrap-Klasse, die nur "shutdown ()" mit "finalize ()" aufruft. Aber warum nur SingleThreadExecutor? Es ist nicht das Hauptthema, also werde ich es verlassen.

Sie können sehen, dass beide Executoren die ThreadPoolExecutor-Klasse verwenden. Es scheint, dass sich das Verhalten je nach Parameter ändert.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

Ich möchte einen ExecutorService erstellen, der die Anzahl der Threads abhängig vom Aufgabenstatus erhöht oder verringert und eine Obergrenze für die Anzahl der Threads festlegt.

Wenn Sie viele Aufgaben ausführen möchten, die im Grunde dazu führen, dass E / A warten usw., werden Sie wahrscheinlich CachedThreadPool verwenden. Bei der Definition, die ich zuvor geschrieben habe, beträgt die maximale Anzahl von Threads jedoch "Integer.MAX_VALUE". Wenn ein Thread auf E / A wartet, ist es sinnvoll, einen separaten Thread zu starten, da die CPU frei ist. Natürlich steigen die Speicherkosten usw. jedoch abhängig von der Anzahl der Threads. Sie sollten daher vermeiden, zu viele Threads zu starten.

Wenn ja, ist es ein fester Thread-Pool? Ich möchte jedoch eine gewisse maximale Parallelität während der Stoßzeiten, aber wenn ich sie nicht benötige, möchte ich, dass Sie die Anzahl der Threads reduzieren ... Ich denke, dass eine solche Anforderung normal ist. Aus diesem Grund möchte ich die Version von CachedThreadPool zur Einstellung der Thread-Begrenzung.

Versuchen Sie, das Argument von ThreadPoolExecutor zu ändern

Kann ich das Argument ein wenig ändern?

Weil CachedThreadPool so ist ...

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Wenn Sie Integer.MAX_VALUE ändern, ja

new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

Da das 3. und 4. Argument die Cache-Zeit des Threads sind, ändern wir es hier und testen es.

Prüfung

@Test
public void test() throws Exception {
    final int poolSize = 4;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
            1L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    final CountDownLatch endLatch = new CountDownLatch(poolSize);
    for (int i = 0; i < poolSize; i++) {
        final CountDownLatch startLatch = new CountDownLatch(1);
        executor.execute(() -> {
            startLatch.countDown();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            endLatch.countDown();
        });
        startLatch.await();
        assertThat(executor.getActiveCount(), is(i + 1));
        assertThat(executor.getPoolSize(), is(i + 1));
    }
    endLatch.await();
    Thread.sleep(100);
    assertThat(executor.getActiveCount(), is(0));
    assertThat(executor.getPoolSize(), is(poolSize));
    Thread.sleep(1000);
    assertThat(executor.getActiveCount(), is(0));
    assertThat(executor.getPoolSize(), is(0));
}

Die Anzahl der Threads erhöht sich um die Last, und wenn die Aufgabe abgeschlossen ist, bleiben inaktive Threads erhalten, und wenn die Cache-Zeit überschritten wird, enden auch die gepoolten Threads. Ja ja gut

Stellen wir als nächstes sicher, dass der Grad der Parallelität innerhalb der Obergrenze liegt.

@Test
public void test2() throws Exception {
    final int poolSize = 4;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
            1L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    final CountDownLatch endLatch = new CountDownLatch(poolSize);
    final CountDownLatch startLatch = new CountDownLatch(poolSize);
    for (int i = 0; i < poolSize + 1; i++) {
        executor.execute(() -> {
            startLatch.countDown();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    startLatch.await();
    assertThat(executor.getActiveCount(), is(poolSize));
    assertThat(executor.getPoolSize(), is(poolSize));
}
java.util.concurrent.RejectedExecutionException: Task example.ThreadTest$$Lambda$1/739498517@311d617d rejected from java.util.concurrent.ThreadPoolExecutor@7c53a9eb[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]

……Was……

Denken wir noch einmal darüber nach

Bei erneuter Betrachtung von FixedThreadPool ist das letzte Argument anders.

CachedThreadPool hat SynchronousQueue angegeben. Ich habe es verwendet, ohne es zu wissen, weiß aber nicht, was SynchronousQueue bedeutet Ich benutzte es. Der Erklärung zufolge war es eine Warteschlange, deren Angebot nur dann erfolgreich ist, wenn ein Thread versucht, mit einer blockierenden Warteschlange ohne Kapazität zu lesen.

Nun, da es keine Obergrenze für die Anzahl der Threads gibt, ist in erster Linie keine Kapazität in der Warteschlange erforderlich.

Mit anderen Worten, wenn Sie die verknüpfte Blockierungswarteschlange verwenden, die im Pool mit festen Threads verwendet wird ...

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
java.lang.AssertionError: 
Expected: is <2>
     but: was <1>
Expected :is <2>
     
Actual   :<1>

orz Diesmal wird die Anzahl der Threads nicht erhöht

Versuchen Sie nicht, es nur durch Intuition zu benutzen

ThreadPoolExecutor Javadoc enthält eine korrekte Erklärung.

** Kern und maximale Poolgröße ** ThreadPoolExecutor passt die Poolgröße (siehe getPoolSize ()) automatisch an die durch corePoolSize (siehe getCorePoolSize ()) und maximumPoolSize (siehe getMaximumPoolSize ()) festgelegten Grenzen an. Wenn eine neue Aufgabe mit der Methode execute (Runnable) gesendet wird und weniger als corePoolSize ausgeführt wird, wird ein neuer Thread erstellt, um die Anforderung zu verarbeiten, selbst wenn andere Arbeitsthreads inaktiv sind. Wenn mehr Threads als corePoolSize und weniger als maximalPoolSize ausgeführt werden, werden neue Threads nur erstellt, wenn die Warteschlange voll ist. Wenn Sie corePoolSize und maximumPoolSize auf denselben Wert setzen, wird ein Thread-Pool mit fester Größe erstellt. Sie können beliebig viele gleichzeitige Aufgaben im Pool speichern, indem Sie maximumPoolSize auf einen Wert festlegen, der in einem praktisch ungebundenen Format vorliegt, z. B. Integer.MAX_VALUE. Die Größe des Kernpools und die maximale Poolgröße werden am häufigsten nur zur Erstellungszeit festgelegt, können jedoch auch dynamisch mit setCorePoolSize (int) und setMaximumPoolSize (int) geändert werden.

Lassen Sie uns die Implementierung sehen

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

Beziehung zwischen ThreadPool Executor-Warteschlange und Thread-Pool

Um es einfach zu organisieren

--Wenn die Poolgröße kleiner als corePoolSize ist, wird unabhängig vom Status ein neuer Thread erstellt.

Mit anderen Worten, wenn Sie corePoolSize als 0 und maximumPoolSize als gültige Obergrenze angeben möchten,

Es stellt sich heraus, dass.

Wie sind die erwarteten Werte und die entsprechende Warteschlange angeordnet?

Der Executor, den Sie erstellen möchten

--Wenn ein Thread inaktiv ist, führen Sie die Aufgabe für diesen Thread aus --Wenn keine Threads im Leerlauf vorhanden sind und die Poolgröße unter der maximalen Anzahl von Threads liegt, wird ein neuer Thread erstellt.

Wenn Sie ThreadPoolExecutor verwenden, sind die erforderlichen Warteschlangenmerkmale

--Offer ist erfolgreich, wenn ein Thread versucht zu lesen

Das heißt, es ist eine Warteschlange, deren Verhalten sich je nach Status von ThreadPoolExecutor ändert. Wenn es darum geht, es zu machen, fühlt es sich so an, als wäre es eng miteinander verbunden, wobei sowohl Executor als auch Queue modifiziert und dann aufeinander verwiesen werden.

Bei einigen ThreadPoolExecutor-Konstruktoren können Sie jedoch "RejectedExecutionHandler" als weiteres Argument angeben.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

Wie der Name schon sagt, ist RejectedExecutionHandler ein Handler, der beim Ablehnen void reverseExecution (Runnable r, ThreadPoolExecutor executor); aufruft. Standardmäßig ist ein Handler festgelegt, der einfach eine Ausnahme auslöst. Dies scheint verfügbar zu sein. Das heißt, implementieren Sie sowohl die Schnittstellen "BlockingQueue " als auch "RejectedExecutionHandler" und

--Offer ist erfolgreich, wenn ein Thread versucht zu lesen

Sie müssen es nur zum Laufen bringen.

ExecutorService, der die Anzahl der Threads abhängig vom Taskstatus erhöht oder verringert und eine Obergrenze für die Anzahl der Threads festlegt

Die Klasse, die basierend auf der bisherigen Überlegung erstellt wurde, lautet wie folgt. Ich habe es in Java geschrieben, aber ich brauchte ein Durcheinander von Kesselplattencode im Delegaten, also habe ich es in Kotlin umgeschrieben, damit ich es hier einfügen kann.

class ThreadWorkQueue(
        private val delegate: BlockingQueue<Runnable> = LinkedBlockingQueue()
) : BlockingQueue<Runnable> by delegate,
        RejectedExecutionHandler {
    private val idleThreads = AtomicInteger(0)

    override fun offer(runnable: Runnable): Boolean {
        return if (idleThreads.get() == 0) {
            false
        } else delegate.offer(runnable)
    }

    override fun take(): Runnable {
        idleThreads.incrementAndGet()
        try {
            return delegate.take()
        } finally {
            idleThreads.decrementAndGet()
        }
    }

    override fun poll(timeout: Long, unit: TimeUnit): Runnable? {
        idleThreads.incrementAndGet()
        try {
            return delegate.poll(timeout, unit)
        } finally {
            idleThreads.decrementAndGet()
        }
    }

    override fun rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
        if (executor.isShutdown) {
            throw RejectedExecutionException("Task $r rejected from $executor")
        }
        delegate.offer(r)
    }
}

Durch Übergeben einer Instanz dieser Klasse an ThreadPoolExecutor als workQueue und Handler konnten wir einen ExecutorService mit dem gewünschten Verhalten erstellen. Die endgültige Fabrikmethode ist wie folgt. Die maximale Anzahl von Threads entspricht der Anzahl der verfügbaren Prozessoren, die minimale Anzahl beträgt jedoch 2 Threads. Ich denke, dass es hier verschiedene Einstellungen gibt, abhängig von den Eigenschaften der Aufgabe, die Sie ausführen möchten.

private static ExecutorService createParallelExecutor() {
    final ThreadWorkQueue queue = new ThreadWorkQueue();
    return new ThreadPoolExecutor(0, calculateMaximumPoolSize(),
            1L, TimeUnit.MINUTES, queue, queue);
}

private static int calculateMaximumPoolSize() {
    return Math.max(2, Runtime.getRuntime().availableProcessors());
}

Im Folgenden wird beschrieben, wie Sie einen "Executor Service" erstellen, bei dem die Anzahl der Threads je nach Aufgabenstatus zunimmt oder abnimmt und die Obergrenze für die Anzahl der Threads festgelegt wird.

Um ehrlich zu sein, konnte ich einen ExecutorService erstellen, der sich wie gewünscht verhält, aber ich konnte nicht bewerten, wie effizient er im Vergleich zur vorhandenen Factory-Methode "Executors" ist. Die Empfehlung für die Verwendung von Executor lautet, dass die Verwaltung schwieriger Threads gründlich getestet wird und die Erfolgsbilanz der vorhandenen Bibliothek überlassen bleibt. Wenn Sie also schlecht damit spielen, verschwinden die Vorteile. Sofern nicht unbedingt erforderlich, ist es meiner Meinung nach besser, die Factory-Methode "Executors" zu verwenden.

Recommended Posts

Ich möchte einen ExecutorService erstellen, der die Anzahl der Threads abhängig vom Aufgabenstatus erhöht oder verringert und eine Obergrenze für die Anzahl der Threads festlegt.
Befehl zum Überprüfen der Anzahl und des Status von Java-Threads
Ich möchte eine Methode aufrufen und die Nummer zählen
Die Geschichte von Collectors.groupingBy, die ich für die Nachwelt behalten möchte
Ich möchte die Eingabe begrenzen, indem ich den Zahlenbereich einschränke
Ich möchte die deaktivierte Option abhängig von der Bedingung zu f.radio_button hinzufügen
[Active Admin] Ich möchte die Standardverarbeitung zum Erstellen und Aktualisieren anpassen
[Ruby] Ich möchte nur den Wert des Hash und nur den Schlüssel extrahieren
Ich möchte das Argument der Annotation und das Argument der aufrufenden Methode an den Aspekt übergeben
Ich möchte den Inhalt der Anfrage sehen, ohne vier oder fünf zu sagen
Ich möchte rekursiv die Oberklasse und die Schnittstelle einer bestimmten Klasse erhalten