Effectuer un traitement parallèle à l'aide de CyclicBarrier de Java

Aperçu

Il existe une classe appelée CyclicBarrier dans la bibliothèque de traitement parallèle Java java.util.concurrent. CyclicBarrier est utile pour la synchronisation lorsque plusieurs threads sont mis en parallèle. Faisons en fait un traitement parallèle en utilisant cette classe CyclicBarrier.

Comment fonctionne CyclicBarrier

CyclicBarrier a plusieurs threads à un moment précis, comme décrit dans le Document officiel. Il s'agit d'une ** fonction de support de synchronisation ** qui vous permet d'attendre jusqu'à ce qu'elle atteigne. Un point d'attente où plusieurs threads attendent est appelé une ** barrière **. Lorsque tous les threads atteignent la barrière, vous pouvez exécuter un processus appelé ** action de barrière **. 1.JPG

Ce mécanisme peut être exécuté plusieurs fois par une seule instance de CyclicBarrier. 2.JPG

Il est nommé Cyclic car il peut être exécuté à plusieurs reprises.

Comment utiliser CyclicBarrier

  1. Créez une instance de la classe CyclicBarrier. À ce stade, spécifiez ce qui suit dans l'argument du constructeur.

--Nombre de threads à synchroniser

python


CyclicBarrier barrier = new CyclicBarrier(N, new BarrierAction());
  1. Exécutez le thread de travail. Donnez au thread de travail une référence à l'instance CyclicBarrier.

python


ExecutorService service = Executors.newCachedThreadPool();
service.submit(new WorkerThread(barrier));
  1. Le thread de travail appelle à tout moment la méthode await de la classe CyclicBarrier pour interrompre le processus.

Classe WorkerThread


	@Override
	public void run(){
		// before process...
		barrier.await();
		// after process...
	}
  1. Lorsque le nombre de threads de travail interrompus atteint le nombre de threads défini dans le constructeur, tous les threads de travail attendus sont suspendus. À ce stade, la classe CyclicBarrier exécute l'action de barrière.

Classe BarrierAction


	@Override
	public void run(){
		// execute command...
	}
  1. Une fois l'exécution de l'action de barrière terminée, tous les threads de travail suspendus reprendront le traitement.
  2. Répétez les étapes 3 à 5.

Exemple de programme

Aperçu

Créez une fonction de sauvegarde de fichiers.

■ Cible de sauvegarde

■ Exigences fonctionnelles

conception

Exécutez le traitement suivant pour la forme de la période cible.

  1. Copiez le fichier de formulaire avec la même date dans le dossier de travail.
  2. Compressez les fichiers de formulaire de la même date collectés dans le dossier de travail et enregistrez-les dans le dossier d'archive.

image.png

■ Rôle de la barrière cyclique

Le rôle de Cyclic Barrier dans cette fonctionnalité est:

une fonction Le rôle de CyclicBarrier
Copier les fichiers dans le dossier WORK Synchronisation des threads
Compression de fichiers dans le dossier WORK Exécution du traitement commun

3.JPG

■ Structure des dossiers

La structure du dossier est la suivante.

C:
└─test
├─archive (dossier de stockage des formulaires compressés)
   │
   ├─config
   │      config.properties
   │      
   ├─dailyReport
│ ├─coupon ← Dossier du formulaire «coupon»
   │  │      coupon_20171001.csv
   │  │      coupon_20171002.csv
   │  │      coupon_20171003.csv
   │  │      
│ ├─ ventes ← Dossier "Ventes" du formulaire
   │  │      sales_20171001.csv
   │  │      sales_20171002.csv
   │  │      sales_20171003.csv
   │  │      
│ └─stock ← Formulaire de dossier "stock"
   │          stock_20171001.csv
   │          stock_20171002.csv
   │          stock_20171003.csv
   │          
   └─work (Destination de stockage temporaire pour les fichiers de formulaire)

la mise en oeuvre

■ Classe de gestion

Crée une instance de la barrière cyclique et lance le processus.

Classe de gestion


package sample;

