Il existe une classe appelée CyclicBarrier dans la bibliothèque de traitement parallèle Java java.util.concurrent. CyclicBarrier est utile pour la synchronisation lorsque plusieurs threads sont mis en parallèle. Faisons en fait un traitement parallèle en utilisant cette classe CyclicBarrier.
CyclicBarrier a plusieurs threads à un moment précis, comme décrit dans le Document officiel. Il s'agit d'une ** fonction de support de synchronisation ** qui vous permet d'attendre jusqu'à ce qu'elle atteigne. Un point d'attente où plusieurs threads attendent est appelé une ** barrière **. Lorsque tous les threads atteignent la barrière, vous pouvez exécuter un processus appelé ** action de barrière **.
Ce mécanisme peut être exécuté plusieurs fois par une seule instance de CyclicBarrier.
Il est nommé Cyclic car il peut être exécuté à plusieurs reprises.
--Nombre de threads à synchroniser
python
CyclicBarrier barrier = new CyclicBarrier(N, new BarrierAction());
python
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new WorkerThread(barrier));
Classe WorkerThread
@Override
public void run(){
// before process...
barrier.await();
// after process...
}
Classe BarrierAction
@Override
public void run(){
// execute command...
}
Créez une fonction de sauvegarde de fichiers.
Exécutez le traitement suivant pour la forme de la période cible.
Le rôle de Cyclic Barrier dans cette fonctionnalité est:
une fonction | Le rôle de CyclicBarrier |
---|---|
Copier les fichiers dans le dossier WORK | Synchronisation des threads |
Compression de fichiers dans le dossier WORK | Exécution du traitement commun |
La structure du dossier est la suivante.
C:
└─test
├─archive (dossier de stockage des formulaires compressés)
│
├─config
│ config.properties
│
├─dailyReport
│ ├─coupon ← Dossier du formulaire «coupon»
│ │ coupon_20171001.csv
│ │ coupon_20171002.csv
│ │ coupon_20171003.csv
│ │
│ ├─ ventes ← Dossier "Ventes" du formulaire
│ │ sales_20171001.csv
│ │ sales_20171002.csv
│ │ sales_20171003.csv
│ │
│ └─stock ← Formulaire de dossier "stock"
│ stock_20171001.csv
│ stock_20171002.csv
│ stock_20171003.csv
│
└─work (Destination de stockage temporaire pour les fichiers de formulaire)
Crée une instance de la barrière cyclique et lance le processus.
Classe de gestion
package sample;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Controller {
public static void main(String[] args){
LocalDate startDate = LocalDate.parse(args[0]);
LocalDate endDate = LocalDate.parse(args[1]);
if(startDate.isAfter(endDate)){
throw new IllegalArgumentException(MessageFormat.format("La date de début est postérieure à la date de fin. Date de début:{0}Date de fin:{1}",startDate,endDate));
}
//QUEUE qui contient la période cible de sauvegarde
ConcurrentLinkedQueue<LocalDate> dateQueue = Util.createDateQueue(startDate,endDate);
//Formulaire à traiter SET
ConcurrentSkipListSet<Path> reportPathSetInWorkDir = new ConcurrentSkipListSet<Path>();
/*
*Créez une instance de la barrière cyclique.
*-Spécifiez le nombre de types de formulaires pour le nombre de threads de travail à attendre.
*-Spécifiez une action de barrière comme argument pour exécuter un traitement commun.
*/
CyclicBarrier barrier = new CyclicBarrier(Report.values().length, new BarrierAction(dateQueue,reportPathSetInWorkDir));
//Utilisez Executor pour lancer le thread de travail.
ExecutorService service = Executors.newCachedThreadPool();
for(Report report: Report.values()){
service.submit(new CopyFile(report, dateQueue, reportPathSetInWorkDir, barrier));
}
service.shutdown();
}
}
Le thread de travail effectue une copie de fichier.
Fil de travail
package sample;
import static java.lang.System.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
/**
*Fil de travail faisant la copie de fichiers
* @author nakam
*
*/
public class CopyFile implements Runnable {
/**Formulaire en charge*/
private Report report;
/**Période cible*/
private ConcurrentLinkedQueue<LocalDate> dateQueue;
/**Fichier formulaire dans le dossier WORK*/
private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;
/**Synchroniser avec CyclicBarrier*/
private CyclicBarrier barrier;
public CopyFile(Report report, ConcurrentLinkedQueue<LocalDate> dateQueue,ConcurrentSkipListSet<Path> reportPathSetInWorkDir, CyclicBarrier barrier) {
this.report = report;
this.dateQueue = dateQueue;
this.reportPathSetInWorkDir = reportPathSetInWorkDir;
this.barrier = barrier;
}
@Override
public void run(){
FilePath filePath = new FilePath();
while(!dateQueue.isEmpty()){
try {
Path src = filePath.getReportPath(report.getConfigKey(), dateQueue.peek());
Path dst = filePath.getWorkDirPath().resolve(src.getFileName());
//Copiez le fichier de formulaire dans le dossier WORK
Files.copy(src, dst);
out.println(MessageFormat.format("J'ai copié le fichier. Original:{0}Copier:{1}",src,dst));
reportPathSetInWorkDir.add(dst);
//Attendez avec une barrière cyclique
barrier.await();
} catch (IOException | InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
Les actions de barrière compressent et enregistrent les fichiers.
Action de barrière
package sample;
import java.nio.file.Path;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
/**
*Effectuez le traitement de compression comme une action de barrière.
* @author nakam
*
*/
public class BarrierAction implements Runnable {
/**Période cible*/
private ConcurrentLinkedQueue<LocalDate> dateQueue;
/**Fichier formulaire dans le dossier WORK*/
private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;
public BarrierAction(ConcurrentLinkedQueue<LocalDate> dateQueue, ConcurrentSkipListSet<Path> reportPathSetInWorkDir) {
this.dateQueue = dateQueue;
this.reportPathSetInWorkDir = reportPathSetInWorkDir;
}
@Override
public void run() {
if(!dateQueue.isEmpty()){
//Compressez le fichier de formulaire dans le dossier WORK et enregistrez-le dans le dossier d'archive
Compress.execute(new ArrayList<Path>(reportPathSetInWorkDir), new FilePath().getArchivePath(dateQueue.poll()));
reportPathSetInWorkDir.clear();
}
}
}
De plus, les classes suivantes sont utilisées, mais elles sont omises car elles n'ont rien à voir avec CyclicBarrier. Si vous êtes intéressé, veuillez vous référer au code source sur Github.
nom de la classe | Traitement du contenu |
---|---|
Compress | Processus de compression de fichiers |
ConfigKey | Maintenez la clé du fichier de configuration |
ConfigUtil | Chargez le fichier de configuration |
ExternalCommand | Exécuter des commandes externes à partir de Java |
FilePath | Générer un chemin de fichier |
Report | Tenez le type de formulaire |
Util | utilitaire |
Exécutez avec les arguments suivants. La période cible de sauvegarde est spécifiée comme (2017/10/01 --2017 / 10/03).
Argument Java
2017-10-01 2017-10-03
Sortie de la console
J'ai copié le fichier. Copier la source: C:\test\dailyReport\sales\sales_20171001.destination de la copie csv: C:\test\work\sales_20171001.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\coupon\coupon_20171001.destination de la copie csv: C:\test\work\coupon_20171001.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\stock\stock_20171001.destination de la copie csv: C:\test\work\stock_20171001.csv
7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04
Scanning the drive:
3 files, 43 bytes (1 KiB)
Creating archive: C:\test\archive\archive_20171001.7z
Items to compress: 3
Files read from disk: 3
Archive size: 238 bytes (1 KiB)
Everything is Ok
Commande externe "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171001 C:\test\work\coupon_20171001.csv C:\test\work\sales_20171001.csv C:\test\work\stock_20171001.csv "a été exécuté. Code de sortie:0
J'ai copié le fichier. Copier la source: C:\test\dailyReport\sales\sales_20171002.destination de la copie csv: C:\test\work\sales_20171002.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\stock\stock_20171002.destination de la copie csv: C:\test\work\stock_20171002.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\coupon\coupon_20171002.destination de la copie csv: C:\test\work\coupon_20171002.csv
7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04
Scanning the drive:
3 files, 43 bytes (1 KiB)
Creating archive: C:\test\archive\archive_20171002.7z
Items to compress: 3
Files read from disk: 3
Archive size: 240 bytes (1 KiB)
Everything is Ok
Commande externe "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171002 C:\test\work\coupon_20171002.csv C:\test\work\sales_20171002.csv C:\test\work\stock_20171002.csv "a été exécuté. Code de sortie:0
J'ai copié le fichier. Copier la source: C:\test\dailyReport\sales\sales_20171003.destination de la copie csv: C:\test\work\sales_20171003.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\stock\stock_20171003.destination de la copie csv: C:\test\work\stock_20171003.csv
J'ai copié le fichier. Copier la source: C:\test\dailyReport\coupon\coupon_20171003.destination de la copie csv: C:\test\work\coupon_20171003.csv
7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04
Scanning the drive:
3 files, 43 bytes (1 KiB)
Creating archive: C:\test\archive\archive_20171003.7z
Items to compress: 3
Files read from disk: 3
Archive size: 239 bytes (1 KiB)
Everything is Ok
Commande externe "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171003 C:\test\work\coupon_20171003.csv C:\test\work\sales_20171003.csv C:\test\work\stock_20171003.csv "a été exécuté. Code de sortie:0
Vous pouvez voir que l'action de barrière est exécutée après que le thread de travail a copié le fichier. Le processus est exécuté trois fois, mais comme seule la copie de fichier est exécutée en parallèle, l'ordre de fin de copie peut être différent. Les autres traitements sont synchronisés par CyclicBarrier et sont exécutés séquentiellement.
C:
└─test
├─archive
│ archive_20171001.7z
│ archive_20171002.7z
│ archive_20171003.7z
│
├─config
│ config.properties
│
├─dailyReport
│ ├─coupon
│ │ coupon_20171001.csv
│ │ coupon_20171002.csv
│ │ coupon_20171003.csv
│ │
│ ├─sales
│ │ sales_20171001.csv
│ │ sales_20171002.csv
│ │ sales_20171003.csv
│ │
│ └─stock
│ stock_20171001.csv
│ stock_20171002.csv
│ stock_20171003.csv
│
└─work
CyclicBarrier est une fonction de support de synchronisation, vous pouvez donc implémenter le même processus sans utiliser CyclicBarrier. Dans le programme de niveau implémenté cette fois, même si vous utilisez Thread.join pour synchroniser les threads de travail, cela fonctionnera sans problème. Cependant, dans ce cas, la classe de gestion est en charge de la gestion centralisée et crée un thread de travail qui ne copie qu'un seul fichier à chaque fois. Avec CyclicBarrier, vous pouvez laisser le même thread faire le travail sans créer de thread à chaque fois. CyclicBarrier est en charge de la synchronisation entre les threads, afin que la classe de gestion puisse terminer le processus après avoir créé le thread de travail. Les actions de barrière sont responsables de la mise à jour des ressources partagées ou de l'exécution d'un traitement commun lorsque les threads se rencontrent.
■ Action de barrière dans le programme exemple
rôle | Traitement du contenu |
---|---|
Mettre à jour les ressources partagées | Avancer la date cible de traitement |
Exécution du traitement partagé | Sauvegarde compressée du fichier de formulaire |
Même si vous n'avez pas de classe de gestion, il serait intéressant d'utiliser CyclicBrrier comme intermédiaire pour synchroniser les threads de travail et les actions de barrière.
Java SE 8 & JDK 9 Class CyclicBarrier Blog technique 51e Synchronizer CyclicBarrier
Recommended Posts