Führen Sie die Parallelverarbeitung mit dem CyclicBarrier von Java durch

Überblick

In der Java-Parallelverarbeitungsbibliothek java.util.concurrent gibt es eine Klasse namens CyclicBarrier. CyclicBarrier ist nützlich für die Synchronisierung, wenn mehrere Threads parallel geschaltet sind. Lassen Sie uns tatsächlich mit dieser CyclicBarrier-Klasse parallel verarbeiten.

Wie CyclicBarrier funktioniert

CyclicBarrier verfügt über mehrere Threads als bestimmten Punkt, wie im [offiziellen Dokument] beschrieben (https://docs.oracle.com/javase/jp/9/docs/api/java/util/concurrent/CyclicBarrier.html). Es ist eine ** Synchronisationsunterstützungsfunktion **, mit der Sie warten können, bis sie erreicht ist. Ein Wartepunkt, an dem mehrere Threads warten, wird als ** Barriere ** bezeichnet. Wenn alle Threads die Barriere erreichen, können Sie einen Prozess namens ** Barriereaktion ** ausführen. 1.JPG

Dieser Mechanismus kann von einer einzelnen CyclicBarrier-Instanz mehrmals ausgeführt werden. 2.JPG

Es heißt Cyclic, weil es wiederholt ausgeführt werden kann.

Verwendung von CyclicBarrier

  1. Erstellen Sie eine Instanz der CyclicBarrier-Klasse. Geben Sie zu diesem Zeitpunkt im Konstruktorargument Folgendes an.

python


CyclicBarrier barrier = new CyclicBarrier(N, new BarrierAction());
  1. Führen Sie den Arbeitsthread aus. Geben Sie dem Worker-Thread einen Verweis auf die CyclicBarrier-Instanz.

python


ExecutorService service = Executors.newCachedThreadPool();
service.submit(new WorkerThread(barrier));
  1. Der Worker-Thread ruft jederzeit die wait-Methode der CyclicBarrier-Klasse auf, um den Prozess zu unterbrechen.

WorkerThread-Klasse


	@Override
	public void run(){
		// before process...
		barrier.await();
		// after process...
	}
  1. Wenn die Anzahl der unterbrochenen Arbeitsthreads die im Konstruktor festgelegte Anzahl von Threads erreicht, werden alle erwarteten Arbeitsthreads angehalten. Zu diesem Zeitpunkt führt die CyclicBarrier-Klasse die Barriereaktion aus.

BarrierAction-Klasse


	@Override
	public void run(){
		// execute command...
	}
  1. Wenn die Ausführung der Barriereaktion abgeschlossen ist, setzen alle angehaltenen Worker-Threads die Verarbeitung fort.
  2. Wiederholen Sie die Schritte 3-5.

Beispielprogramm

Überblick

Erstellen Sie eine Dateisicherungsfunktion.

■ Sicherungsziel

--Verkaufsformular --Inventarform --Coupon-Formular

■ Funktionsanforderungen

Design

Führen Sie die folgende Verarbeitung für die Form des Zielzeitraums aus.

  1. Kopieren Sie die Formulardatei mit demselben Datum in den Arbeitsordner.
  2. Komprimieren Sie die im Arbeitsordner gesammelten Formulardateien desselben Datums und speichern Sie sie im Archivordner.

image.png

■ Rolle der zyklischen Barriere

Die Rolle der zyklischen Barriere in dieser Funktion ist:

Funktion Die Rolle von CyclicBarrier
Kopieren Sie die Dateien in den WORK-Ordner Thread-Synchronisation
Komprimieren von Dateien im WORK-Ordner Ausführung der gemeinsamen Verarbeitung

3.JPG

■ Ordnerstruktur

Die Struktur des Ordners ist wie folgt.

C:
└─test
├─archive (Speicherordner für komprimierte Formulare)
   │
   ├─config
   │      config.properties
   │      
   ├─dailyReport
│ ├coupon ← Formular "Gutschein" Ordner
   │  │      coupon_20171001.csv
   │  │      coupon_20171002.csv
   │  │      coupon_20171003.csv
   │  │      
│ ├ Verkäufe ← Formular "Verkauf" Ordner
   │  │      sales_20171001.csv
   │  │      sales_20171002.csv
   │  │      sales_20171003.csv
   │  │      
│ └ Lager ← Formular "Lager" Ordner
   │          stock_20171001.csv
   │          stock_20171002.csv
   │          stock_20171003.csv
   │          
   └─work (Temporäres Speicherziel für Formulardateien)

Implementierung

■ Managementklasse

Erstellt eine Instanz der zyklischen Barriere und leitet den Prozess ein.

Managementklasse


package sample;

import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Controller {

	public static void main(String[] args){

		LocalDate startDate = LocalDate.parse(args[0]);
		LocalDate endDate = LocalDate.parse(args[1]);

		if(startDate.isAfter(endDate)){
			throw new IllegalArgumentException(MessageFormat.format("Das Startdatum liegt nach dem Enddatum. Anfangsdatum:{0}Endtermin:{1}",startDate,endDate));
		}

		//Warteschlange, die den Sicherungszielzeitraum enthält
		ConcurrentLinkedQueue<LocalDate> dateQueue = Util.createDateQueue(startDate,endDate);

		//Zu verarbeitendes Formular SET
		ConcurrentSkipListSet<Path> reportPathSetInWorkDir = new ConcurrentSkipListSet<Path>();

		/*
		 *Erstellen Sie eine Instanz der zyklischen Barriere.
		 *- Geben Sie die Anzahl der Formulartypen für die Anzahl der Arbeitsthreads an, auf die gewartet werden soll.
		 *- Geben Sie eine Barriereaktion als Argument an, um die allgemeine Verarbeitung auszuführen.
		 */
		CyclicBarrier barrier = new CyclicBarrier(Report.values().length, new BarrierAction(dateQueue,reportPathSetInWorkDir));

		//Verwenden Sie Executor, um den Worker-Thread zu starten.
		ExecutorService service = Executors.newCachedThreadPool();

		for(Report report: Report.values()){
			service.submit(new CopyFile(report, dateQueue, reportPathSetInWorkDir, barrier));
		}
		service.shutdown();
	}
}

■ Arbeitsthread

Der Arbeitsthread erstellt eine Dateikopie.

Arbeitsthread


package sample;

import static java.lang.System.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

/**
 *Arbeitsthread beim Kopieren von Dateien
 * @author nakam
 *
 */
public class CopyFile implements Runnable {

	/**Verantwortliches Formular*/
	private Report report;

	/**Zielzeitraum*/
	private ConcurrentLinkedQueue<LocalDate> dateQueue;

	/**Formulardatei im Ordner WORK*/
	private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;

	/**Mit CyclicBarrier synchronisieren*/
	private CyclicBarrier barrier;

	public CopyFile(Report report, ConcurrentLinkedQueue<LocalDate> dateQueue,ConcurrentSkipListSet<Path> reportPathSetInWorkDir, CyclicBarrier barrier) {
		this.report = report;
		this.dateQueue = dateQueue;
		this.reportPathSetInWorkDir = reportPathSetInWorkDir;
		this.barrier = barrier;
	}

	@Override
	public void run(){

		FilePath filePath = new FilePath();

		while(!dateQueue.isEmpty()){

			try {
				Path src = filePath.getReportPath(report.getConfigKey(), dateQueue.peek());
				Path dst = filePath.getWorkDirPath().resolve(src.getFileName());

				//Kopieren Sie die Formulardatei in den Ordner WORK
				Files.copy(src, dst);
				out.println(MessageFormat.format("Ich habe die Datei kopiert. Original:{0}Kopieren nach:{1}",src,dst));
				reportPathSetInWorkDir.add(dst);

				//Warten Sie mit einer zyklischen Barriere
				barrier.await();
			} catch (IOException | InterruptedException | BrokenBarrierException e) {
				e.printStackTrace();
			}
		}
	}
}

■ Barriereaktion

Barriereaktionen komprimieren und speichern Dateien.

Barriereaktion


package sample;

import java.nio.file.Path;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;

/**
 *Führen Sie die Komprimierungsverarbeitung als Barriereaktion durch.
 * @author nakam
 *
 */
public class BarrierAction implements Runnable {

	/**Zielzeitraum*/
	private ConcurrentLinkedQueue<LocalDate> dateQueue;

	/**Formulardatei im Ordner WORK*/
	private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;

	public BarrierAction(ConcurrentLinkedQueue<LocalDate> dateQueue, ConcurrentSkipListSet<Path> reportPathSetInWorkDir) {
		this.dateQueue = dateQueue;
		this.reportPathSetInWorkDir = reportPathSetInWorkDir;
	}

	@Override
	public void run() {

		if(!dateQueue.isEmpty()){

			//Komprimieren Sie die Formulardatei im Ordner WORK und speichern Sie sie im Archivordner
			Compress.execute(new ArrayList<Path>(reportPathSetInWorkDir), new FilePath().getArchivePath(dateQueue.poll()));
			reportPathSetInWorkDir.clear();
		}
	}
}

■ Andere Klassen

Darüber hinaus werden die folgenden Klassen verwendet, sie werden jedoch weggelassen, da sie nichts mit CyclicBarrier zu tun haben. Bei Interesse lesen Sie bitte den Quellcode auf Github.

Name der Klasse Inhalte verarbeiten
Compress Dateikomprimierungsprozess
ConfigKey Halten Sie den Schlüssel der Konfigurationsdatei gedrückt
ConfigUtil Laden Sie die Konfigurationsdatei
ExternalCommand Führen Sie externe Befehle von Java aus
FilePath Generieren Sie einen Dateipfad
Report Halten Sie die Art des Formulars
Util Nützlichkeit

Prüfung

Lauf

Führen Sie mit den folgenden Argumenten aus. Der Sicherungszielzeitraum wird als (2017/10/01 --2017 / 10/03) angegeben.

Java-Argument


2017-10-01 2017-10-03

Ausführungsergebnis

Konsolenausgabe


Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\sales\sales_20171001.CSV-Kopierziel: C.:\test\work\sales_20171001.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\coupon\coupon_20171001.CSV-Kopierziel: C.:\test\work\coupon_20171001.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\stock\stock_20171001.CSV-Kopierziel: C.:\test\work\stock_20171001.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171001.7z

Items to compress: 3


Files read from disk: 3
Archive size: 238 bytes (1 KiB)

Everything is Ok
Externer Befehl "C.:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171001 C:\test\work\coupon_20171001.csv C:\test\work\sales_20171001.csv C:\test\work\stock_20171001.csv "wurde ausgeführt. Code beenden:0
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\sales\sales_20171002.CSV-Kopierziel: C.:\test\work\sales_20171002.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\stock\stock_20171002.CSV-Kopierziel: C.:\test\work\stock_20171002.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\coupon\coupon_20171002.CSV-Kopierziel: C.:\test\work\coupon_20171002.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171002.7z

Items to compress: 3


Files read from disk: 3
Archive size: 240 bytes (1 KiB)

Everything is Ok
Externer Befehl "C.:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171002 C:\test\work\coupon_20171002.csv C:\test\work\sales_20171002.csv C:\test\work\stock_20171002.csv "wurde ausgeführt. Code beenden:0
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\sales\sales_20171003.CSV-Kopierziel: C.:\test\work\sales_20171003.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\stock\stock_20171003.CSV-Kopierziel: C.:\test\work\stock_20171003.csv
Ich habe die Datei kopiert. Quelle kopieren: C.:\test\dailyReport\coupon\coupon_20171003.CSV-Kopierziel: C.:\test\work\coupon_20171003.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171003.7z

Items to compress: 3


Files read from disk: 3
Archive size: 239 bytes (1 KiB)

Everything is Ok
Externer Befehl "C.:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171003 C:\test\work\coupon_20171003.csv C:\test\work\sales_20171003.csv C:\test\work\stock_20171003.csv "wurde ausgeführt. Code beenden:0

Sie können sehen, dass die Barriereaktion ausgeführt wird, nachdem der Arbeitsthread die Datei kopiert hat. Der Prozess wird dreimal ausgeführt. Da jedoch nur die Dateikopie parallel ausgeführt wird, kann die Reihenfolge des Kopierabschlusses unterschiedlich sein. Die andere Verarbeitung wird von CyclicBarrier synchronisiert und nacheinander ausgeführt.

Ordner nach dem Ausführen des Tests

C:
└─test
   ├─archive
   │      archive_20171001.7z
   │      archive_20171002.7z
   │      archive_20171003.7z
   │
   ├─config
   │      config.properties
   │      
   ├─dailyReport
   │  ├─coupon
   │  │      coupon_20171001.csv
   │  │      coupon_20171002.csv
   │  │      coupon_20171003.csv
   │  │      
   │  ├─sales
   │  │      sales_20171001.csv
   │  │      sales_20171002.csv
   │  │      sales_20171003.csv
   │  │      
   │  └─stock
   │          stock_20171001.csv
   │          stock_20171002.csv
   │          stock_20171003.csv
   │          
   └─work

Erwägung

CyclicBarrier ist eine Synchronisationsunterstützungsfunktion, sodass Sie denselben Prozess ohne Verwendung von CyclicBarrier implementieren können. In dem diesmal implementierten Level-Programm funktioniert es problemlos, auch wenn Sie Thread.join zum Synchronisieren der Worker-Threads verwenden. In diesem Fall ist die Verwaltungsklasse jedoch für die zentrale Verwaltung zuständig und erstellt einen Arbeitsthread, der jeweils nur eine Datei kopiert. Mit CyclicBarrier können Sie denselben Thread die Arbeit erledigen lassen, ohne jedes Mal einen Thread zu erstellen. CyclicBarrier ist für die Synchronisation zwischen Threads verantwortlich, sodass die Verwaltungsklasse den Prozess nach dem Erstellen des Worker-Threads beenden kann. Barriereaktionen sind dafür verantwortlich, gemeinsam genutzte Ressourcen zu aktualisieren oder eine gemeinsame Verarbeitung durchzuführen, wenn sich Threads treffen.

■ Barriereaktion im Beispielprogramm

Rolle Inhalte verarbeiten
Aktualisieren Sie freigegebene Ressourcen Stellen Sie das Verarbeitungszieldatum vor
Ausführung der gemeinsamen Verarbeitung Komprimiertes Speichern der Formulardatei

Impressionen

Selbst wenn Sie keine Verwaltungsklasse haben, wäre es interessant, CyclicBrrier als Vermittler zu verwenden, um die Arbeitsthreads und Barriereaktionen synchron zu halten.

Speicherort des Beispielprogramms

Github

Referenz

Java SE 8 & JDK 9 Class CyclicBarrier Technischer Blog 51. Synchronizer CyclicBarrier

Ausführungsumgebung

Recommended Posts

Führen Sie die Parallelverarbeitung mit dem CyclicBarrier von Java durch
Datenverarbeitung mit Apache Flink
Gemessene Parallelverarbeitung mit Java
[Swift] Asynchrone Verarbeitung mit PromiseKit
[Verarbeitung] Versuchen Sie es mit GT Force.
CSV-Ausgabeverarbeitung mit Super-CSV