Perform parallel processing using Java's CyclicBarrier

Overview

Java's concurrency package java.util.concurrent includes a class called CyclicBarrier, which is useful for synchronizing multiple threads that run in parallel. Let's try parallel processing with this CyclicBarrier class.

How CyclicBarrier works

As described in the official documentation, CyclicBarrier is a **synchronization aid** that lets multiple threads wait until all of them have reached a specific point. The point where the threads wait is called the **barrier**. When all threads have reached the barrier, a process called the **barrier action** can be executed.

A single CyclicBarrier instance can repeat this mechanism multiple times.

It is called "cyclic" because the barrier can be reused after the waiting threads are released.
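The following is a minimal sketch of the mechanism described above, using only java.util.concurrent. Three threads meet at the same CyclicBarrier instance in three consecutive rounds, and the barrier action runs once per trip. The class name, the runDemo method, and the party and round counts are illustrative and not part of the sample program.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class CyclicBarrierReuseDemo {

    /** Lets `parties` threads meet at one barrier `rounds` times; returns how often the action ran. */
    static int runDemo(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();

        // The barrier action runs once every time all parties have called await().
        CyclicBarrier barrier = new CyclicBarrier(parties,
                () -> System.out.println("barrier action, trip " + trips.incrementAndGet()));

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                try {
                    // The same barrier instance is reused for every round: the "cyclic" part.
                    for (int round = 0; round < rounds; round++) {
                        System.out.println("worker " + id + " waiting in round " + round);
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                } catch (BrokenBarrierException e) {
                    System.out.println("worker " + id + ": barrier broken");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("total trips: " + runDemo(3, 3)); // prints "total trips: 3"
    }
}
```

The "waiting" lines inside one round may appear in any order, but no thread starts the next round until the barrier action for the current round has printed.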

How to use CyclicBarrier

  1. Create an instance of the CyclicBarrier class, passing the following constructor arguments:

    - The number of threads to synchronize
    - An instance of a class that implements the barrier action (a Runnable)



CyclicBarrier barrier = new CyclicBarrier(N, new BarrierAction());
  2. Start the worker threads, giving each one a reference to the CyclicBarrier instance.



ExecutorService service = Executors.newCachedThreadPool();
service.submit(new WorkerThread(barrier));
  3. Each worker thread calls the CyclicBarrier's await method at the point where it should wait, which suspends that thread.

WorkerThread class


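	@Override
	public void run(){
		// before process...
		try {
			// Blocks until all parties have called await()
			barrier.await();
		} catch (InterruptedException | BrokenBarrierException e) {
			e.printStackTrace();
		}
		// after process...
	}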
  4. When the number of threads waiting at the barrier reaches the number specified in the constructor, the CyclicBarrier executes the barrier action. The action is run by the last thread to arrive, before any of the waiting threads are released.

BarrierAction class


	@Override
	public void run(){
		// execute command...
	}
  5. When the barrier action finishes, all suspended worker threads resume processing.
  6. Repeat steps 3-5.
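Putting steps 1-6 together, here is a small self-contained sketch that mirrors the shape of the sample program below in miniature: a shared queue of "days", worker tasks submitted to an ExecutorService that log their work and then wait, and a barrier action that consumes the current day. The names BarrierStepsSketch and run, and the use of integers instead of dates, are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BarrierStepsSketch {

    /** Runs `workers` threads over `days` rounds and returns the events in the order they happened. */
    static List<String> run(int workers, int days) throws InterruptedException {
        ConcurrentLinkedQueue<Integer> dayQueue = new ConcurrentLinkedQueue<>();
        for (int d = 1; d <= days; d++) {
            dayQueue.add(d);
        }
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        // Step 1: wait for `workers` parties; the barrier action consumes the current day.
        CyclicBarrier barrier = new CyclicBarrier(workers,
                () -> log.add("action day " + dayQueue.poll()));

        // Step 2: start the workers, each holding a reference to the same barrier.
        ExecutorService service = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            final int id = w;
            service.submit(() -> {
                while (!dayQueue.isEmpty()) {
                    log.add("worker " + id + " day " + dayQueue.peek());
                    try {
                        // Steps 3-5: block here; the last arrival runs the action, then all resume.
                        barrier.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } catch (BrokenBarrierException e) {
                        return; // another party failed, so stop this worker too
                    }
                }
            });
        }
        service.shutdown();
        service.awaitTermination(10, TimeUnit.SECONDS);
        return new ArrayList<>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        run(3, 2).forEach(System.out::println);
    }
}
```

The thread pool must be able to run all parties at the same time: if it had fewer threads than parties, the submitted tasks would block in await forever. The sample program avoids this by using newCachedThreadPool, which creates threads as needed.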

Sample program

Overview

We will create a file backup feature.

■ Backup target

- Sales form
- Inventory form
- Coupon form

■ Functional requirements

- The backup target period can be specified.
- The forms are compressed and saved into one archive file per date.

Design

For the forms in the target period, the following processing is executed for each date:

  1. Copy the form files for the same date to the work folder.
  2. Compress the form files for that date collected in the work folder and save the archive in the archive folder.


■ Role of CyclicBarrier

The roles of CyclicBarrier in this feature are as follows.

| Function | Role of CyclicBarrier |
|---|---|
| Copy files to the WORK folder | Thread synchronization |
| Compress the files in the WORK folder | Execution of common processing |


■ Folder structure

The structure of the folder is as follows.

C:
└─test
   ├─archive (compressed form storage folder)
   │
   ├─config
   │      config.properties
   │      
   ├─dailyReport
   │  ├─coupon ← Form "Coupon" folder
   │  │      coupon_20171001.csv
   │  │      coupon_20171002.csv
   │  │      coupon_20171003.csv
   │  │      
   │  ├─sales ← Form "Sales" folder
   │  │      sales_20171001.csv
   │  │      sales_20171002.csv
   │  │      sales_20171003.csv
   │  │      
   │  └─stock ← Form "Inventory" folder
   │          stock_20171001.csv
   │          stock_20171002.csv
   │          stock_20171003.csv
   │          
   └─work (Temporary storage destination for form files)

Implementation

■ Management class

The management class creates the CyclicBarrier instance and starts the processing.

Management class


package sample;

import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Controller {

	public static void main(String[] args){

		LocalDate startDate = LocalDate.parse(args[0]);
		LocalDate endDate = LocalDate.parse(args[1]);

		if(startDate.isAfter(endDate)){
			throw new IllegalArgumentException(MessageFormat.format("The start date is after the end date. Start date: {0}, End date: {1}",startDate,endDate));
		}

		// Queue holding every date in the backup target period
		ConcurrentLinkedQueue<LocalDate> dateQueue = Util.createDateQueue(startDate,endDate);

		// Set of form files currently in the WORK folder
		ConcurrentSkipListSet<Path> reportPathSetInWorkDir = new ConcurrentSkipListSet<Path>();

		/*
		 * Create the CyclicBarrier.
		 * - The number of parties is the number of form types (one worker thread per form type).
		 * - The barrier action performs the common processing.
		 */
		CyclicBarrier barrier = new CyclicBarrier(Report.values().length, new BarrierAction(dateQueue,reportPathSetInWorkDir));

		// Start one worker thread per form type using an ExecutorService.
		ExecutorService service = Executors.newCachedThreadPool();

		for(Report report: Report.values()){
			service.submit(new CopyFile(report, dateQueue, reportPathSetInWorkDir, barrier));
		}
		service.shutdown();
	}
}
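The Util class is omitted from this article. Assuming that Util.createDateQueue simply queues every date from the start date to the end date inclusive, in ascending order (consistent with the three archives produced for 2017-10-01 to 2017-10-03 in the test run), a hypothetical stand-in named UtilSketch could look like this. The real implementation on GitHub may differ.

```java
import java.time.LocalDate;
import java.util.concurrent.ConcurrentLinkedQueue;

public class UtilSketch {

    /**
     * Hypothetical sketch: returns a queue holding every date from startDate to endDate,
     * inclusive, in ascending order. Workers peek() the head; the barrier action poll()s it.
     */
    public static ConcurrentLinkedQueue<LocalDate> createDateQueue(LocalDate startDate, LocalDate endDate) {
        ConcurrentLinkedQueue<LocalDate> queue = new ConcurrentLinkedQueue<>();
        for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
            queue.add(d);
        }
        return queue;
    }

    public static void main(String[] args) {
        // Prints [2017-10-01, 2017-10-02, 2017-10-03]
        System.out.println(createDateQueue(LocalDate.parse("2017-10-01"), LocalDate.parse("2017-10-03")));
    }
}
```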

■ Worker thread

The worker thread copies the form files.

Worker thread


package sample;

import static java.lang.System.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

/**
 *Worker thread doing file copy
 * @author nakam
 *
 */
public class CopyFile implements Runnable {

	/** Form type this worker is responsible for */
	private Report report;

	/**Target period*/
	private ConcurrentLinkedQueue<LocalDate> dateQueue;

	/**Form file in the WORK folder*/
	private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;

	/**Synchronize with CyclicBarrier*/
	private CyclicBarrier barrier;

	public CopyFile(Report report, ConcurrentLinkedQueue<LocalDate> dateQueue,ConcurrentSkipListSet<Path> reportPathSetInWorkDir, CyclicBarrier barrier) {
		this.report = report;
		this.dateQueue = dateQueue;
		this.reportPathSetInWorkDir = reportPathSetInWorkDir;
		this.barrier = barrier;
	}

	@Override
	public void run(){

		FilePath filePath = new FilePath();

		while(!dateQueue.isEmpty()){

			try {
				Path src = filePath.getReportPath(report.getConfigKey(), dateQueue.peek());
				Path dst = filePath.getWorkDirPath().resolve(src.getFileName());

				//Copy the form file to the WORK folder
				Files.copy(src, dst);
				out.println(MessageFormat.format("I copied the file. Copy source: {0} copy destination: {1}",src,dst));
				reportPathSetInWorkDir.add(dst);

				// Wait at the barrier until every worker has copied its file for this date
				barrier.await();
			} catch (IOException | InterruptedException | BrokenBarrierException e) {
				e.printStackTrace();
			}
		}
	}
}

■ Barrier action

The barrier action compresses the files and saves the archive.

Barrier action


package sample;

import java.nio.file.Path;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;

/**
 *Perform compression processing as a barrier action.
 * @author nakam
 *
 */
public class BarrierAction implements Runnable {

	/**Target period*/
	private ConcurrentLinkedQueue<LocalDate> dateQueue;

	/**Form file in the WORK folder*/
	private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;

	public BarrierAction(ConcurrentLinkedQueue<LocalDate> dateQueue, ConcurrentSkipListSet<Path> reportPathSetInWorkDir) {
		this.dateQueue = dateQueue;
		this.reportPathSetInWorkDir = reportPathSetInWorkDir;
	}

	@Override
	public void run() {

		if(!dateQueue.isEmpty()){

			//Compress the form file in the WORK folder and save it in the archive folder
			Compress.execute(new ArrayList<Path>(reportPathSetInWorkDir), new FilePath().getArchivePath(dateQueue.poll()));
			reportPathSetInWorkDir.clear();
		}
	}
}
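The Compress class is omitted, but the console output in the test section shows what it ends up running: 7-Zip's `a` (add) command with the `-sdel` switch, which deletes the source files once they have been added to the archive. That is why the WORK folder is empty again after each cycle. The following hypothetical CompressSketch builds the same command line and runs it with ProcessBuilder; the class and method names are assumptions, and the 7-Zip path is taken from the log.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class CompressSketch {

    /** 7-Zip location as it appears in the console output (an assumption for other machines). */
    static final String SEVEN_ZIP = "C:\\Program Files\\7-Zip\\7z.exe";

    /** Builds "7z a -sdel <archive> <files...>": add the files to the archive, then delete them. */
    static List<String> buildCommand(List<Path> files, Path archive) {
        List<String> command = new ArrayList<>();
        command.add(SEVEN_ZIP);
        command.add("a");
        command.add("-sdel");
        command.add(archive.toString());
        for (Path file : files) {
            command.add(file.toString());
        }
        return command;
    }

    /** Runs 7-Zip with its output shown on the console and returns the exit code. */
    static int execute(List<Path> files, Path archive) {
        try {
            Process process = new ProcessBuilder(buildCommand(files, archive))
                    .inheritIO()
                    .start();
            return process.waitFor();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for 7-Zip", e);
        }
    }
}
```

execute throws only unchecked exceptions so that it can be called directly from BarrierAction.run, which cannot throw checked exceptions. Note that if a barrier action throws, CyclicBarrier places the barrier in the broken state and the other waiting threads receive a BrokenBarrierException.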

■ Other classes

The following classes are also used, but they are omitted here because they are not directly related to CyclicBarrier. If you are interested, please refer to the source code on GitHub.

| Class name | Processing content |
|---|---|
| Compress | File compression |
| ConfigKey | Holds the keys of the configuration file |
| ConfigUtil | Loads the configuration file |
| ExternalCommand | Executes external commands from Java |
| FilePath | Generates file paths |
| Report | Holds the form types |
| Util | Utilities |
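FilePath is omitted as well. Judging only from the folder structure shown earlier (C:\test\dailyReport\<type>\<type>_yyyyMMdd.csv, C:\test\work, and archives named archive_yyyyMMdd), a hypothetical path builder could look like the following. In the article, the locations come from config.properties via ConfigUtil and ConfigKey, and getReportPath takes report.getConfigKey(); this sketch instead takes a base folder in the constructor and the form folder name as a String.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class FilePathSketch {

    private static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.ofPattern("yyyyMMdd");

    private final Path baseDir;

    public FilePathSketch(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** e.g. ("sales", 2017-10-01) -> <base>/dailyReport/sales/sales_20171001.csv */
    public Path getReportPath(String reportType, LocalDate date) {
        return baseDir.resolve("dailyReport").resolve(reportType)
                .resolve(reportType + "_" + date.format(YYYYMMDD) + ".csv");
    }

    /** Temporary folder where the form files of one date are collected. */
    public Path getWorkDirPath() {
        return baseDir.resolve("work");
    }

    /** Archive path without extension; 7-Zip appends ".7z", as the console output shows. */
    public Path getArchivePath(LocalDate date) {
        return baseDir.resolve("archive").resolve("archive_" + date.format(YYYYMMDD));
    }

    public static void main(String[] args) {
        FilePathSketch paths = new FilePathSketch(Paths.get("C:\\test"));
        System.out.println(paths.getReportPath("sales", LocalDate.of(2017, 10, 1)));
    }
}
```

On Windows, main prints C:\test\dailyReport\sales\sales_20171001.csv.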

Test

Run

Run the program with the following arguments, which set the backup target period to 2017/10/01 through 2017/10/03.

Java arguments


2017-10-01 2017-10-03

Execution result

Console output


I copied the file. Copy source: C:\test\dailyReport\sales\sales_20171001.csv copy destination: C:\test\work\sales_20171001.csv
I copied the file. Copy source: C:\test\dailyReport\coupon\coupon_20171001.csv copy destination: C:\test\work\coupon_20171001.csv
I copied the file. Copy source: C:\test\dailyReport\stock\stock_20171001.csv copy destination: C:\test\work\stock_20171001.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171001.7z

Items to compress: 3


Files read from disk: 3
Archive size: 238 bytes (1 KiB)

Everything is Ok
External command "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171001 C:\test\work\coupon_20171001.csv C:\test\work\sales_20171001.csv C:\test\work\stock_20171001.csv "was executed. Exit code:0
I copied the file. Copy source: C:\test\dailyReport\sales\sales_20171002.csv copy destination: C:\test\work\sales_20171002.csv
I copied the file. Copy source: C:\test\dailyReport\stock\stock_20171002.csv copy destination: C:\test\work\stock_20171002.csv
I copied the file. Copy source: C:\test\dailyReport\coupon\coupon_20171002.csv copy destination: C:\test\work\coupon_20171002.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171002.7z

Items to compress: 3


Files read from disk: 3
Archive size: 240 bytes (1 KiB)

Everything is Ok
External command "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171002 C:\test\work\coupon_20171002.csv C:\test\work\sales_20171002.csv C:\test\work\stock_20171002.csv "was executed. Exit code:0
I copied the file. Copy source: C:\test\dailyReport\sales\sales_20171003.csv copy destination: C:\test\work\sales_20171003.csv
I copied the file. Copy source: C:\test\dailyReport\stock\stock_20171003.csv copy destination: C:\test\work\stock_20171003.csv
I copied the file. Copy source: C:\test\dailyReport\coupon\coupon_20171003.csv copy destination: C:\test\work\coupon_20171003.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171003.7z

Items to compress: 3


Files read from disk: 3
Archive size: 239 bytes (1 KiB)

Everything is Ok
External command "C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171003 C:\test\work\coupon_20171003.csv C:\test\work\sales_20171003.csv C:\test\work\stock_20171003.csv "was executed. Exit code:0

You can see that the barrier action runs after all worker threads have copied their files. The cycle runs three times, once per date. Only the file copies run in parallel, so the order in which they finish can differ from one cycle to the next; the rest of the processing is synchronized by CyclicBarrier and runs sequentially.

Folder after running the test

C:
└─test
   ├─archive
   │      archive_20171001.7z
   │      archive_20171002.7z
   │      archive_20171003.7z
   │
   ├─config
   │      config.properties
   │      
   ├─dailyReport
   │  ├─coupon
   │  │      coupon_20171001.csv
   │  │      coupon_20171002.csv
   │  │      coupon_20171003.csv
   │  │      
   │  ├─sales
   │  │      sales_20171001.csv
   │  │      sales_20171002.csv
   │  │      sales_20171003.csv
   │  │      
   │  └─stock
   │          stock_20171001.csv
   │          stock_20171002.csv
   │          stock_20171003.csv
   │          
   └─work

Consideration

CyclicBarrier is a synchronization aid, so the same processing can also be implemented without it. For a program of the scale implemented here, synchronizing the worker threads with Thread.join would also work without problems. In that case, however, the management class would take on centralized control and would have to create new worker threads for every date, each of which copies only one file. With CyclicBarrier, the same threads keep doing the work without a new thread being created each time. Because CyclicBarrier handles the synchronization between threads, the management class can finish its own processing as soon as the worker threads have been started. The barrier action is responsible for updating shared resources and performing common processing at the point where the threads meet.
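For comparison, here is a minimal sketch of the Thread.join approach described above, assuming the same shape of work: the management code loops over the dates, starts one short-lived thread per form type for each date, joins them all, and then runs the common processing itself. The class name and the logged "copy" and "compress" strings are hypothetical stand-ins for the real file operations.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class JoinAlternativeSketch {

    static final List<String> FORMS = Arrays.asList("coupon", "sales", "stock");

    /** Runs the backup loop with Thread.join instead of a CyclicBarrier and returns the event log. */
    static List<String> run(LocalDate start, LocalDate end) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        for (LocalDate date = start; !date.isAfter(end); date = date.plusDays(1)) {
            final LocalDate day = date;
            List<Thread> threads = new ArrayList<>();
            // New threads are created for every date, and each one copies just one file.
            for (String form : FORMS) {
                Thread t = new Thread(() -> log.add("copy " + form + " " + day));
                threads.add(t);
                t.start();
            }
            // The management code itself waits until every copy for this date is done.
            for (Thread t : threads) {
                t.join();
            }
            // The common processing runs in the management thread, not in a barrier action.
            log.add("compress " + day);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        run(LocalDate.parse("2017-10-01"), LocalDate.parse("2017-10-03")).forEach(System.out::println);
    }
}
```

The result is the same ordering as with CyclicBarrier, but nine threads are created instead of three, and the management code cannot return until the last date has been compressed.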

■ Barrier action in the sample program

| Role | Processing content |
|---|---|
| Update shared resources | Advance the processing target date |
| Execute common processing | Compress and save the form files |

Impressions

It is interesting that, even without a management class coordinating everything, CyclicBarrier can act as an intermediary that keeps the worker threads and the barrier action in sync.

Sample program storage location

GitHub

reference

- Java SE 8 & JDK 9, Class CyclicBarrier
- Technical Blog, part 51: Synchronizer CyclicBarrier

Execution environment
