[JAVA] Überprüfungsnotizen zu Queue / BlockingQueue / TransferQueue

Überblick

Eine Übersicht über die Mitgliederschnittstelle des Sammlungsframeworks "Queue" und ihre Unterschnittstellen "BlockingQueue" und "TransferQueue". Insbesondere habe ich Beispielcode für die BlockingQueue-Implementierungsklasse "LinkedBlockingQueue" und die TransferQueue-Implementierungsklasse "LinkedTransferQueue" geschrieben.

Implementierungsklassen für die Schnittstellen BlockingQueue und TransferQueue erleichtern die Implementierung des Producer-Consumer-Musters.

Zitiert aus JavaDoc in BlockingQueue

Die BlockingQueue-Implementierung ist hauptsächlich für die Verwendung in Warteschlangen zwischen Produzenten und Verbrauchern konzipiert, unterstützt jedoch auch die Collection-Schnittstelle.

Beachten Sie, dass BlockingQueue von mehreren Herstellern und mehreren Verbrauchern sicher verwendet werden kann.

Umgebung

Referenz

Rezension

Schnittstellenwarteschlange \ <E >

Einführungsversion: 1.5

Zusätzlich zu den grundlegenden Erfassungsvorgängen bieten Warteschlangen zusätzliche Einfüge-, Extraktions- und Überprüfungsvorgänge. Jede dieser Methoden hat zwei Formen. Einer löst eine Ausnahme aus, wenn die Operation fehlschlägt, und der andere gibt einen speziellen Wert zurück (je nach Operation entweder null oder falsch).

Warteschlangenoperationsmethode (löst bei einem Fehler eine Ausnahme aus)

Operation Methodensignatur Ausnahme, um auf Fehler zu werfen
Einfügen boolean add​(E e) IllegalStateException
Abrufen / Löschen E remove() NoSuchElementException
Erhalten E element() NoSuchElementException

add Löst true aus, wenn Sie der Warteschlange ein Element hinzufügen, IllegalStateException, wenn kein freier Speicherplatz verfügbar ist remove Holen Sie sich den Anfang der Warteschlange und löschen Sie ihn. Wenn die Warteschlange leer ist, wird NoSuchElementException ausgelöst element Holen Sie sich den Anfang der Warteschlange und lösen Sie NoSuchElementException aus, wenn die Warteschlange leer ist

Warteschlangenoperationsmethode (gibt einen speziellen Wert zurück)

Operation Methodensignatur Wert zurückgegeben
Einfügen boolean offer​(E e) False, wenn nicht zur Warteschlange hinzugefügt
Abrufen / Löschen E poll() Null, wenn die Warteschlange leer ist
Erhalten E peek() Null, wenn die Warteschlange leer ist

offer True, wenn Sie der Warteschlange ein Element hinzufügen, andernfalls false poll Holen Sie sich den Anfang der Warteschlange und löschen Sie ihn. Geben Sie null zurück, wenn die Warteschlange leer ist peek Holen Sie sich den Kopf der Warteschlange und geben Sie null zurück, wenn die Warteschlange leer ist

Interface BlockingQueue \ <E >

Einführungsversion: 1.5

Eine Warteschlange, die zusätzlich Vorgänge unterstützt, die warten, bis die Warteschlange beim Abrufen eines Elements leer ist, oder warten, bis die Warteschlange beim Speichern eines Elements leer ist.

Sperrvorgang

Operation Methodensignatur
Einfügen void put​(E e) throws InterruptedException
Abrufen / Löschen E take() throws InterruptedException
Erhalten -

put Fügen Sie der Warteschlange ein Element hinzu, warten Sie, bis die Warteschlange leer wird, und lösen Sie eine InterruptedException aus, wenn während des Wartens ein Interrupt auftritt take Holen Sie sich den Kopf der Warteschlange und löschen Sie ihn, warten Sie, bis das Element abgerufen werden kann, und lösen Sie eine InterruptedException aus, wenn während des Wartens ein Interrupt auftritt

Operation zum Timeout

Operation Methodensignatur
Einfügen boolean offer​(E e, long timeout, TimeUnit unit) throws InterruptedException
Abrufen / Löschen E poll​(long timeout, TimeUnit unit) throws InterruptedException
Erhalten -

