Eine Übersicht über die Mitgliederschnittstelle des Sammlungsframeworks "Queue" und ihre Unterschnittstellen "BlockingQueue" und "TransferQueue". Insbesondere habe ich Beispielcode für die BlockingQueue-Implementierungsklasse "LinkedBlockingQueue" und die TransferQueue-Implementierungsklasse "LinkedTransferQueue" geschrieben.
Implementierungsklassen für die Schnittstellen BlockingQueue und TransferQueue erleichtern die Implementierung des Producer-Consumer-Musters.
Zitiert aus JavaDoc in BlockingQueue
Die BlockingQueue-Implementierung ist hauptsächlich für die Verwendung in Warteschlangen zwischen Produzenten und Verbrauchern konzipiert, unterstützt jedoch auch die Collection-Schnittstelle.
Beachten Sie, dass BlockingQueue von mehreren Herstellern und mehreren Verbrauchern sicher verwendet werden kann.
Umgebung
Referenz
Einführungsversion: 1.5
Zusätzlich zu den grundlegenden Erfassungsvorgängen bieten Warteschlangen zusätzliche Einfüge-, Extraktions- und Überprüfungsvorgänge. Jede dieser Methoden hat zwei Formen. Einer löst eine Ausnahme aus, wenn die Operation fehlschlägt, und der andere gibt einen speziellen Wert zurück (je nach Operation entweder null oder falsch).
Operation | Methodensignatur | Ausnahme, um auf Fehler zu werfen |
---|---|---|
Einfügen | boolean add(E e) | IllegalStateException |
Abrufen / Löschen | E remove() | NoSuchElementException |
Erhalten | E element() | NoSuchElementException |
add Löst true aus, wenn Sie der Warteschlange ein Element hinzufügen, IllegalStateException, wenn kein freier Speicherplatz verfügbar ist remove Holen Sie sich den Anfang der Warteschlange und löschen Sie ihn. Wenn die Warteschlange leer ist, wird NoSuchElementException ausgelöst element Holen Sie sich den Anfang der Warteschlange und lösen Sie NoSuchElementException aus, wenn die Warteschlange leer ist
Operation | Methodensignatur | Wert zurückgegeben |
---|---|---|
Einfügen | boolean offer(E e) | False, wenn nicht zur Warteschlange hinzugefügt |
Abrufen / Löschen | E poll() | Null, wenn die Warteschlange leer ist |
Erhalten | E peek() | Null, wenn die Warteschlange leer ist |
offer True, wenn Sie der Warteschlange ein Element hinzufügen, andernfalls false poll Holen Sie sich den Anfang der Warteschlange und löschen Sie ihn. Geben Sie null zurück, wenn die Warteschlange leer ist peek Holen Sie sich den Kopf der Warteschlange und geben Sie null zurück, wenn die Warteschlange leer ist
Einführungsversion: 1.5
Eine Warteschlange, die zusätzlich Vorgänge unterstützt, die warten, bis die Warteschlange beim Abrufen eines Elements leer ist, oder warten, bis die Warteschlange beim Speichern eines Elements leer ist.
Operation | Methodensignatur |
---|---|
Einfügen | void put(E e) throws InterruptedException |
Abrufen / Löschen | E take() throws InterruptedException |
Erhalten | - |
put Fügen Sie der Warteschlange ein Element hinzu, warten Sie, bis die Warteschlange leer wird, und lösen Sie eine InterruptedException aus, wenn während des Wartens ein Interrupt auftritt take Holen Sie sich den Kopf der Warteschlange und löschen Sie ihn, warten Sie, bis das Element abgerufen werden kann, und lösen Sie eine InterruptedException aus, wenn während des Wartens ein Interrupt auftritt
Operation | Methodensignatur |
---|---|
Einfügen | boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException |
Abrufen / Löschen | E poll(long timeout, TimeUnit unit) throws InterruptedException |
Erhalten | - |
offer Fügen Sie eine Warteschlange in ein Element ein und warten Sie, bis die Warteschlange frei ist, bis die angegebene Wartezeit erreicht ist poll Rufen Sie den Anfang der Warteschlange ab und löschen Sie ihn. Warten Sie, bis das Element verfügbar ist, bis die angegebene Wartezeit erreicht ist
Operation | Methodensignatur |
---|---|
Überprüfen Sie den freien Speicherplatz | int remainingCapacity() |
remainingCapacity Gibt die Anzahl der zusätzlichen Elemente zurück, die die Warteschlange ohne Blockierung akzeptieren kann. Gibt Integer.MAX_VALUE zurück, wenn keine integrierten Einschränkungen vorhanden sind.
Einführungsversion: 1.5
Eine optionale eingeschränkte Blockierungswarteschlange basierend auf dem Verbindungsknoten. Diese Warteschlange ordnet Elemente nach FIFO (first in, first out). Der Anfang dieser Warteschlange ist das Element, das sich am längsten in der Warteschlange befindet.
Einführungsversion: 1.5
Eine unbegrenzte Blockierungswarteschlange, die dieselben Ordnungsregeln wie die Klasse PriorityQueue verwendet und blockierende Erfassungsvorgänge bereitstellt.
Konstrukteur
Übergeben Sie einen Komparator an den Konstruktor, um die Warteschlangen zu ordnen.
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator)
public PriorityBlockingQueue(Collection<? extends E> c)
Einführungsversion: 1.7
Eine Blockierungswarteschlange, in der der Hersteller darauf wartet, dass der Verbraucher das Element empfängt.
Operation | Methodensignatur |
---|---|
Einfügen | void transfer(E e) throws InterruptedException |
transfer Übertragen Sie sofort, wenn ein Verbraucher auf den Empfang des Elements wartet, andernfalls warten Sie, bis es vom Verbraucher empfangen wird
Operation | Methodensignatur |
---|---|
Einfügen | boolean tryTransfer(E e) |
tryTransfer Sofort weiterleiten, wenn ein Verbraucher auf den Empfang des Elements wartet, andernfalls false zurückgeben
Operation | Methodensignatur |
---|---|
Einfügen | boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException |
tryTransfer Wenn ein Verbraucher auf den Empfang des Elements wartet, wird es sofort übertragen, andernfalls wartet er darauf, dass der Verbraucher es bis zur angegebenen Wartezeit empfängt, und wenn die angegebene Wartezeit abgelaufen ist, wird false zurückgegeben.
Operation | Methodensignatur |
---|---|
Bestätigung des wartenden Verbrauchers | boolean hasWaitingConsumer() |
Bestätigung des wartenden Verbrauchers | int getWaitingConsumerCount() |
hasWaitingConsumer Gibt true zurück, wenn mindestens ein Consumer darauf wartet, ein Element in BlockingQueue.take oder poll zu erhalten getWaitingConsumerCount Gibt eine geschätzte Anzahl von Verbrauchern zurück, die darauf warten, ein Element in BlockingQueue.take oder poll zu erhalten
Einführungsversion: 1.7
Eine unbegrenzte Übertragungswarteschlange basierend auf verknüpften Knoten. Diese Warteschlange ordnet Elemente in einem FIFO (first in, first out) für einen bestimmten Hersteller an. Der Kopf der Warteschlange ist das Element, das sich für einen bestimmten Produzenten am längsten in der Warteschlange befindet.
In diesem Beispiel fügt 1 Hersteller der BlockingQueue regelmäßig Elemente hinzu (in diesem Beispiel Seriennummern vom Typ Integer), und 3 Verbraucher rufen die Elemente regelmäßig aus der BlockingQueue ab. Anstatt auf unbestimmte Zeit ausgeführt zu werden, endet es, wenn der Produzent das Element 100 Mal zur Warteschlange hinzufügt.
Obwohl dies nicht direkt mit dem Thema dieses Artikels zusammenhängt, verwenden wir den ScheduledExecutorService, um Produzenten und Konsumenten zu implementieren.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockingDemo {
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
BlockingDemo demo = new BlockingDemo();
demo.execute();
}
void execute() throws Exception {
println("main start");
CountDownLatch doneSignal = new CountDownLatch(100);
//Startet nach 2 Sekunden und läuft danach alle 1 Sekunde
println("create producer task");
producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);
println("create consumer task");
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 20, 2, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 20, 3, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 20, 3, TimeUnit.SECONDS);
doneSignal.await();
shutdown(producer);
shutdown(consumer);
println("main end");
}
class ProducerTask implements Runnable {
private final BlockingQueue<Integer> queue;
private final CountDownLatch doneSignal;
private final AtomicInteger counter = new AtomicInteger(0);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ProducerTask(BlockingQueue<Integer> queue, CountDownLatch doneSignal) {
this.queue = queue;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.put(e);
System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ConsumerTask implements Runnable {
private final BlockingQueue<Integer> queue;
private final String name;
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ConsumerTask(BlockingQueue<Integer> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
//Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
System.out.println(String.format("[%s] [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
void shutdown(ScheduledExecutorService service) {
println("shutdown start");
// Disable new tasks from being submitted
service.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
service.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
println("Pool did not terminate");
}
}
} catch (InterruptedException e) {
System.err.println(e);
// (Re-)Cancel if current thread also interrupted
service.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
println("shutdown end");
}
void println(String message) {
System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
}
}
Sie können die Kapazität der Warteschlange mit dem Konstruktor bestimmen. In diesem Beispiel ist die maximale Anzahl von Elementen, die der Warteschlange hinzugefügt werden können, auf 10 festgelegt.
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
Wenn die Kapazität nicht angegeben ist, entspricht sie der angegebenen Integer.MAX_VALUE.
private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
Ich verwende die put-Methode, die auf unbestimmte Zeit blockiert, bis die Warteschlange frei ist. Wenn beim Blockieren der put-Methode ein Interrupt auftritt, tritt eine InterruptedException auf.
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.put(e);
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
Es verwendet die Abfragemethode, die 1 Sekunde wartet, bis ein Element aus der Warteschlange abgerufen werden kann. Ich habe es auskommentiert, aber wenn ich die take-Methode verwende, wird sie auf unbestimmte Zeit blockiert, bis sie abgerufen werden kann.
try {
//Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
// do something
}
} catch (InterruptedException e) {
e.printStackTrace();
}
In diesem Beispiel fügt 1 Hersteller der TransferQueue regelmäßig Elemente hinzu (in diesem Beispiel Seriennummern vom Typ Integer), und 3 Verbraucher rufen die Elemente regelmäßig aus der TransferQueue ab. Dieses Beispiel endet auch, wenn der Produzent das Element 100 Mal zur Warteschlange hinzufügt.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class TransferDemo {
private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
TransferDemo demo = new TransferDemo();
demo.execute();
}
void execute() throws Exception {
println("main start");
CountDownLatch doneSignal = new CountDownLatch(100);
println("create producer task");
producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);
println("create consumer task");
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 10, 3, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 10, 3, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 10, 3, TimeUnit.SECONDS);
doneSignal.await();
shutdown(producer);
shutdown(consumer);
println("main stop");
}
class ProducerTask implements Runnable {
private final TransferQueue<Integer> queue;
private final CountDownLatch doneSignal;
private final AtomicInteger counter = new AtomicInteger(0);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ProducerTask(TransferQueue<Integer> queue, CountDownLatch doneSignal) {
this.queue = queue;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.transfer(e);
System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ConsumerTask implements Runnable {
private final TransferQueue<Integer> queue;
private final String name;
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ConsumerTask(TransferQueue<Integer> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
// Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
System.out.println(String.format("[%s] [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
void shutdown(ScheduledExecutorService service) {
// Disable new tasks from being submitted
service.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
service.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException e) {
System.err.println(e);
// (Re-)Cancel if current thread also interrupted
service.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
System.out.println("shutdown");
}
void println(String message) {
System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
}
}
Es gibt keine Kapazitätsbeschränkung.
private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
Es verwendet eine Übertragungsmethode, die das Element auf unbestimmte Zeit blockiert, bis es vom Verbraucher empfangen wird. Wenn beim Blockieren der Übertragungsmethode ein Interrupt auftritt, tritt eine InterruptedException auf.
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.transfer(e);
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
Es verwendet die Abfragemethode, die 1 Sekunde wartet, bis ein Element aus der Warteschlange abgerufen werden kann. Ich habe es auskommentiert, aber wenn ich die take-Methode verwende, wird sie auf unbestimmte Zeit blockiert, bis sie abgerufen werden kann.
try {
// Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
// do something
}
} catch (InterruptedException e) {
e.printStackTrace();
}