[JAVA] Notes de révision des files d'attente / BlockingQueue / TransferQueue

Aperçu

Une note de révision de l'interface des membres du framework de collections Queue et de ses sous-interfaces BlockingQueue et TransferQueue. En particulier, j'ai écrit un exemple de code pour la classe d'implémentation BlockingQueue LinkedBlockingQueue et la classe d'implémentation TransferQueueLinkedTransferQueue.

Les classes d'implémentation des interfaces BlockingQueue et TransferQueue facilitent l'implémentation du modèle Producer-Consumer.

Cité de JavaDoc dans BlockingQueue

L'implémentation BlockingQueue est principalement conçue pour être utilisée dans les files d'attente entre producteurs et consommateurs, mais elle prend également en charge l'interface Collection.

Notez que BlockingQueue peut être utilisé en toute sécurité par plusieurs producteurs et plusieurs consommateurs.

environnement

référence

la revue

Interface Queue \ <E >

Version d'introduction: 1.5

En plus des opérations de collecte de base, les files d'attente fournissent des opérations supplémentaires d'insertion, d'extraction et de vérification. Chacune de ces méthodes a deux formes. L'un lève une exception lorsque l'opération échoue et l'autre renvoie une valeur spéciale (nulle ou fausse selon l'opération).

Méthode d'opération de file d'attente (lève une exception en cas d'échec)

opération Signature de méthode Exception à lancer en cas d'échec
Insérer boolean add​(E e) IllegalStateException
Obtenir / Supprimer E remove() NoSuchElementException
Avoir E element() NoSuchElementException

add Renvoie true si vous ajoutez un élément à la file d'attente, IllegalStateException s'il n'y a pas d'espace libre disponible remove Récupérez et supprimez le début de la file d'attente, lancez NoSuchElementException si la file d'attente est vide element Obtenez le début de la file d'attente, lancez NoSuchElementException si la file d'attente est vide

Méthode d'opération de file d'attente (renvoie une valeur spéciale)

opération Signature de méthode Valeur retournée
Insérer boolean offer​(E e) Faux si non ajouté à la file d'attente
Obtenir / Supprimer E poll() Null si la file d'attente est vide
Avoir E peek() Null si la file d'attente est vide

offer Vrai si vous ajoutez un élément à la file d'attente, faux sinon poll Récupère et supprime le début de la file d'attente, renvoie null si la file d'attente est vide peek Récupère la tête de la file d'attente, renvoie null si la file d'attente est vide

Interface BlockingQueue \ <E >

Version d'introduction: 1.5

Une file d'attente qui attend que la file d'attente soit vide lors de la récupération d'un élément, et prend en charge en outre les opérations qui attendent que la file d'attente soit vide lors du stockage d'un élément.

Opération de blocage

opération Signature de méthode
Insérer void put​(E e) throws InterruptedException
Obtenir / Supprimer E take() throws InterruptedException
Avoir -

put Ajoutez un élément à la file d'attente, attendez que la file d'attente devienne vide, lancez une InterruptedException si une interruption se produit pendant l'attente take Récupérez et supprimez le début de la file d'attente, attendez que l'élément puisse être obtenu, lancez une InterruptedException si une interruption se produit pendant l'attente

Opération à expiration

opération Signature de méthode
Insérer boolean offer​(E e, long timeout, TimeUnit unit) throws InterruptedException
Obtenir / Supprimer E poll​(long timeout, TimeUnit unit) throws InterruptedException
Avoir -

offer Insérez une file d'attente dans un élément et attendez que la file d'attente devienne libre jusqu'au temps d'attente spécifié poll Obtenez et supprimez la tête de la file d'attente, attendez que l'élément devienne disponible jusqu'au temps d'attente spécifié

Vérifiez l'espace libre dans la file d'attente

opération Signature de méthode
Vérifier l'espace libre int remainingCapacity()

remainingCapacity Renvoie le nombre d'éléments supplémentaires que la file d'attente peut accepter sans blocage. Renvoie Integer.MAX_VALUE si aucune restriction intégrée n'existe.

Classe LinkedBlockingQueue \ <E >

Version d'introduction: 1.5

Une file d'attente de blocage restreinte facultative basée sur le nœud de liaison. Cette file d'attente classe les éléments par FIFO (premier entré, premier sorti). Le début de cette file d'attente est l'élément qui se trouve dans la file d'attente depuis le plus longtemps.

Classe PriorityBlockingQueue \ <E >

Version d'introduction: 1.5

Une file d'attente de blocage illimitée qui utilise les mêmes règles de classement que la classe PriorityQueue et fournit des opérations de capture de blocage.

constructeur

Passez un comparateur au constructeur pour ordonner les files d'attente.

public PriorityBlockingQueue​(int initialCapacity,
                             Comparator<? super E> comparator)
public PriorityBlockingQueue​(Collection<? extends E> c)

Interface TransferQueue \ <E >

Version d'introduction: 1.7

Une file d'attente de blocage où le producteur attend que le consommateur reçoive l'élément.

Opération de blocage

opération Signature de méthode
Insérer void transfer​(E e) throws InterruptedException

transfer Transférer immédiatement si un consommateur attend que l'élément soit reçu, sinon attendre que le consommateur le reçoive

Opération qui n'expire pas

opération Signature de méthode
Insérer boolean tryTransfer​(E e)

tryTransfer Transférer immédiatement si un consommateur attend de recevoir l'élément, sinon renvoyer false

Opération à expiration

opération Signature de méthode
Insérer boolean tryTransfer​(E e, long timeout, TimeUnit unit) throws InterruptedException

tryTransfer Transfère immédiatement s'il y a un consommateur en attente de recevoir l'élément, sinon attend que le consommateur le reçoive jusqu'au temps d'attente spécifié, retourne false si le temps d'attente spécifié s'est écoulé

Vérifiez quels consommateurs attendent

opération Signature de méthode
Confirmation du consommateur en attente boolean hasWaitingConsumer()
Confirmation du consommateur en attente int getWaitingConsumerCount()

hasWaitingConsumer Renvoie true s'il y a au moins un consommateur en attente de recevoir un élément dans BlockingQueue.take ou poll getWaitingConsumerCount Renvoie un nombre estimé de consommateurs en attente de recevoir un élément dans BlockingQueue.take ou poll

Classe LinkedTransferQueue \ <E >

Version d'introduction: 1.7

Une file d'attente de transfert illimitée basée sur des nœuds liés. Cette file d'attente classe les éléments dans un FIFO (premier entré, premier sorti) pour tout producteur spécifié. La tête de la file d'attente est l'élément qui est dans la file d'attente depuis le plus longtemps pour un producteur particulier.

Exemple de code

Exemple de code LinkedBlockingQueue

Dans cet exemple, 1 producteur ajoute périodiquement des éléments à BlockingQueue (dans cet exemple, les numéros de série de type Integer) et 3 consommateurs récupèrent périodiquement les éléments de BlockingQueue. Au lieu de s'exécuter indéfiniment, il se termine lorsque le producteur ajoute l'élément à la file d'attente 100 fois.

De plus, bien que cela ne soit pas directement lié au sujet de cet article, nous utilisons ScheduledExecutorService pour implémenter les producteurs et les consommateurs.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingDemo {

	private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
	private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
	private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);

	private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

	public static void main(String[] args) throws Exception {
		BlockingDemo demo = new BlockingDemo();
		demo.execute();
	}

	void execute() throws Exception {
		println("main start");

		CountDownLatch doneSignal = new CountDownLatch(100);

		//Démarre après 2 secondes et s'exécute toutes les 1 seconde par la suite
		println("create producer task");
		producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);

		println("create consumer task");
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 20, 2, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 20, 3, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 20, 3, TimeUnit.SECONDS);

		doneSignal.await();

		shutdown(producer);
		shutdown(consumer);

		println("main end");
	}

	class ProducerTask implements Runnable {
		private final BlockingQueue<Integer> queue;
		private final CountDownLatch doneSignal;
		private final AtomicInteger counter = new AtomicInteger(0);
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ProducerTask(BlockingQueue<Integer> queue, CountDownLatch doneSignal) {
			this.queue = queue;
			this.doneSignal = doneSignal;
		}

		@Override
		public void run() {
			try {
				Integer e = Integer.valueOf(counter.incrementAndGet());
				queue.put(e);
				System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
				doneSignal.countDown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	class ConsumerTask implements Runnable {
		private final BlockingQueue<Integer> queue;
		private final String name;
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ConsumerTask(BlockingQueue<Integer> queue, String name) {
			this.queue = queue;
			this.name = name;
		}

		@Override
		public void run() {
			try {
				//Integer e = queue.take();
				Integer e = queue.poll(1, TimeUnit.SECONDS);
				if (e != null) {
					System.out.println(String.format("[%s]             [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	void shutdown(ScheduledExecutorService service) {
		println("shutdown start");

		// Disable new tasks from being submitted
		service.shutdown();
		try {
			// Wait a while for existing tasks to terminate
			if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
				// Cancel currently executing tasks
				service.shutdownNow();
				// Wait a while for tasks to respond to being cancelled
				if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
					println("Pool did not terminate");
				}
			}
		} catch (InterruptedException e) {
			System.err.println(e);
			// (Re-)Cancel if current thread also interrupted
			service.shutdownNow();
			// Preserve interrupt status
			Thread.currentThread().interrupt();
		}

		println("shutdown end");
	}

	void println(String message) {
		System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
	}

}

constructeur

Vous pouvez déterminer la capacité de la file d'attente avec le constructeur. Dans cet exemple, le nombre maximal d'éléments pouvant être ajoutés à la file d'attente est défini sur 10.

private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

Si la capacité n'est pas spécifiée, elle est identique à Integer.MAX_VALUE spécifié.

private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

Insérer un élément dans la file d'attente

J'utilise la méthode put qui bloque indéfiniment jusqu'à ce que la file d'attente soit libre. Si une interruption se produit lors du blocage de la méthode put, une InterruptedException se produit.

try {
	Integer e = Integer.valueOf(counter.incrementAndGet());
	queue.put(e);
	doneSignal.countDown();
} catch (InterruptedException e) {
	e.printStackTrace();
}

Supprimer des éléments de la file d'attente

Il utilise la méthode poll, qui attend 1 seconde jusqu'à ce qu'il puisse récupérer un élément de la file d'attente. Je l'ai commenté, mais si j'utilise la méthode take, il se bloquera indéfiniment jusqu'à ce qu'il puisse être récupéré.

try {
	//Integer e = queue.take();
	Integer e = queue.poll(1, TimeUnit.SECONDS);
	if (e != null) {
		// do something
	}
} catch (InterruptedException e) {
	e.printStackTrace();
}

Exemple de code de file d'attente de transfert lié

Dans cet exemple, 1 producteur ajoute périodiquement des éléments à TransferQueue (dans cet exemple, les numéros de série de type Integer) et 3 consommateurs récupèrent périodiquement les éléments de TransferQueue. Cet exemple se termine également lorsque le producteur ajoute l'élément à la file d'attente 100 fois.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TransferDemo {

	private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
	private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
	private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);

	private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

	public static void main(String[] args) throws Exception {
		TransferDemo demo = new TransferDemo();
		demo.execute();
	}

	void execute() throws Exception {
		println("main start");

		CountDownLatch doneSignal = new CountDownLatch(100);

		println("create producer task");
		producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);

		println("create consumer task");
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 10, 3, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 10, 3, TimeUnit.SECONDS);
		consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 10, 3, TimeUnit.SECONDS);

		doneSignal.await();

		shutdown(producer);
		shutdown(consumer);

		println("main stop");
	}

	class ProducerTask implements Runnable {
		private final TransferQueue<Integer> queue;
		private final CountDownLatch doneSignal;
		private final AtomicInteger counter = new AtomicInteger(0);
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ProducerTask(TransferQueue<Integer> queue, CountDownLatch doneSignal) {
			this.queue = queue;
			this.doneSignal = doneSignal;
		}

		@Override
		public void run() {
			try {
				Integer e = Integer.valueOf(counter.incrementAndGet());
				queue.transfer(e);
				System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
				doneSignal.countDown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	class ConsumerTask implements Runnable {
		private final TransferQueue<Integer> queue;
		private final String name;
		private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

		ConsumerTask(TransferQueue<Integer> queue, String name) {
			this.queue = queue;
			this.name = name;
		}

		@Override
		public void run() {
			try {
				// Integer e = queue.take();
				Integer e = queue.poll(1, TimeUnit.SECONDS);
				if (e != null) {
					System.out.println(String.format("[%s]             [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	void shutdown(ScheduledExecutorService service) {
		// Disable new tasks from being submitted
		service.shutdown();
		try {
			// Wait a while for existing tasks to terminate
			if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
				// Cancel currently executing tasks
				service.shutdownNow();
				// Wait a while for tasks to respond to being cancelled
				if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
					System.err.println("Pool did not terminate");
				}
			}
		} catch (InterruptedException e) {
			System.err.println(e);
			// (Re-)Cancel if current thread also interrupted
			service.shutdownNow();
			// Preserve interrupt status
			Thread.currentThread().interrupt();
		}
		System.out.println("shutdown");
	}

	void println(String message) {
		System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
	}

}

constructeur

Il n'y a pas de limite de capacité.

private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();

Insérer un élément dans la file d'attente

Il utilise une méthode de transfert qui bloque l'élément indéfiniment jusqu'à ce qu'il soit reçu par le consommateur. Si une interruption se produit lors du blocage de la méthode de transfert, une InterruptedException se produit.

try {
	Integer e = Integer.valueOf(counter.incrementAndGet());
	queue.transfer(e);
	doneSignal.countDown();
} catch (InterruptedException e) {
	e.printStackTrace();
}

Supprimer des éléments de la file d'attente

Il utilise la méthode poll, qui attend 1 seconde jusqu'à ce qu'il puisse récupérer un élément de la file d'attente. Je l'ai commenté, mais si j'utilise la méthode take, il se bloquera indéfiniment jusqu'à ce qu'il puisse être récupéré.

try {
	// Integer e = queue.take();
	Integer e = queue.poll(1, TimeUnit.SECONDS);
	if (e != null) {
		// do something
	}
} catch (InterruptedException e) {
	e.printStackTrace();
}

Recommended Posts

Notes de révision des files d'attente / BlockingQueue / TransferQueue
Note de révision Enum
Une note de revue sur l'interface fonctionnelle
Notes de révision de Java NIO 2
Notes de révision de Java Collections Framework