A review note on Queue, a member interface of the Collections Framework, and its subinterfaces BlockingQueue and TransferQueue.
In particular, I wrote sample code for LinkedBlockingQueue, an implementation class of BlockingQueue, and LinkedTransferQueue, an implementation class of TransferQueue.
Implementation classes of the BlockingQueue and TransferQueue interfaces make it easy to implement the Producer-Consumer pattern.
Quoted from the BlockingQueue JavaDoc:
BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the Collection interface.
Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.
Queue (introduced in version 1.5)
In addition to the basic Collection operations, queues provide additional insert, extract, and check operations. Each of these methods has two forms. One throws an exception when the operation fails and the other returns a special value (either null or false depending on the operation).
operation | Method signature | Exception to throw on failure |
---|---|---|
Insert | boolean add(E e) | IllegalStateException |
Get / Delete | E remove() | NoSuchElementException |
Get | E element() | NoSuchElementException |
add: Returns true if the element is added to the queue; throws IllegalStateException if no space is currently available.
remove: Retrieves and removes the head of the queue; throws NoSuchElementException if the queue is empty.
element: Retrieves, but does not remove, the head of the queue; throws NoSuchElementException if the queue is empty.
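As a minimal sketch of the exception-throwing form, the example below uses an ArrayBlockingQueue with a capacity of 1 so that the second add fails. The class name QueueThrowingFormDemo, the run helper, and the capacity are assumptions made for illustration only.

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class QueueThrowingFormDemo {

    // Exercises add/element/remove and records what each call did.
    static String run() {
        // Capacity 1 is an arbitrary choice so that the second add() fails.
        Queue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();

        log.append("add=").append(queue.add("a"));
        try {
            queue.add("b"); // the queue is already full
        } catch (IllegalStateException e) {
            log.append(" add:IllegalStateException");
        }

        log.append(" element=").append(queue.element()); // head is kept
        log.append(" remove=").append(queue.remove());   // head is removed

        try {
            queue.remove(); // the queue is now empty
        } catch (NoSuchElementException e) {
            log.append(" remove:NoSuchElementException");
        }
        try {
            queue.element(); // still empty
        } catch (NoSuchElementException e) {
            log.append(" element:NoSuchElementException");
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This form is convenient when a full or empty queue indicates a programming error rather than a normal condition.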
operation | Method signature | Value returned |
---|---|---|
Insert | boolean offer(E e) | False if not queued |
Get / Delete | E poll() | Null if queue is empty |
Get | E peek() | Null if queue is empty |
offer: Returns true if the element is added to the queue, false otherwise.
poll: Retrieves and removes the head of the queue; returns null if the queue is empty.
peek: Retrieves, but does not remove, the head of the queue; returns null if the queue is empty.
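The special-value form can be sketched in the same way. The class name QueueSpecialValueDemo, the run helper, and the capacity of 1 are assumptions for illustration; the point is that a full or empty queue is reported through the return value instead of an exception.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class QueueSpecialValueDemo {

    // Exercises offer/peek/poll and records each return value.
    static String run() {
        // Capacity 1 is an arbitrary choice so that the second offer() fails.
        Queue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();

        log.append("offer(a)=").append(queue.offer("a"));
        log.append(" offer(b)=").append(queue.offer("b")); // full: false
        log.append(" peek=").append(queue.peek());         // head is kept
        log.append(" poll=").append(queue.poll());         // head is removed
        log.append(" poll=").append(queue.poll());         // empty: null
        log.append(" peek=").append(queue.peek());         // empty: null
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because null is used as the "empty" signal, Queue implementations such as the ones discussed here do not accept null elements.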
BlockingQueue (introduced in version 1.5)
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
operation | Method signature |
---|---|
Insert | void put(E e) throws InterruptedException |
Get / Delete | E take() throws InterruptedException |
Get | - |
put: Adds an element to the queue, waiting if necessary until space becomes available; throws InterruptedException if interrupted while waiting.
take: Retrieves and removes the head of the queue, waiting if necessary until an element becomes available; throws InterruptedException if interrupted while waiting.
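The blocking behavior of put and take can be sketched with two threads and a queue of capacity 1. The class name PutTakeDemo, the run helper, and the element count of 5 are assumptions chosen only to keep the example small.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PutTakeDemo {

    // Hands five integers from a producer thread to the calling thread.
    static int run() throws InterruptedException {
        // Capacity 1 forces the producer to wait for the consumer.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += queue.take(); // blocks while the queue is empty
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sum = " + run());
    }
}
```

Neither thread needs explicit locking or wait/notify calls; the queue itself coordinates the hand-off.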
operation | Method signature |
---|---|
Insert | boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException |
Get / Delete | E poll(long timeout, TimeUnit unit) throws InterruptedException |
Get | - |
offer: Inserts an element into the queue, waiting up to the specified time for space to become available; returns false if the wait time elapses first.
poll: Retrieves and removes the head of the queue, waiting up to the specified time for an element to become available; returns null if the wait time elapses first.
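A short sketch of the timed variants follows. The class name TimedOfferPollDemo, the run helper, and the 100 millisecond timeout are assumptions for illustration. The second offer and the second poll each give up after the timeout instead of blocking indefinitely.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedOfferPollDemo {

    // Shows the return values of timed offer/poll on a queue of capacity 1.
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        // Space is available, so this returns true without waiting.
        boolean first = queue.offer("a", 100, TimeUnit.MILLISECONDS);
        // The queue is full and nobody consumes, so this times out.
        boolean second = queue.offer("b", 100, TimeUnit.MILLISECONDS);

        // An element is available, so this returns it without waiting.
        String head = queue.poll(100, TimeUnit.MILLISECONDS);
        // The queue is empty and nobody produces, so this times out.
        String none = queue.poll(100, TimeUnit.MILLISECONDS);

        return first + "," + second + "," + head + "," + none;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```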
operation | Method signature |
---|---|
Check free space | int remainingCapacity() |
remainingCapacity: Returns the number of additional elements that the queue can accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.
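As a small illustration, the sketch below compares a bounded and an unbounded LinkedBlockingQueue. The class name RemainingCapacityDemo, the run helper, and the capacity of 3 are assumptions for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RemainingCapacityDemo {

    // Reports the remaining capacity of a bounded and an unbounded queue.
    static String run() {
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(3);
        bounded.add("a"); // one of three slots is now used

        // No capacity argument: the queue has no intrinsic limit.
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

        boolean unlimited = unbounded.remainingCapacity() == Integer.MAX_VALUE;
        return bounded.remainingCapacity() + "," + unlimited;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

When other threads are inserting or removing elements at the same time, the returned value can already be stale, so it is better treated as a hint than as a guarantee that a later insert will succeed.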
LinkedBlockingQueue (introduced in version 1.5)
An optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first in, first out). The head of the queue is the element that has been in the queue for the longest time.
PriorityBlockingQueue (introduced in version 1.5)
An unbounded blocking queue that uses the same ordering rules as the class PriorityQueue and supplies blocking retrieval operations.
Constructors
Pass a comparator to the constructor to specify how the elements in the queue are ordered.
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator)
public PriorityBlockingQueue(Collection<? extends E> c)
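The comparator-taking constructor can be sketched as follows. The class name PriorityOrderDemo, the run helper, the initial capacity of 11, and the choice of ordering strings by length are all assumptions made for illustration.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityOrderDemo {

    // Inserts strings in arbitrary order and takes them back by length.
    static String run() throws InterruptedException {
        // Hypothetical ordering for this sketch: shorter strings first.
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        PriorityBlockingQueue<String> queue =
                new PriorityBlockingQueue<>(11, byLength);

        queue.put("ccc"); // put never blocks: the queue is unbounded
        queue.put("a");
        queue.put("bb");

        // take() blocks only when the queue is empty.
        return queue.take() + "," + queue.take() + "," + queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The initial capacity only sizes the internal array; it does not limit how many elements the queue can hold.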
TransferQueue (introduced in version 1.7)
A BlockingQueue in which producers may wait for consumers to receive elements.
operation | Method signature |
---|---|
Insert | void transfer(E e) throws InterruptedException |
transfer: Transfers the element immediately if a consumer is already waiting to receive it; otherwise waits until a consumer receives it.
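A minimal sketch of transfer follows, assuming a single consumer thread that deliberately arrives late. The class name TransferDemoSketch, the run helper, the 200 millisecond delay, and the value 42 are assumptions for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TransferDemoSketch {

    // Transfers one element to a consumer that starts taking after a delay.
    static int run() throws InterruptedException {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        AtomicInteger received = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(200);          // the consumer arrives late
                received.set(queue.take()); // receives the transferred element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
        });
        consumer.start();

        // Blocks here until the consumer has actually received the element.
        queue.transfer(42);

        consumer.join();
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("received = " + run());
    }
}
```

In contrast to put on an unbounded queue, which returns as soon as the element is enqueued, transfer returns only after the hand-off has happened.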
operation | Method signature |
---|---|
Insert | boolean tryTransfer(E e) |
tryTransfer: Transfers the element immediately if a consumer is already waiting to receive it; otherwise returns false without enqueuing the element.
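The no-consumer case can be sketched in a few lines. The class name TryTransferDemo and the run helper are assumptions for illustration. Because no thread is waiting in take or poll, the call returns false and the element is not added to the queue.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TryTransferDemo {

    // Calls tryTransfer while no consumer is waiting.
    static String run() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        // No consumer is waiting, so the element is rejected immediately.
        boolean handedOff = queue.tryTransfer("a");

        // The rejected element was not enqueued.
        return handedOff + "," + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```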
operation | Method signature |
---|---|
Insert | boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException |
tryTransfer (with timeout): Transfers the element immediately if a consumer is already waiting to receive it; otherwise waits up to the specified time for a consumer to receive it, and returns false if the wait time elapses first.
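The timed variant can be sketched the same way. The class name TimedTryTransferDemo, the run helper, and the 100 millisecond timeout are assumptions for illustration. With no consumer arriving in time, the call returns false and the element is not left in the queue.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class TimedTryTransferDemo {

    // Calls the timed tryTransfer while no consumer ever arrives.
    static String run() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        // Waits up to 100 ms for a consumer, then gives up.
        boolean handedOff = queue.tryTransfer("a", 100, TimeUnit.MILLISECONDS);

        // After a timeout the element is not left enqueued.
        return handedOff + "," + queue.isEmpty();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```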
operation | Method signature |
---|---|
Confirmation of waiting consumers | boolean hasWaitingConsumer() |
Confirmation of waiting consumers | int getWaitingConsumerCount() |
hasWaitingConsumer: Returns true if there is at least one consumer waiting to receive an element via BlockingQueue.take or timed poll.
getWaitingConsumerCount: Returns an estimate of the number of consumers waiting to receive an element via BlockingQueue.take or timed poll.
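As a rough sketch, the example below starts a single consumer, polls hasWaitingConsumer until that consumer is blocked in take, and then hands it an element. The class name WaitingConsumerDemo, the run helper, and the 10 millisecond polling interval are assumptions for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class WaitingConsumerDemo {

    // Observes the waiting-consumer state before, during and after a take().
    static String run() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        boolean before = queue.hasWaitingConsumer(); // nobody is waiting yet

        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // blocks until an element is transferred
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
        });
        consumer.start();

        // Poll until the consumer thread is actually blocked in take().
        while (!queue.hasWaitingConsumer()) {
            Thread.sleep(10);
        }
        int count = queue.getWaitingConsumerCount(); // one waiting consumer

        queue.transfer("a"); // releases the waiting consumer
        consumer.join();
        boolean after = queue.hasWaitingConsumer();

        return before + "," + count + "," + after;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

With several consumers and producers running concurrently, the count is only an estimate and may change between the call and any decision based on it.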
LinkedTransferQueue (introduced in version 1.7)
An unbounded TransferQueue based on linked nodes. This queue orders elements FIFO (first in, first out) with respect to any given producer. The head of the queue is the element that has been in the queue for the longest time for some producer.
In this sample, one producer periodically adds elements (sequential Integer values) to a BlockingQueue, and three consumers periodically retrieve elements from it. Rather than running indefinitely, the program exits once the producer has added elements to the queue 100 times.
Also, although not directly related to the subject of this article, we use the ScheduledExecutorService to implement producers and consumers.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingDemo {

    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
    private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);
    private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

    public static void main(String[] args) throws Exception {
        BlockingDemo demo = new BlockingDemo();
        demo.execute();
    }

    void execute() throws Exception {
        println("main start");
        CountDownLatch doneSignal = new CountDownLatch(100);
        // Starts after 2 seconds, then runs at 1-second intervals
        println("create producer task");
        producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);
        println("create consumer task");
        consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 20, 2, TimeUnit.SECONDS);
        consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 20, 3, TimeUnit.SECONDS);
        consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 20, 3, TimeUnit.SECONDS);
        doneSignal.await();
        shutdown(producer);
        shutdown(consumer);
        println("main end");
    }

    class ProducerTask implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final CountDownLatch doneSignal;
        private final AtomicInteger counter = new AtomicInteger(0);
        private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

        ProducerTask(BlockingQueue<Integer> queue, CountDownLatch doneSignal) {
            this.queue = queue;
            this.doneSignal = doneSignal;
        }

        @Override
        public void run() {
            try {
                Integer e = Integer.valueOf(counter.incrementAndGet());
                queue.put(e);
                System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
                doneSignal.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    class ConsumerTask implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final String name;
        private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

        ConsumerTask(BlockingQueue<Integer> queue, String name) {
            this.queue = queue;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                //Integer e = queue.take();
                Integer e = queue.poll(1, TimeUnit.SECONDS);
                if (e != null) {
                    System.out.println(String.format("[%s] [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    void shutdown(ScheduledExecutorService service) {
        println("shutdown start");
        // Disable new tasks from being submitted
        service.shutdown();
        try {
            // Wait a while for existing tasks to terminate
            if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
                // Cancel currently executing tasks
                service.shutdownNow();
                // Wait a while for tasks to respond to being cancelled
                if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
                    println("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            System.err.println(e);
            // (Re-)Cancel if current thread also interrupted
            service.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
        println("shutdown end");
    }

    void println(String message) {
        System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
    }
}
You can determine the capacity of the queue in the constructor. In this sample, the maximum number of elements that can be added to the queue is set to 10.
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
If the capacity is not specified, it defaults to Integer.MAX_VALUE.
private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
The producer uses the put method, which blocks indefinitely until space becomes available in the queue. If the thread is interrupted while blocked in put, an InterruptedException is thrown.
try {
    Integer e = Integer.valueOf(counter.incrementAndGet());
    queue.put(e);
    doneSignal.countDown();
} catch (InterruptedException e) {
    e.printStackTrace();
}
The consumer uses the poll method, which waits up to 1 second for an element to become available and returns null if none arrives in that time. The take call is commented out; if it were used instead, it would block indefinitely until an element could be retrieved.
try {
    //Integer e = queue.take();
    Integer e = queue.poll(1, TimeUnit.SECONDS);
    if (e != null) {
        // do something
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}
In this sample, one producer periodically adds elements (sequential Integer values) to a TransferQueue, and three consumers periodically retrieve elements from it. This sample also exits once the producer has added elements to the queue 100 times.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TransferDemo {

    private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
    private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
    private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);
    private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

    public static void main(String[] args) throws Exception {
        TransferDemo demo = new TransferDemo();
        demo.execute();
    }

    void execute() throws Exception {
        println("main start");
        CountDownLatch doneSignal = new CountDownLatch(100);
        println("create producer task");
        producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);
        println("create consumer task");
        consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 10, 3, TimeUnit.SECONDS);
        consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 10, 3, TimeUnit.SECONDS);
        consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 10, 3, TimeUnit.SECONDS);
        doneSignal.await();
        shutdown(producer);
        shutdown(consumer);
        println("main stop");
    }

    class ProducerTask implements Runnable {
        private final TransferQueue<Integer> queue;
        private final CountDownLatch doneSignal;
        private final AtomicInteger counter = new AtomicInteger(0);
        private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

        ProducerTask(TransferQueue<Integer> queue, CountDownLatch doneSignal) {
            this.queue = queue;
            this.doneSignal = doneSignal;
        }

        @Override
        public void run() {
            try {
                Integer e = Integer.valueOf(counter.incrementAndGet());
                queue.transfer(e);
                System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
                doneSignal.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    class ConsumerTask implements Runnable {
        private final TransferQueue<Integer> queue;
        private final String name;
        private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

        ConsumerTask(TransferQueue<Integer> queue, String name) {
            this.queue = queue;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                // Integer e = queue.take();
                Integer e = queue.poll(1, TimeUnit.SECONDS);
                if (e != null) {
                    System.out.println(String.format("[%s] [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    void shutdown(ScheduledExecutorService service) {
        // Disable new tasks from being submitted
        service.shutdown();
        try {
            // Wait a while for existing tasks to terminate
            if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
                // Cancel currently executing tasks
                service.shutdownNow();
                // Wait a while for tasks to respond to being cancelled
                if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            System.err.println(e);
            // (Re-)Cancel if current thread also interrupted
            service.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
        System.out.println("shutdown");
    }

    void println(String message) {
        System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
    }
}
There is no capacity limit.
private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
The producer uses the transfer method, which blocks indefinitely until the element is received by a consumer. If the thread is interrupted while blocked in transfer, an InterruptedException is thrown.
try {
    Integer e = Integer.valueOf(counter.incrementAndGet());
    queue.transfer(e);
    doneSignal.countDown();
} catch (InterruptedException e) {
    e.printStackTrace();
}
The consumer uses the poll method, which waits up to 1 second for an element to become available and returns null if none arrives in that time. The take call is commented out; if it were used instead, it would block indefinitely until an element could be retrieved.
try {
    // Integer e = queue.take();
    Integer e = queue.poll(1, TimeUnit.SECONDS);
    if (e != null) {
        // do something
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}