offer Fügen Sie eine Warteschlange in ein Element ein und warten Sie, bis die Warteschlange frei ist, bis die angegebene Wartezeit erreicht ist poll Rufen Sie den Anfang der Warteschlange ab und löschen Sie ihn. Warten Sie, bis das Element verfügbar ist, bis die angegebene Wartezeit erreicht ist

Überprüfen Sie den freien Speicherplatz in der Warteschlange

Operation Methodensignatur
Überprüfen Sie den freien Speicherplatz int remainingCapacity()

remainingCapacity Gibt die Anzahl der zusätzlichen Elemente zurück, die die Warteschlange ohne Blockierung akzeptieren kann. Gibt Integer.MAX_VALUE zurück, wenn keine integrierten Einschränkungen vorhanden sind.

Klasse LinkedBlockingQueue \ <E >

Einführungsversion: 1.5

Eine optionale eingeschränkte Blockierungswarteschlange basierend auf dem Verbindungsknoten. Diese Warteschlange ordnet Elemente nach FIFO (first in, first out). Der Anfang dieser Warteschlange ist das Element, das sich am längsten in der Warteschlange befindet.

Class PriorityBlockingQueue \ <E >

Einführungsversion: 1.5

Eine unbegrenzte Blockierungswarteschlange, die dieselben Ordnungsregeln wie die Klasse PriorityQueue verwendet und blockierende Erfassungsvorgänge bereitstellt.

Konstrukteur

Übergeben Sie einen Komparator an den Konstruktor, um die Warteschlangen zu ordnen.

public PriorityBlockingQueue​(int initialCapacity,
                             Comparator<? super E> comparator)
public PriorityBlockingQueue​(Collection<? extends E> c)

Interface TransferQueue \ <E >

Einführungsversion: 1.7

Eine Blockierungswarteschlange, in der der Hersteller darauf wartet, dass der Verbraucher das Element empfängt.

Sperrvorgang

Operation Methodensignatur
Einfügen void transfer​(E e) throws InterruptedException

transfer Übertragen Sie sofort, wenn ein Verbraucher auf den Empfang des Elements wartet, andernfalls warten Sie, bis es vom Verbraucher empfangen wird

Betrieb ohne Zeitüberschreitung

Operation Methodensignatur
Einfügen boolean tryTransfer​(E e)

tryTransfer Sofort weiterleiten, wenn ein Verbraucher auf den Empfang des Elements wartet, andernfalls false zurückgeben

Operation zum Timeout

Operation Methodensignatur
Einfügen boolean tryTransfer​(E e, long timeout, TimeUnit unit) throws InterruptedException

tryTransfer Wenn ein Verbraucher auf den Empfang des Elements wartet, wird es sofort übertragen, andernfalls wartet er darauf, dass der Verbraucher es bis zur angegebenen Wartezeit empfängt, und wenn die angegebene Wartezeit abgelaufen ist, wird false zurückgegeben.

Überprüfen Sie, welche Verbraucher warten

Operation Methodensignatur
Bestätigung des wartenden Verbrauchers boolean hasWaitingConsumer()
Bestätigung des wartenden Verbrauchers int getWaitingConsumerCount()

hasWaitingConsumer Gibt true zurück, wenn mindestens ein Consumer darauf wartet, ein Element in BlockingQueue.take oder poll zu erhalten getWaitingConsumerCount Gibt eine geschätzte Anzahl von Verbrauchern zurück, die darauf warten, ein Element in BlockingQueue.take oder poll zu erhalten

Klasse LinkedTransferQueue \ <E >

Einführungsversion: 1.7

Eine unbegrenzte Übertragungswarteschlange basierend auf verknüpften Knoten. Diese Warteschlange ordnet Elemente in einem FIFO (first in, first out) für einen bestimmten Hersteller an. Der Kopf der Warteschlange ist das Element, das sich für einen bestimmten Produzenten am längsten in der Warteschlange befindet.

Beispielcode

LinkedBlockingQueue-Beispielcode

