Une note de révision de l'interface des membres du framework de collections Queue
et de ses sous-interfaces BlockingQueue
et TransferQueue
.
En particulier, j'ai écrit un exemple de code pour la classe d'implémentation BlockingQueue LinkedBlockingQueue
et la classe d'implémentation TransferQueueLinkedTransferQueue
.
Les classes d'implémentation des interfaces BlockingQueue et TransferQueue facilitent l'implémentation du modèle Producer-Consumer.
Cité de JavaDoc dans BlockingQueue
L'implémentation BlockingQueue est principalement conçue pour être utilisée dans les files d'attente entre producteurs et consommateurs, mais elle prend également en charge l'interface Collection.
Notez que BlockingQueue peut être utilisé en toute sécurité par plusieurs producteurs et plusieurs consommateurs.
environnement
référence
Version d'introduction: 1.5
En plus des opérations de collecte de base, les files d'attente fournissent des opérations supplémentaires d'insertion, d'extraction et de vérification. Chacune de ces méthodes a deux formes. L'un lève une exception lorsque l'opération échoue et l'autre renvoie une valeur spéciale (nulle ou fausse selon l'opération).
opération | Signature de méthode | Exception à lancer en cas d'échec |
---|---|---|
Insérer | boolean add(E e) | IllegalStateException |
Obtenir / Supprimer | E remove() | NoSuchElementException |
Avoir | E element() | NoSuchElementException |
add Renvoie true si vous ajoutez un élément à la file d'attente, IllegalStateException s'il n'y a pas d'espace libre disponible remove Récupérez et supprimez le début de la file d'attente, lancez NoSuchElementException si la file d'attente est vide element Obtenez le début de la file d'attente, lancez NoSuchElementException si la file d'attente est vide
opération | Signature de méthode | Valeur retournée |
---|---|---|
Insérer | boolean offer(E e) | Faux si non ajouté à la file d'attente |
Obtenir / Supprimer | E poll() | Null si la file d'attente est vide |
Avoir | E peek() | Null si la file d'attente est vide |
offer Vrai si vous ajoutez un élément à la file d'attente, faux sinon poll Récupère et supprime le début de la file d'attente, renvoie null si la file d'attente est vide peek Récupère la tête de la file d'attente, renvoie null si la file d'attente est vide
Version d'introduction: 1.5
Une file d'attente qui attend que la file d'attente soit vide lors de la récupération d'un élément, et prend en charge en outre les opérations qui attendent que la file d'attente soit vide lors du stockage d'un élément.
opération | Signature de méthode |
---|---|
Insérer | void put(E e) throws InterruptedException |
Obtenir / Supprimer | E take() throws InterruptedException |
Avoir | - |
put Ajoutez un élément à la file d'attente, attendez que la file d'attente devienne vide, lancez une InterruptedException si une interruption se produit pendant l'attente take Récupérez et supprimez le début de la file d'attente, attendez que l'élément puisse être obtenu, lancez une InterruptedException si une interruption se produit pendant l'attente
opération | Signature de méthode |
---|---|
Insérer | boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException |
Obtenir / Supprimer | E poll(long timeout, TimeUnit unit) throws InterruptedException |
Avoir | - |
offer Insérez une file d'attente dans un élément et attendez que la file d'attente devienne libre jusqu'au temps d'attente spécifié poll Obtenez et supprimez la tête de la file d'attente, attendez que l'élément devienne disponible jusqu'au temps d'attente spécifié
opération | Signature de méthode |
---|---|
Vérifier l'espace libre | int remainingCapacity() |
remainingCapacity Renvoie le nombre d'éléments supplémentaires que la file d'attente peut accepter sans blocage. Renvoie Integer.MAX_VALUE si aucune restriction intégrée n'existe.
Version d'introduction: 1.5
Une file d'attente de blocage restreinte facultative basée sur le nœud de liaison. Cette file d'attente classe les éléments par FIFO (premier entré, premier sorti). Le début de cette file d'attente est l'élément qui se trouve dans la file d'attente depuis le plus longtemps.
Version d'introduction: 1.5
Une file d'attente de blocage illimitée qui utilise les mêmes règles de classement que la classe PriorityQueue et fournit des opérations de capture de blocage.
constructeur
Passez un comparateur au constructeur pour ordonner les files d'attente.
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator)
public PriorityBlockingQueue(Collection<? extends E> c)
Version d'introduction: 1.7
Une file d'attente de blocage où le producteur attend que le consommateur reçoive l'élément.
opération | Signature de méthode |
---|---|
Insérer | void transfer(E e) throws InterruptedException |
transfer Transférer immédiatement si un consommateur attend que l'élément soit reçu, sinon attendre que le consommateur le reçoive
opération | Signature de méthode |
---|---|
Insérer | boolean tryTransfer(E e) |
tryTransfer Transférer immédiatement si un consommateur attend de recevoir l'élément, sinon renvoyer false
opération | Signature de méthode |
---|---|
Insérer | boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException |
tryTransfer Transfère immédiatement s'il y a un consommateur en attente de recevoir l'élément, sinon attend que le consommateur le reçoive jusqu'au temps d'attente spécifié, retourne false si le temps d'attente spécifié s'est écoulé
opération | Signature de méthode |
---|---|
Confirmation du consommateur en attente | boolean hasWaitingConsumer() |
Confirmation du consommateur en attente | int getWaitingConsumerCount() |
hasWaitingConsumer Renvoie true s'il y a au moins un consommateur en attente de recevoir un élément dans BlockingQueue.take ou poll getWaitingConsumerCount Renvoie un nombre estimé de consommateurs en attente de recevoir un élément dans BlockingQueue.take ou poll
Version d'introduction: 1.7
Une file d'attente de transfert illimitée basée sur des nœuds liés. Cette file d'attente classe les éléments dans un FIFO (premier entré, premier sorti) pour tout producteur spécifié. La tête de la file d'attente est l'élément qui est dans la file d'attente depuis le plus longtemps pour un producteur particulier.
Dans cet exemple, 1 producteur ajoute périodiquement des éléments à BlockingQueue (dans cet exemple, les numéros de série de type Integer) et 3 consommateurs récupèrent périodiquement les éléments de BlockingQueue. Au lieu de s'exécuter indéfiniment, il se termine lorsque le producteur ajoute l'élément à la file d'attente 100 fois.
De plus, bien que cela ne soit pas directement lié au sujet de cet article, nous utilisons ScheduledExecutorService pour implémenter les producteurs et les consommateurs.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockingDemo {
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
BlockingDemo demo = new BlockingDemo();
demo.execute();
}
void execute() throws Exception {
println("main start");
CountDownLatch doneSignal = new CountDownLatch(100);
//Démarre après 2 secondes et s'exécute toutes les 1 seconde par la suite
println("create producer task");
producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);
println("create consumer task");
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 20, 2, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 20, 3, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 20, 3, TimeUnit.SECONDS);
doneSignal.await();
shutdown(producer);
shutdown(consumer);
println("main end");
}
class ProducerTask implements Runnable {
private final BlockingQueue<Integer> queue;
private final CountDownLatch doneSignal;
private final AtomicInteger counter = new AtomicInteger(0);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ProducerTask(BlockingQueue<Integer> queue, CountDownLatch doneSignal) {
this.queue = queue;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.put(e);
System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ConsumerTask implements Runnable {
private final BlockingQueue<Integer> queue;
private final String name;
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ConsumerTask(BlockingQueue<Integer> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
//Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
System.out.println(String.format("[%s] [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
void shutdown(ScheduledExecutorService service) {
println("shutdown start");
// Disable new tasks from being submitted
service.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
service.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
println("Pool did not terminate");
}
}
} catch (InterruptedException e) {
System.err.println(e);
// (Re-)Cancel if current thread also interrupted
service.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
println("shutdown end");
}
void println(String message) {
System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
}
}
Vous pouvez déterminer la capacité de la file d'attente avec le constructeur. Dans cet exemple, le nombre maximal d'éléments pouvant être ajoutés à la file d'attente est défini sur 10.
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
Si la capacité n'est pas spécifiée, elle est identique à Integer.MAX_VALUE spécifié.
private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
J'utilise la méthode put qui bloque indéfiniment jusqu'à ce que la file d'attente soit libre. Si une interruption se produit lors du blocage de la méthode put, une InterruptedException se produit.
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.put(e);
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
Il utilise la méthode poll, qui attend 1 seconde jusqu'à ce qu'il puisse récupérer un élément de la file d'attente. Je l'ai commenté, mais si j'utilise la méthode take, il se bloquera indéfiniment jusqu'à ce qu'il puisse être récupéré.
try {
//Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
// do something
}
} catch (InterruptedException e) {
e.printStackTrace();
}
Dans cet exemple, 1 producteur ajoute périodiquement des éléments à TransferQueue (dans cet exemple, les numéros de série de type Integer) et 3 consommateurs récupèrent périodiquement les éléments de TransferQueue. Cet exemple se termine également lorsque le producteur ajoute l'élément à la file d'attente 100 fois.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class TransferDemo {
private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
TransferDemo demo = new TransferDemo();
demo.execute();
}
void execute() throws Exception {
println("main start");
CountDownLatch doneSignal = new CountDownLatch(100);
println("create producer task");
producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);
println("create consumer task");
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 10, 3, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 10, 3, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 10, 3, TimeUnit.SECONDS);
doneSignal.await();
shutdown(producer);
shutdown(consumer);
println("main stop");
}
class ProducerTask implements Runnable {
private final TransferQueue<Integer> queue;
private final CountDownLatch doneSignal;
private final AtomicInteger counter = new AtomicInteger(0);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ProducerTask(TransferQueue<Integer> queue, CountDownLatch doneSignal) {
this.queue = queue;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.transfer(e);
System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ConsumerTask implements Runnable {
private final TransferQueue<Integer> queue;
private final String name;
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ConsumerTask(TransferQueue<Integer> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
// Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
System.out.println(String.format("[%s] [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
void shutdown(ScheduledExecutorService service) {
// Disable new tasks from being submitted
service.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
service.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException e) {
System.err.println(e);
// (Re-)Cancel if current thread also interrupted
service.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
System.out.println("shutdown");
}
void println(String message) {
System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
}
}
Il n'y a pas de limite de capacité.
private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
Il utilise une méthode de transfert qui bloque l'élément indéfiniment jusqu'à ce qu'il soit reçu par le consommateur. Si une interruption se produit lors du blocage de la méthode de transfert, une InterruptedException se produit.
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.transfer(e);
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
Il utilise la méthode poll, qui attend 1 seconde jusqu'à ce qu'il puisse récupérer un élément de la file d'attente. Je l'ai commenté, mais si j'utilise la méthode take, il se bloquera indéfiniment jusqu'à ce qu'il puisse être récupéré.
try {
// Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
// do something
}
} catch (InterruptedException e) {
e.printStackTrace();
}