Java's concurrency library, java.util.concurrent, includes a class called CyclicBarrier. CyclicBarrier is useful for synchronizing multiple threads that run in parallel. Let's try some parallel processing with it.
As described in the official documentation, CyclicBarrier is a **synchronization aid** that lets a set of threads wait for each other until they have all reached a specific point. The point where the threads wait is called a **barrier**. When all threads have reached the barrier, you can run a process called a **barrier action**.
This mechanism can be run multiple times by a single CyclicBarrier instance.
It is named Cyclic because it can be run repeatedly.
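The reuse described above can be sketched as follows. This is a minimal illustration, not code from the sample program: the class name `BarrierReuseDemo`, the method `run`, and the counter-based barrier action are assumptions made for the example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: one CyclicBarrier instance is tripped several times.
class BarrierReuseDemo {

    /** Sends `parties` threads through `cycles` barrier cycles and returns how often the barrier action ran. */
    static int run(int parties, int cycles) {
        AtomicInteger actionCount = new AtomicInteger();
        // The barrier action runs once per cycle, after the last thread has arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, actionCount::incrementAndGet);

        // The pool must hold at least `parties` threads, or the barrier could never trip.
        ExecutorService service = Executors.newFixedThreadPool(parties);
        for (int i = 0; i < parties; i++) {
            service.submit(() -> {
                try {
                    for (int c = 0; c < cycles; c++) {
                        barrier.await(); // the same barrier instance is reused in every cycle
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        service.shutdown();
        try {
            service.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return actionCount.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier action ran " + run(3, 4) + " times");
    }
}
```

With 3 threads and 4 cycles, the barrier action runs 4 times, once for each time all threads meet at the barrier.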
The CyclicBarrier constructor takes the following arguments:
- Number of threads to synchronize
- An instance of a class that implements the barrier action (a Runnable)
CyclicBarrier barrier = new CyclicBarrier(N, new BarrierAction());
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new WorkerThread(barrier));
WorkerThread class
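@Override
public void run(){
    try {
        // before process...
        barrier.await(); // wait here until all threads have arrived
        // after process...
    } catch (InterruptedException | BrokenBarrierException e) {
        // await() throws checked exceptions, so they must be handled inside run()
        e.printStackTrace();
    }
}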
BarrierAction class
@Override
public void run(){
    // execute command...
}
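To make the ordering around the barrier concrete, here is a small sketch that records what happens before the barrier, in the barrier action, and after the barrier. The class name `BarrierOrderDemo` and the event strings are assumptions for illustration only.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: shows the order of "before", barrier action, and "after".
class BarrierOrderDemo {

    /** Returns the events recorded by `parties` worker threads around one barrier. */
    static List<String> run(int parties) {
        List<String> events = new CopyOnWriteArrayList<>();
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> events.add("action"));

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    events.add("before"); // before process...
                    barrier.await();      // nobody passes until everyone has arrived
                    events.add("after");  // after process...
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Every "before" entry is recorded before the single "action" entry, and every "after" entry follows it, regardless of how the threads are scheduled.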
Let's create a file backup function.
The backup targets are the following forms:
- Sales form
- Inventory form
- Coupon form
The specifications are:
- The backup target period can be specified.
- The forms are compressed and saved in one archive file per date.
For each date in the target period, the forms are copied to the WORK folder and then compressed into the archive folder.
The role of CyclicBarrier in this feature is:
Function | Role of CyclicBarrier |
---|---|
Copy files to WORK folder | Thread synchronization |
Compress files in WORK folder | Execution of common processing |
The structure of the folder is as follows.
C:
└─test
    ├─archive (compressed form storage folder)
    │
    ├─config
    │      config.properties
    │
    ├─dailyReport
    │  ├─coupon ← Form "Coupon" folder
    │  │      coupon_20171001.csv
    │  │      coupon_20171002.csv
    │  │      coupon_20171003.csv
    │  │
    │  ├─sales ← Form "Sales" folder
    │  │      sales_20171001.csv
    │  │      sales_20171002.csv
    │  │      sales_20171003.csv
    │  │
    │  └─stock ← Form "Inventory" folder
    │          stock_20171001.csv
    │          stock_20171002.csv
    │          stock_20171003.csv
    │
    └─work (temporary storage destination for form files)
The management class instantiates the CyclicBarrier and starts the worker threads.
Management class
package sample;

import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Controller {

    public static void main(String[] args){
        //The start date and end date are expected in ISO format (yyyy-MM-dd)
        if(args.length < 2){
            throw new IllegalArgumentException("Specify the start date and the end date.");
        }
        LocalDate startDate = LocalDate.parse(args[0]);
        LocalDate endDate = LocalDate.parse(args[1]);
        if(startDate.isAfter(endDate)){
            throw new IllegalArgumentException(MessageFormat.format("The start date is after the end date. Start date: {0} End date: {1}",startDate,endDate));
        }

        //Queue that holds the dates of the backup target period
        ConcurrentLinkedQueue<LocalDate> dateQueue = Util.createDateQueue(startDate,endDate);
        //Set of form files copied to the WORK folder
        ConcurrentSkipListSet<Path> reportPathSetInWorkDir = new ConcurrentSkipListSet<>();

        /*
         * Create an instance of a cyclic barrier.
         * - Specify the number of form types as the number of worker threads to wait for.
         * - Specify a barrier action as an argument to execute common processing.
         */
        CyclicBarrier barrier = new CyclicBarrier(Report.values().length, new BarrierAction(dateQueue,reportPathSetInWorkDir));

        //Use the Executor to start one worker thread per form type.
        ExecutorService service = Executors.newCachedThreadPool();
        for(Report report: Report.values()){
            service.submit(new CopyFile(report, dateQueue, reportPathSetInWorkDir, barrier));
        }
        //No new tasks are accepted; the submitted workers keep running until they finish.
        service.shutdown();
    }
}
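`Util.createDateQueue` is not shown in this article. As a rough idea of what it might do, here is a minimal sketch that assumes it simply enqueues every date from the start date to the end date, inclusive. The class name `DateQueueSketch` is hypothetical, and the real implementation may differ.

```java
import java.time.LocalDate;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for Util.createDateQueue; the actual implementation is not shown in the article.
class DateQueueSketch {

    /** Builds a queue containing every date from startDate to endDate, inclusive, in ascending order. */
    static ConcurrentLinkedQueue<LocalDate> createDateQueue(LocalDate startDate, LocalDate endDate) {
        ConcurrentLinkedQueue<LocalDate> queue = new ConcurrentLinkedQueue<>();
        for (LocalDate date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
            queue.add(date);
        }
        return queue;
    }

    public static void main(String[] args) {
        // prints [2017-10-01, 2017-10-02, 2017-10-03]
        System.out.println(createDateQueue(LocalDate.parse("2017-10-01"), LocalDate.parse("2017-10-03")));
    }
}
```

The head of this queue is the date currently being processed, which is why the workers only look at it and the barrier action is the one that removes it.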
Each worker thread copies the files of the form it is in charge of.
Worker thread
package sample;

import static java.lang.System.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

/**
 * Worker thread doing file copy
 * @author nakam
 *
 */
public class CopyFile implements Runnable {

    /**Form in charge*/
    private final Report report;
    /**Target period*/
    private final ConcurrentLinkedQueue<LocalDate> dateQueue;
    /**Form files in the WORK folder*/
    private final ConcurrentSkipListSet<Path> reportPathSetInWorkDir;
    /**Synchronize with CyclicBarrier*/
    private final CyclicBarrier barrier;

    public CopyFile(Report report, ConcurrentLinkedQueue<LocalDate> dateQueue,ConcurrentSkipListSet<Path> reportPathSetInWorkDir, CyclicBarrier barrier) {
        this.report = report;
        this.dateQueue = dateQueue;
        this.reportPathSetInWorkDir = reportPathSetInWorkDir;
        this.barrier = barrier;
    }

    @Override
    public void run(){
        FilePath filePath = new FilePath();
        while(!dateQueue.isEmpty()){
            try {
                //peek() only reads the date; the barrier action removes it from the queue
                Path src = filePath.getReportPath(report.getConfigKey(), dateQueue.peek());
                Path dst = filePath.getWorkDirPath().resolve(src.getFileName());
                //Copy the form file to the WORK folder
                Files.copy(src, dst);
                out.println(MessageFormat.format("I copied the file. Copy source: {0} copy destination: {1}",src,dst));
                reportPathSetInWorkDir.add(dst);
            } catch (IOException e) {
                //A failed copy is only logged; this worker still goes to the barrier so the others are not left waiting
                e.printStackTrace();
            }
            try {
                //Wait with a cyclic barrier
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                //The barrier can no longer be used, so stop instead of retrying the same date forever
                e.printStackTrace();
                return;
            }
        }
    }
}
The barrier action compresses the copied files and saves the archive.
Barrier action
package sample;

import java.nio.file.Path;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;

/**
 * Perform compression processing as a barrier action.
 * @author nakam
 *
 */
public class BarrierAction implements Runnable {

    /**Target period*/
    private final ConcurrentLinkedQueue<LocalDate> dateQueue;
    /**Form files in the WORK folder*/
    private final ConcurrentSkipListSet<Path> reportPathSetInWorkDir;

    public BarrierAction(ConcurrentLinkedQueue<LocalDate> dateQueue, ConcurrentSkipListSet<Path> reportPathSetInWorkDir) {
        this.dateQueue = dateQueue;
        this.reportPathSetInWorkDir = reportPathSetInWorkDir;
    }

    @Override
    public void run() {
        if(!dateQueue.isEmpty()){
            //Compress the form files in the WORK folder and save them in the archive folder.
            //poll() removes the processed date, which advances the workers to the next date.
            Compress.execute(new ArrayList<>(reportPathSetInWorkDir), new FilePath().getArchivePath(dateQueue.poll()));
            reportPathSetInWorkDir.clear();
        }
    }
}
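The interplay between the worker threads and the barrier action (workers only `peek()` at the current date, the barrier action `poll()`s it) can be tried without any file I/O. The following is a simplified simulation, not the sample program itself: the class name `PeekPollSketch`, the string dates, and the log messages are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation of the worker / barrier action hand-off, with logging instead of file I/O.
class PeekPollSketch {

    static List<String> run(List<String> dates, int workers) {
        ConcurrentLinkedQueue<String> dateQueue = new ConcurrentLinkedQueue<>(dates);
        List<String> log = new CopyOnWriteArrayList<>();

        // Barrier action: runs once per cycle, before the waiting workers are released,
        // and removes the date that has just been processed.
        CyclicBarrier barrier = new CyclicBarrier(workers,
                () -> log.add("archived " + dateQueue.poll()));

        ExecutorService service = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            service.submit(() -> {
                try {
                    while (!dateQueue.isEmpty()) {
                        // Workers only look at the head of the queue; they never remove it.
                        log.add("copied " + dateQueue.peek());
                        barrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        service.shutdown();
        try {
            service.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run(Arrays.asList("2017-10-01", "2017-10-02"), 3).forEach(System.out::println);
    }
}
```

Because the barrier action finishes before any worker is released, no worker can see the next date until the current one has been archived. That is what keeps the shared queue consistent without any extra locking.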
The following classes are also used, but they are omitted here because they are not directly related to CyclicBarrier. If you are interested, please refer to the source code on GitHub.
Class name | Processing content |
---|---|
Compress | Compresses files |
ConfigKey | Holds the keys of the configuration file |
ConfigUtil | Loads the configuration file |
ExternalCommand | Executes external commands from Java |
FilePath | Generates file paths |
Report | Holds the types of form |
Util | Utility methods |
Execute with the following arguments. The backup target period is 2017/10/01 to 2017/10/03.
Java arguments
2017-10-01 2017-10-03
Console output
I copied the file. Copy source: C:\test\dailyReport\sales\sales_20171001.csv copy destination: C:\test\work\sales_20171001.csv
I copied the file. Copy source: C:\test\dailyReport\coupon\coupon_20171001.csv copy destination: C:\test\work\coupon_20171001.csv
I copied the file. Copy source: C:\test\dailyReport\stock\stock_20171001.csv copy destination: C:\test\work\stock_20171001.csv
7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04
Scanning the drive:
3 files, 43 bytes (1 KiB)
Creating archive: C:\test\archive\archive_20171001.7z
Items to compress: 3
Files read from disk: 3
Archive size: 238 bytes (1 KiB)
Everything is Ok
External command "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171001 C:\test\work\coupon_20171001.csv C:\test\work\sales_20171001.csv C:\test\work\stock_20171001.csv "was executed. Exit code:0
I copied the file. Copy source: C:\test\dailyReport\sales\sales_20171002.csv copy destination: C:\test\work\sales_20171002.csv
I copied the file. Copy source: C:\test\dailyReport\stock\stock_20171002.csv copy destination: C:\test\work\stock_20171002.csv
I copied the file. Copy source: C:\test\dailyReport\coupon\coupon_20171002.csv copy destination: C:\test\work\coupon_20171002.csv
7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04
Scanning the drive:
3 files, 43 bytes (1 KiB)
Creating archive: C:\test\archive\archive_20171002.7z
Items to compress: 3
Files read from disk: 3
Archive size: 240 bytes (1 KiB)
Everything is Ok
External command "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171002 C:\test\work\coupon_20171002.csv C:\test\work\sales_20171002.csv C:\test\work\stock_20171002.csv "was executed. Exit code:0
I copied the file. Copy source: C:\test\dailyReport\sales\sales_20171003.csv copy destination: C:\test\work\sales_20171003.csv
I copied the file. Copy source: C:\test\dailyReport\stock\stock_20171003.csv copy destination: C:\test\work\stock_20171003.csv
I copied the file. Copy source: C:\test\dailyReport\coupon\coupon_20171003.csv copy destination: C:\test\work\coupon_20171003.csv
7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04
Scanning the drive:
3 files, 43 bytes (1 KiB)
Creating archive: C:\test\archive\archive_20171003.7z
Items to compress: 3
Files read from disk: 3
Archive size: 239 bytes (1 KiB)
Everything is Ok
External command "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171003 C:\test\work\coupon_20171003.csv C:\test\work\sales_20171003.csv C:\test\work\stock_20171003.csv "was executed. Exit code:0
You can see that the barrier action is executed after the worker threads have copied the files. The process is executed three times, and because only the file copies run in parallel, the order in which the copies finish can differ from cycle to cycle. All other processing is synchronized by CyclicBarrier and executed sequentially.
After execution, the folders look like this:
C:
└─test
    ├─archive
    │      archive_20171001.7z
    │      archive_20171002.7z
    │      archive_20171003.7z
    │
    ├─config
    │      config.properties
    │
    ├─dailyReport
    │  ├─coupon
    │  │      coupon_20171001.csv
    │  │      coupon_20171002.csv
    │  │      coupon_20171003.csv
    │  │
    │  ├─sales
    │  │      sales_20171001.csv
    │  │      sales_20171002.csv
    │  │      sales_20171003.csv
    │  │
    │  └─stock
    │          stock_20171001.csv
    │          stock_20171002.csv
    │          stock_20171003.csv
    │
    └─work
CyclicBarrier is a synchronization aid, so the same processing can be implemented without it. In a program of this size, synchronizing the worker threads with Thread.join would work without problems. In that case, however, the management class has to manage everything centrally, creating a new worker thread for every single file copy and waiting for it to finish.
With CyclicBarrier, the same threads keep doing the work, so there is no need to create a thread for each copy. Because CyclicBarrier takes care of the synchronization between threads, the management class can finish as soon as the worker threads have been started. The barrier action is responsible for updating shared resources and performing common processing when the threads meet.
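For comparison, the Thread.join approach mentioned above might look roughly like the following sketch. It is a simplified simulation under stated assumptions: the class name `JoinAlternativeSketch`, the string dates and form names, and the log messages are hypothetical, and logging stands in for the real copy and compression.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of the Thread.join alternative: the management code does all the coordination.
class JoinAlternativeSketch {

    static List<String> run(List<String> dates, List<String> reports) {
        List<String> log = new CopyOnWriteArrayList<>();
        for (String date : dates) {
            // A fresh set of worker threads is created for every date.
            List<Thread> workers = new ArrayList<>();
            for (String report : reports) {
                Thread worker = new Thread(() -> log.add("copied " + date), report + "-" + date);
                workers.add(worker);
                worker.start();
            }
            // The management code itself waits for every worker of this date.
            for (Thread worker : workers) {
                try {
                    worker.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for workers", e);
                }
            }
            // The common processing also runs in the management code.
            log.add("archived " + date);
        }
        return log;
    }

    public static void main(String[] args) {
        run(Arrays.asList("2017-10-01", "2017-10-02"), Arrays.asList("sales", "stock", "coupon"))
                .forEach(System.out::println);
    }
}
```

The result is the same sequence of copies and archives, but the loop over dates, the thread creation, and the waiting all sit in the management code, and it cannot return until the last date has been processed.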
■ Barrier action in sample program
Role | Processing content |
---|---|
Update shared resources | Advance the processing target date |
Execution of common processing | Compress and save the form files |
I find it interesting that, even without a management class coordinating everything, the worker threads and the barrier action stay in sync with CyclicBarrier acting as the intermediary.
References
- Java SE 8 & JDK 9: Class CyclicBarrier
- Technical Blog: 51st Synchronizer CyclicBarrier