import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Controller {

	public static void main(String[] args){

		LocalDate startDate = LocalDate.parse(args[0]);
		LocalDate endDate = LocalDate.parse(args[1]);

		if(startDate.isAfter(endDate)){
			throw new IllegalArgumentException(MessageFormat.format("La date de début est postérieure à la date de fin. Date de début:{0}Date de fin:{1}",startDate,endDate));
		}

		//QUEUE qui contient la période cible de sauvegarde
		ConcurrentLinkedQueue<LocalDate> dateQueue = Util.createDateQueue(startDate,endDate);

		//Formulaire à traiter SET
		ConcurrentSkipListSet<Path> reportPathSetInWorkDir = new ConcurrentSkipListSet<Path>();

		/*
		 *Créez une instance de la barrière cyclique.
		 *-Spécifiez le nombre de types de formulaires pour le nombre de threads de travail à attendre.
		 *-Spécifiez une action de barrière comme argument pour exécuter un traitement commun.
		 */
		CyclicBarrier barrier = new CyclicBarrier(Report.values().length, new BarrierAction(dateQueue,reportPathSetInWorkDir));

		//Utilisez Executor pour lancer le thread de travail.
		ExecutorService service = Executors.newCachedThreadPool();

		for(Report report: Report.values()){
			service.submit(new CopyFile(report, dateQueue, reportPathSetInWorkDir, barrier));
		}
		service.shutdown();
	}
}

■ Fil de travail

Le thread de travail effectue une copie de fichier.

Fil de travail


package sample;

import static java.lang.System.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

/**
 *Fil de travail faisant la copie de fichiers
 * @author nakam
 *
 */
public class CopyFile implements Runnable {

	/**Formulaire en charge*/
	private Report report;

	/**Période cible*/
	private ConcurrentLinkedQueue<LocalDate> dateQueue;

	/**Fichier formulaire dans le dossier WORK*/
	private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;

	/**Synchroniser avec CyclicBarrier*/
	private CyclicBarrier barrier;

	public CopyFile(Report report, ConcurrentLinkedQueue<LocalDate> dateQueue,ConcurrentSkipListSet<Path> reportPathSetInWorkDir, CyclicBarrier barrier) {
		this.report = report;
		this.dateQueue = dateQueue;
		this.reportPathSetInWorkDir = reportPathSetInWorkDir;
		this.barrier = barrier;
	}

	@Override
	public void run(){

		FilePath filePath = new FilePath();

		while(!dateQueue.isEmpty()){

			try {
				Path src = filePath.getReportPath(report.getConfigKey(), dateQueue.peek());
				Path dst = filePath.getWorkDirPath().resolve(src.getFileName());

				//Copiez le fichier de formulaire dans le dossier WORK
				Files.copy(src, dst);
				out.println(MessageFormat.format("J'ai copié le fichier. Original:{0}Copier:{1}",src,dst));
				reportPathSetInWorkDir.add(dst);

				//Attendez avec une barrière cyclique
				barrier.await();
			} catch (IOException | InterruptedException | BrokenBarrierException e) {
				e.printStackTrace();
			}
		}
	}
}

■ Action de barrière

Les actions de barrière compressent et enregistrent les fichiers.

Action de barrière


package sample;

import java.nio.file.Path;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;

/**
 *Effectuez le traitement de compression comme une action de barrière.
 * @author nakam
 *
 */
public class BarrierAction implements Runnable {

	/**Période cible*/
	private ConcurrentLinkedQueue<LocalDate> dateQueue;

	/**Fichier formulaire dans le dossier WORK*/
	private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;

	public BarrierAction(ConcurrentLinkedQueue<LocalDate> dateQueue, ConcurrentSkipListSet<Path> reportPathSetInWorkDir) {
		this.dateQueue = dateQueue;
		this.reportPathSetInWorkDir = reportPathSetInWorkDir;
	}

	@Override
	public void run() {

		if(!dateQueue.isEmpty()){

			//Compressez le fichier de formulaire dans le dossier WORK et enregistrez-le dans le dossier d'archive
			Compress.execute(new ArrayList<Path>(reportPathSetInWorkDir), new FilePath().getArchivePath(dateQueue.poll()));
			reportPathSetInWorkDir.clear();
		}
	}
}

■ Autres classes

De plus, les classes suivantes sont utilisées, mais elles sont omises car elles n'ont rien à voir avec CyclicBarrier. Si vous êtes intéressé, veuillez vous référer au code source sur Github.

nom de la classe Traitement du contenu
Compress Processus de compression de fichiers
ConfigKey Maintenez la clé du fichier de configuration
ConfigUtil Chargez le fichier de configuration
ExternalCommand Exécuter des commandes externes à partir de Java
FilePath Générer un chemin de fichier
Report Tenez le type de formulaire
Util utilitaire

tester

Courir

Exécutez avec les arguments suivants. La période cible de sauvegarde est spécifiée comme (2017/10/01 --2017 / 10/03).

Argument Java


2017-10-01 2017-10-03

Résultat d'exécution

Sortie de la console


J'ai copié le fichier. Copier la source: C:\test\dailyReport\sales\sales_20171001.destination de la copie csv: C:\test\work\sales_20171001.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\coupon\coupon_20171001.destination de la copie csv: C:\test\work\coupon_20171001.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\stock\stock_20171001.destination de la copie csv: C:\test\work\stock_20171001.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171001.7z