In diesem Beispiel fügt 1 Hersteller der BlockingQueue regelmäßig Elemente hinzu (in diesem Beispiel Seriennummern vom Typ Integer), und 3 Verbraucher rufen die Elemente regelmäßig aus der BlockingQueue ab. Anstatt auf unbestimmte Zeit ausgeführt zu werden, endet es, wenn der Produzent das Element 100 Mal zur Warteschlange hinzufügt.

Obwohl dies nicht direkt mit dem Thema dieses Artikels zusammenhängt, verwenden wir den ScheduledExecutorService, um Produzenten und Konsumenten zu implementieren.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingDemo {

	private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
	private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
	private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);

	private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

	public static void main(String[] args) throws Exception {
		BlockingDemo demo = new BlockingDemo();
		demo.execute();
	}

	void execute() throws Exception {
		println("main start");

		CountDownLatch doneSignal = new CountDownLatch(100);

		//Startet nach 2 Sekunden und läuft danach alle 1 Sekunde
		println("create producer task");
		producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);

		println("create consumer task");
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 20, 2, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 20, 3, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 20, 3, TimeUnit.SECONDS);

		doneSignal.await();

		shutdown(producer);
		shutdown(consumer);

		println("main end");
	}

	class ProducerTask implements Runnable {
		private final BlockingQueue<Integer> queue;
		private final CountDownLatch doneSignal;
		private final AtomicInteger counter = new AtomicInteger(0);
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ProducerTask(BlockingQueue<Integer> queue, CountDownLatch doneSignal) {
			this.queue = queue;
			this.doneSignal = doneSignal;
		}

		@Override
		public void run() {
			try {
				Integer e = Integer.valueOf(counter.incrementAndGet());
				queue.put(e);
				System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
				doneSignal.countDown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	class ConsumerTask implements Runnable {
		private final BlockingQueue<Integer> queue;
		private final String name;
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ConsumerTask(BlockingQueue<Integer> queue, String name) {
			this.queue = queue;
			this.name = name;
		}

		@Override
		public void run() {
			try {
				//Integer e = queue.take();
				Integer e = queue.poll(1, TimeUnit.SECONDS);
				if (e != null) {
					System.out.println(String.format("[%s]             [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	void shutdown(ScheduledExecutorService service) {
		println("shutdown start");

		// Disable new tasks from being submitted
		service.shutdown();
		try {
			// Wait a while for existing tasks to terminate
			if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
				// Cancel currently executing tasks
				service.shutdownNow();
				// Wait a while for tasks to respond to being cancelled
				if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
					println("Pool did not terminate");
				}
			}
		} catch (InterruptedException e) {
			System.err.println(e);
			// (Re-)Cancel if current thread also interrupted
			service.shutdownNow();
			// Preserve interrupt status
			Thread.currentThread().interrupt();
		}

		println("shutdown end");
	}

	void println(String message) {
		System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
	}

}

Konstrukteur

Sie können die Kapazität der Warteschlange mit dem Konstruktor bestimmen. In diesem Beispiel ist die maximale Anzahl von Elementen, die der Warteschlange hinzugefügt werden können, auf 10 festgelegt.

private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

Wenn die Kapazität nicht angegeben ist, entspricht sie der angegebenen Integer.MAX_VALUE.

private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

Fügen Sie ein Element in die Warteschlange ein

Ich verwende die put-Methode, die auf unbestimmte Zeit blockiert, bis die Warteschlange frei ist. Wenn beim Blockieren der put-Methode ein Interrupt auftritt, tritt eine InterruptedException auf.

try {
	Integer e = Integer.valueOf(counter.incrementAndGet());
	queue.put(e);
	doneSignal.countDown();
} catch (InterruptedException e) {
	e.printStackTrace();
}

Elemente aus der Warteschlange entfernen

Es verwendet die Abfragemethode, die 1 Sekunde wartet, bis ein Element aus der Warteschlange abgerufen werden kann. Ich habe es auskommentiert, aber wenn ich die take-Methode verwende, wird sie auf unbestimmte Zeit blockiert, bis sie abgerufen werden kann.

try {
	//Integer e = queue.take();
	Integer e = queue.poll(1, TimeUnit.SECONDS);
	if (e != null) {
		// do something
	}
} catch (InterruptedException e) {
	e.printStackTrace();
}

Beispielcode für die verknüpfte Übertragungswarteschlange

In diesem Beispiel fügt 1 Hersteller der TransferQueue regelmäßig Elemente hinzu (in diesem Beispiel Seriennummern vom Typ Integer), und 3 Verbraucher rufen die Elemente regelmäßig aus der TransferQueue ab. Dieses Beispiel endet auch, wenn der Produzent das Element 100 Mal zur Warteschlange hinzufügt.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TransferDemo {

	private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
	private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
	private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);

	private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

	public static void main(String[] args) throws Exception {
		TransferDemo demo = new TransferDemo();
		demo.execute();
	}

	void execute() throws Exception {
		println("main start");

		CountDownLatch doneSignal = new CountDownLatch(100);

		println("create producer task");
		producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);

		println("create consumer task");
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 10, 3, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 10, 3, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 10, 3, TimeUnit.SECONDS);

		doneSignal.await();

		shutdown(producer);
		shutdown(consumer);

		println("main stop");
	}

	class ProducerTask implements Runnable {
		private final TransferQueue<Integer> queue;
		private final CountDownLatch doneSignal;
		private final AtomicInteger counter = new AtomicInteger(0);
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ProducerTask(TransferQueue<Integer> queue, CountDownLatch doneSignal) {
			this.queue = queue;
			this.doneSignal = doneSignal;
		}

		@Override
		public void run() {
			try {
				Integer e = Integer.valueOf(counter.incrementAndGet());
				queue.transfer(e);
				System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
				doneSignal.countDown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	class ConsumerTask implements Runnable {
		private final TransferQueue<Integer> queue;
		private final String name;
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ConsumerTask(TransferQueue<Integer> queue, String name) {
			this.queue = queue;
			this.name = name;
		}

		@Override
		public void run() {
			try {
				// Integer e = queue.take();
				Integer e = queue.poll(1, TimeUnit.SECONDS);
				if (e != null) {
					System.out.println(String.format("[%s]             [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	void shutdown(ScheduledExecutorService service) {
		// Disable new tasks from being submitted
		service.shutdown();
		try {
			// Wait a while for existing tasks to terminate
			if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
				// Cancel currently executing tasks
				service.shutdownNow();
				// Wait a while for tasks to respond to being cancelled
				if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
					System.err.println("Pool did not terminate");
				}
			}
		} catch (InterruptedException e) {
			System.err.println(e);
			// (Re-)Cancel if current thread also interrupted
			service.shutdownNow();
			// Preserve interrupt status
			Thread.currentThread().interrupt();
		}
		System.out.println("shutdown");
	}

	void println(String message) {
		System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
	}

}

Konstrukteur

Es gibt keine Kapazitätsbeschränkung.

private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();

Fügen Sie ein Element in die Warteschlange ein

Es verwendet eine Übertragungsmethode, die das Element auf unbestimmte Zeit blockiert, bis es vom Verbraucher empfangen wird. Wenn beim Blockieren der Übertragungsmethode ein Interrupt auftritt, tritt eine InterruptedException auf.

try {
	Integer e = Integer.valueOf(counter.incrementAndGet());
	queue.transfer(e);
	doneSignal.countDown();
} catch (InterruptedException e) {
	e.printStackTrace();
}

Elemente aus der Warteschlange entfernen

Es verwendet die Abfragemethode, die 1 Sekunde wartet, bis ein Element aus der Warteschlange abgerufen werden kann. Ich habe es auskommentiert, aber wenn ich die take-Methode verwende, wird sie auf unbestimmte Zeit blockiert, bis sie abgerufen werden kann.

try {
	// Integer e = queue.take();
	Integer e = queue.poll(1, TimeUnit.SECONDS);
	if (e != null) {
		// do something
	}
} catch (InterruptedException e) {
	e.printStackTrace();
}

Recommended Posts

Überprüfungsnotizen zu Queue / BlockingQueue / TransferQueue
Enum Review Memo
Ein Überprüfungshinweis zur Funktionsoberfläche
Überprüfungshinweise zu Java NIO 2
Überprüfungshinweise zum Java Collections Framework