In der Java-Parallelverarbeitungsbibliothek java.util.concurrent gibt es eine Klasse namens CyclicBarrier. CyclicBarrier ist nützlich für die Synchronisierung, wenn mehrere Threads parallel geschaltet sind. Lassen Sie uns tatsächlich mit dieser CyclicBarrier-Klasse parallel verarbeiten.
CyclicBarrier verfügt über mehrere Threads als bestimmten Punkt, wie im [offiziellen Dokument] beschrieben (https://docs.oracle.com/javase/jp/9/docs/api/java/util/concurrent/CyclicBarrier.html). Es ist eine ** Synchronisationsunterstützungsfunktion **, mit der Sie warten können, bis sie erreicht ist. Ein Wartepunkt, an dem mehrere Threads warten, wird als ** Barriere ** bezeichnet. Wenn alle Threads die Barriere erreichen, können Sie einen Prozess namens ** Barriereaktion ** ausführen.
Dieser Mechanismus kann von einer einzelnen CyclicBarrier-Instanz mehrmals ausgeführt werden.
Es heißt Cyclic, weil es wiederholt ausgeführt werden kann.
python
CyclicBarrier barrier = new CyclicBarrier(N, new BarrierAction());
python
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new WorkerThread(barrier));
WorkerThread-Klasse
@Override
public void run(){
// before process...
barrier.await();
// after process...
}
BarrierAction-Klasse
@Override
public void run(){
// execute command...
}
Erstellen Sie eine Dateisicherungsfunktion.
--Verkaufsformular --Inventarform --Coupon-Formular
Führen Sie die folgende Verarbeitung für die Form des Zielzeitraums aus.
Die Rolle der zyklischen Barriere in dieser Funktion ist:
Funktion | Die Rolle von CyclicBarrier |
---|---|
Kopieren Sie die Dateien in den WORK-Ordner | Thread-Synchronisation |
Komprimieren von Dateien im WORK-Ordner | Ausführung der gemeinsamen Verarbeitung |
Die Struktur des Ordners ist wie folgt.
C:
└─test
├─archive (Speicherordner für komprimierte Formulare)
│
├─config
│ config.properties
│
├─dailyReport
│ ├coupon ← Formular "Gutschein" Ordner
│ │ coupon_20171001.csv
│ │ coupon_20171002.csv
│ │ coupon_20171003.csv
│ │
│ ├ Verkäufe ← Formular "Verkauf" Ordner
│ │ sales_20171001.csv
│ │ sales_20171002.csv
│ │ sales_20171003.csv
│ │
│ └ Lager ← Formular "Lager" Ordner
│ stock_20171001.csv
│ stock_20171002.csv
│ stock_20171003.csv
│
└─work (Temporäres Speicherziel für Formulardateien)
Erstellt eine Instanz der zyklischen Barriere und leitet den Prozess ein.
Managementklasse
package sample;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Controller {
public static void main(String[] args){
LocalDate startDate = LocalDate.parse(args[0]);
LocalDate endDate = LocalDate.parse(args[1]);
if(startDate.isAfter(endDate)){
throw new IllegalArgumentException(MessageFormat.format("Das Startdatum liegt nach dem Enddatum. Anfangsdatum:{0}Endtermin:{1}",startDate,endDate));
}
//Warteschlange, die den Sicherungszielzeitraum enthält
ConcurrentLinkedQueue<LocalDate> dateQueue = Util.createDateQueue(startDate,endDate);
//Zu verarbeitendes Formular SET
ConcurrentSkipListSet<Path> reportPathSetInWorkDir = new ConcurrentSkipListSet<Path>();
/*
*Erstellen Sie eine Instanz der zyklischen Barriere.
*- Geben Sie die Anzahl der Formulartypen für die Anzahl der Arbeitsthreads an, auf die gewartet werden soll.
*- Geben Sie eine Barriereaktion als Argument an, um die allgemeine Verarbeitung auszuführen.
*/
CyclicBarrier barrier = new CyclicBarrier(Report.values().length, new BarrierAction(dateQueue,reportPathSetInWorkDir));
//Verwenden Sie Executor, um den Worker-Thread zu starten.
ExecutorService service = Executors.newCachedThreadPool();
for(Report report: Report.values()){
service.submit(new CopyFile(report, dateQueue, reportPathSetInWorkDir, barrier));
}
service.shutdown();
}
}
Der Arbeitsthread erstellt eine Dateikopie.
Arbeitsthread
package sample;
import static java.lang.System.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
/**
*Arbeitsthread beim Kopieren von Dateien
* @author nakam
*
*/
public class CopyFile implements Runnable {
/**Verantwortliches Formular*/
private Report report;
/**Zielzeitraum*/
private ConcurrentLinkedQueue<LocalDate> dateQueue;
/**Formulardatei im Ordner WORK*/
private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;
/**Mit CyclicBarrier synchronisieren*/
private CyclicBarrier barrier;
public CopyFile(Report report, ConcurrentLinkedQueue<LocalDate> dateQueue,ConcurrentSkipListSet<Path> reportPathSetInWorkDir, CyclicBarrier barrier) {
this.report = report;
this.dateQueue = dateQueue;
this.reportPathSetInWorkDir = reportPathSetInWorkDir;
this.barrier = barrier;
}
@Override
public void run(){
FilePath filePath = new FilePath();
while(!dateQueue.isEmpty()){
try {
Path src = filePath.getReportPath(report.getConfigKey(), dateQueue.peek());
Path dst = filePath.getWorkDirPath().resolve(src.getFileName());
//Kopieren Sie die Formulardatei in den Ordner WORK
Files.copy(src, dst);
out.println(MessageFormat.format("Ich habe die Datei kopiert. Original:{0}Kopieren nach:{1}",src,dst));
reportPathSetInWorkDir.add(dst);
//Warten Sie mit einer zyklischen Barriere
barrier.await();
} catch (IOException | InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
Barriereaktionen komprimieren und speichern Dateien.
Barriereaktion
package sample;
import java.nio.file.Path;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
/**
*Führen Sie die Komprimierungsverarbeitung als Barriereaktion durch.
* @author nakam
*
*/
public class BarrierAction implements Runnable {
/**Zielzeitraum*/
private ConcurrentLinkedQueue<LocalDate> dateQueue;
/**Formulardatei im Ordner WORK*/
private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;
public BarrierAction(ConcurrentLinkedQueue<LocalDate> dateQueue, ConcurrentSkipListSet<Path> reportPathSetInWorkDir) {
this.dateQueue = dateQueue;
this.reportPathSetInWorkDir = reportPathSetInWorkDir;
}
@Override
public void run() {
if(!dateQueue.isEmpty()){
//Komprimieren Sie die Formulardatei im Ordner WORK und speichern Sie sie im Archivordner
Compress.execute(new ArrayList<Path>(reportPathSetInWorkDir), new FilePath().getArchivePath(dateQueue.poll()));
reportPathSetInWorkDir.clear();
}
}
}
Darüber hinaus werden die folgenden Klassen verwendet, sie werden jedoch weggelassen, da sie nichts mit CyclicBarrier zu tun haben. Bei Interesse lesen Sie bitte den Quellcode auf Github.
Name der Klasse | Inhalte verarbeiten |
---|---|
Compress | Dateikomprimierungsprozess |
ConfigKey | Halten Sie den Schlüssel der Konfigurationsdatei gedrückt |
ConfigUtil | Laden Sie die Konfigurationsdatei |
ExternalCommand | Führen Sie externe Befehle von Java aus |
FilePath | Generieren Sie einen Dateipfad |
Report | Halten Sie die Art des Formulars |
Util | Nützlichkeit |
Führen Sie mit den folgenden Argumenten aus. Der Sicherungszielzeitraum wird als (2017/10/01 --2017 / 10/03) angegeben.
Java-Argument
2017-10-01 2017-10-03
Konsolenausgabe
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\sales\sales_20171001.CSV-Kopierziel: C.:\test\work\sales_20171001.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\coupon\coupon_20171001.CSV-Kopierziel: C.:\test\work\coupon_20171001.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\stock\stock_20171001.CSV-Kopierziel: C.:\test\work\stock_20171001.csv
7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04
Scanning the drive:
3 files, 43 bytes (1 KiB)
Creating archive: C:\test\archive\archive_20171001.7z
Items to compress: 3
Files read from disk: 3
Archive size: 238 bytes (1 KiB)
Everything is Ok
Externer Befehl "C.:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171001 C:\test\work\coupon_20171001.csv C:\test\work\sales_20171001.csv C:\test\work\stock_20171001.csv "wurde ausgeführt. Code beenden:0
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\sales\sales_20171002.CSV-Kopierziel: C.:\test\work\sales_20171002.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\stock\stock_20171002.CSV-Kopierziel: C.:\test\work\stock_20171002.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\coupon\coupon_20171002.CSV-Kopierziel: C.:\test\work\coupon_20171002.csv
7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04
Scanning the drive:
3 files, 43 bytes (1 KiB)
Creating archive: C:\test\archive\archive_20171002.7z
Items to compress: 3
Files read from disk: 3
Archive size: 240 bytes (1 KiB)
Everything is Ok
Externer Befehl "C.:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171002 C:\test\work\coupon_20171002.csv C:\test\work\sales_20171002.csv C:\test\work\stock_20171002.csv "wurde ausgeführt. Code beenden:0
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\sales\sales_20171003.CSV-Kopierziel: C.:\test\work\sales_20171003.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\stock\stock_20171003.CSV-Kopierziel: C.:\test\work\stock_20171003.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\coupon\coupon_20171003.CSV-Kopierziel: C.:\test\work\coupon_20171003.csv
7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04
Scanning the drive:
3 files, 43 bytes (1 KiB)
Creating archive: C:\test\archive\archive_20171003.7z
Items to compress: 3
Files read from disk: 3
Archive size: 239 bytes (1 KiB)
Everything is Ok
Externer Befehl "C.:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171003 C:\test\work\coupon_20171003.csv C:\test\work\sales_20171003.csv C:\test\work\stock_20171003.csv "wurde ausgeführt. Code beenden:0
Sie können sehen, dass die Barriereaktion ausgeführt wird, nachdem der Arbeitsthread die Datei kopiert hat. Der Prozess wird dreimal ausgeführt. Da jedoch nur die Dateikopie parallel ausgeführt wird, kann die Reihenfolge des Kopierabschlusses unterschiedlich sein. Die andere Verarbeitung wird von CyclicBarrier synchronisiert und nacheinander ausgeführt.
C:
└─test
├─archive
│ archive_20171001.7z
│ archive_20171002.7z
│ archive_20171003.7z
│
├─config
│ config.properties
│
├─dailyReport
│ ├─coupon
│ │ coupon_20171001.csv
│ │ coupon_20171002.csv
│ │ coupon_20171003.csv
│ │
│ ├─sales
│ │ sales_20171001.csv
│ │ sales_20171002.csv
│ │ sales_20171003.csv
│ │
│ └─stock
│ stock_20171001.csv
│ stock_20171002.csv
│ stock_20171003.csv
│
└─work
CyclicBarrier ist eine Synchronisationsunterstützungsfunktion, sodass Sie denselben Prozess ohne Verwendung von CyclicBarrier implementieren können. In dem diesmal implementierten Level-Programm funktioniert es problemlos, auch wenn Sie Thread.join zum Synchronisieren der Worker-Threads verwenden. In diesem Fall ist die Verwaltungsklasse jedoch für die zentrale Verwaltung zuständig und erstellt einen Arbeitsthread, der jeweils nur eine Datei kopiert. Mit CyclicBarrier können Sie denselben Thread die Arbeit erledigen lassen, ohne jedes Mal einen Thread zu erstellen. CyclicBarrier ist für die Synchronisation zwischen Threads verantwortlich, sodass die Verwaltungsklasse den Prozess nach dem Erstellen des Worker-Threads beenden kann. Barriereaktionen sind dafür verantwortlich, gemeinsam genutzte Ressourcen zu aktualisieren oder eine gemeinsame Verarbeitung durchzuführen, wenn sich Threads treffen.
■ Barriereaktion im Beispielprogramm
Rolle | Inhalte verarbeiten |
---|---|
Aktualisieren Sie freigegebene Ressourcen | Stellen Sie das Verarbeitungszieldatum vor |
Ausführung der gemeinsamen Verarbeitung | Komprimiertes Speichern der Formulardatei |
Selbst wenn Sie keine Verwaltungsklasse haben, wäre es interessant, CyclicBrrier als Vermittler zu verwenden, um die Arbeitsthreads und Barriereaktionen synchron zu halten.
Java SE 8 & JDK 9 Class CyclicBarrier Technischer Blog 51. Synchronizer CyclicBarrier