Items to compress: 3


Files read from disk: 3
Archive size: 238 bytes (1 KiB)

Everything is Ok
Commande externe "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171001 C:\test\work\coupon_20171001.csv C:\test\work\sales_20171001.csv C:\test\work\stock_20171001.csv "a été exécuté. Code de sortie:0
J'ai copié le fichier. Copier la source: C:\test\dailyReport\sales\sales_20171002.destination de la copie csv: C:\test\work\sales_20171002.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\stock\stock_20171002.destination de la copie csv: C:\test\work\stock_20171002.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\coupon\coupon_20171002.destination de la copie csv: C:\test\work\coupon_20171002.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171002.7z

Items to compress: 3


Files read from disk: 3
Archive size: 240 bytes (1 KiB)

Everything is Ok
Commande externe "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171002 C:\test\work\coupon_20171002.csv C:\test\work\sales_20171002.csv C:\test\work\stock_20171002.csv "a été exécuté. Code de sortie:0
J'ai copié le fichier. Copier la source: C:\test\dailyReport\sales\sales_20171003.destination de la copie csv: C:\test\work\sales_20171003.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\stock\stock_20171003.destination de la copie csv: C:\test\work\stock_20171003.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\coupon\coupon_20171003.destination de la copie csv: C:\test\work\coupon_20171003.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171003.7z

Items to compress: 3


Files read from disk: 3
Archive size: 239 bytes (1 KiB)

Everything is Ok
Commande externe "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171003 C:\test\work\coupon_20171003.csv C:\test\work\sales_20171003.csv C:\test\work\stock_20171003.csv "a été exécuté. Code de sortie:0

Vous pouvez voir que l'action de barrière est exécutée après que le thread de travail a copié le fichier. Le processus est exécuté trois fois, mais comme seule la copie de fichier est exécutée en parallèle, l'ordre de fin de copie peut être différent. Les autres traitements sont synchronisés par CyclicBarrier et sont exécutés séquentiellement.

Dossier après l'exécution du test

C:
└─test
   ├─archive
   │      archive_20171001.7z
   │      archive_20171002.7z
   │      archive_20171003.7z
   │
   ├─config
   │      config.properties
   │      
   ├─dailyReport
   │  ├─coupon
   │  │      coupon_20171001.csv
   │  │      coupon_20171002.csv
   │  │      coupon_20171003.csv
   │  │      
   │  ├─sales
   │  │      sales_20171001.csv
   │  │      sales_20171002.csv
   │  │      sales_20171003.csv
   │  │      
   │  └─stock
   │          stock_20171001.csv
   │          stock_20171002.csv
   │          stock_20171003.csv
   │          
   └─work

Considération

CyclicBarrier est une fonction de support de synchronisation, vous pouvez donc implémenter le même processus sans utiliser CyclicBarrier. Dans le programme de niveau implémenté cette fois, même si vous utilisez Thread.join pour synchroniser les threads de travail, cela fonctionnera sans problème. Cependant, dans ce cas, la classe de gestion est en charge de la gestion centralisée et crée un thread de travail qui ne copie qu'un seul fichier à chaque fois. Avec CyclicBarrier, vous pouvez laisser le même thread faire le travail sans créer de thread à chaque fois. CyclicBarrier est en charge de la synchronisation entre les threads, afin que la classe de gestion puisse terminer le processus après avoir créé le thread de travail. Les actions de barrière sont responsables de la mise à jour des ressources partagées ou de l'exécution d'un traitement commun lorsque les threads se rencontrent.

■ Action de barrière dans le programme exemple

rôle Traitement du contenu
Mettre à jour les ressources partagées Avancer la date cible de traitement
Exécution du traitement partagé Sauvegarde compressée du fichier de formulaire

Impressions

Même si vous n'avez pas de classe de gestion, il serait intéressant d'utiliser CyclicBrrier comme intermédiaire pour synchroniser les threads de travail et les actions de barrière.

Emplacement de stockage de l'exemple de programme

Github

référence

Java SE 8 & JDK 9 Class CyclicBarrier Blog technique 51e Synchronizer CyclicBarrier

Environnement d'exécution

Recommended Posts

Effectuer un traitement parallèle à l'aide de CyclicBarrier de Java
Traitement des données avec Apache Flink
Traitement parallèle mesuré avec Java
[Swift] Traitement asynchrone à l'aide de PromiseKit
[Traitement] Essayez d'utiliser GT Force.
Traitement de la sortie CSV avec Super-CSV