Control the processing flow of Spring Batch with JavaConfig.

You can control the processing flow of Spring Batch with JavaConfig as well as XML. Use tasklet-based steps as a sample to organize by processing pattern.

Pattern to process in order

2017-11-18_144958.jpg This is the simplest processing pattern, and it is OK if you register the steps in order in jobBuilder.

@Configuration
@EnableBatchProcessing
public class BatchConfig {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private Task1 task1;
	
	@Autowired
	private Task2 task2;
	
	@Autowired
	private Task3 task3;
	
	@Bean
	public Step step1() {
		return stepBuilderFactory.get("step1").tasklet(task1).build();
	}

	@Bean
	public Step step2() {
		return stepBuilderFactory.get("step2").tasklet(task2).build();
	}
	
	@Bean
	public Step step3() {
		return stepBuilderFactory.get("step3").tasklet(task3).build();
	}

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1 -> step2 ->It is executed in the order of step3.
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(step1)
				.next(step2)
				.next(step3)
				.build();
	}
}

Alternatively, each step is combined into one processing flow and registered in jobBuilder.

    ......

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1 -> step2 ->Create a flow for step3
		Flow flow = new FlowBuilder<Flow>("flow")
				.from(step1)
				.next(step2)
				.next(step3)
				.build();
		
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(flow)
				.end()
				.build();
	}

Conditional branching pattern

2017-11-18_151605.jpg

You can register the task processing result by setting Exit Status in Step Contribution during the task processing.

@Component
public class Task1 implements Tasklet {

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		
		if (isCheckOK()) {
			//success
			contribution.setExitStatus(ExitStatus.COMPLETED);
		} else {
			//Failure
			contribution.setExitStatus(ExitStatus.FAILED);
		}
		
		return RepeatStatus.FINISHED;
	}

    ......
}

Get the result of the task and conditional branch with on.

	.....

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1 -> OK -> step2
		//          NG -> step3
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
				.from(step1).on(ExitStatus.FAILED.getExitCode()).to(step3)
				.end()
				.build();
	}

If there is no subsequent task depending on the condition, you can use fail to end the process.

	.....

	@Bean
	public Job job(Step step1, Step step2) throws Exception {
		// step1 -> OK -> step2
		//          NG -> end
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
				.from(step2).on(ExitStatus.FAILED.getExitCode()).fail()
				.end()
				.build();
	}

Pattern to be processed in parallel (split)

2017-11-18_145234.jpg

This is an asynchronous processing pattern. Use the split of flow as follows.

	......

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		//Register step1 with flow1
		Flow flow1 = new FlowBuilder<Flow>("flow1").start(new FlowBuilder<Flow>("step1").from(step1).end()).build();
		//Register step2 and step3 of parallel processing in flow2
		Flow flow2 = new FlowBuilder<Flow>("flow2").start(new FlowBuilder<Flow>("step2").from(step2).end())
				.split(new SimpleAsyncTaskExecutor()).add(new FlowBuilder<Flow>("step3").from(step3).end()).build();
		
		// flow1 ->Register with jobBuilder in the order of flow2
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(flow1)
				.next(flow2)
				.end()
				.build();
	}

Pattern to process in parallel (partition)

2017-11-18_182539.jpg

Unlike split, partition cannot describe different processing for each thread. Concurrency becomes one step, and that step is duplicated according to the amount of processing, and it is processed in multiple threads. Things necessary:

    1. master task: slave task box
  1. slave task: Target task to be replicated
    1. handler: Controls how many slave threads are created
  2. partitioner: Set input information for slave processing (example: processing range)
@Configuration
@EnableBatchProcessing
public class BatchConfig {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;
	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private DemoPartitioner demoPartitioner;
	
	@Autowired
	private SlaveTask slaveTask;

	@Bean
	public Step slaveStep() {
		return stepBuilderFactory.get("slaveStep").tasklet(slaveTask).build();
	}

	
	@Bean
	public Step masterStep() {
		//Set slave, handler, partitioner in master
		return stepBuilderFactory.get("masterStep").partitioner(slaveStep().getName(), demoPartitioner)
				.partitionHandler(handler()).build();
	}
	
	@Bean
	public Job job() {
		return jobBuilderFactory.get("job")
			.incrementer(new RunIdIncrementer())
			.start(masterStep())
			.build();
	}

	@Bean
	public PartitionHandler handler() {
		TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
		handler.setGridSize(10);
		handler.setTaskExecutor(taskExecutor());
		handler.setStep(slaveStep());
		try {
			handler.afterPropertiesSet();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return handler;
	}

	@Bean
	public SimpleAsyncTaskExecutor taskExecutor() {
		return new SimpleAsyncTaskExecutor();
	}
}

In the partitioner class, set the input information of each thread process in ExecutionContext. Example below: Thread1 is 1 ~ 10, Thread is 11 ~ 20 ...

@Component
public class DemoPartitioner implements Partitioner {

	@Override
	public Map<String, ExecutionContext> partition(int gridSize) {

		Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();

		int range = 10;
		int from = 1;
		int to = range;

		for (int i = 1; i <= gridSize; i++) {
			ExecutionContext context = new ExecutionContext();

			context.putString("name", "thread" + i);
			context.putInt("from", from);
			context.putInt("to", to);

			map.put("partition" + i, context);

			from = to + 1;
			to += range;

		}
		return map;
	}
}

In the slave task, get the input information of the process from ExecutionContext.

@Component
public class SlaveTask implements Tasklet {

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		String name = (String)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("name");
		int fromId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("from");
		int toId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("to");
		
		System.out.println(name + ":" + fromId + "~" + toId);
		return RepeatStatus.FINISHED;
	}
}

The above execution result

thread1:1~10
thread4:31~40
thread7:61~70
thread6:51~60
thread3:21~30
thread10:91~100
thread9:81~90
thread2:11~20
thread8:71~80
thread5:41~50

Reference: https://sites.google.com/site/soracane/home/springnitsuite/spring-batch

Recommended Posts

Control the processing flow of Spring Batch with JavaConfig.
I want to understand the flow of Spring processing request parameters
I examined the flow of TCP communication with Spring Integration (client edition)
I examined the flow of TCP communication with Spring Integration (server edition)
Access the built-in h2db of spring boot with jdbcTemplate
Roughly the flow of web application development with Rails.
Let's experience the authorization code grant flow with Spring Security OAuth-Part 1: Review of OAuth 2.0
A story packed with the basics of Spring Boot (solved)
Aiming for a basic understanding of the flow of recursive processing
[Rails] Implementation of coupon function (with automatic deletion function using batch processing)
See the behavior of entity update with Spring Boot + Spring Data JPA
Basic processing flow of java Stream
I want to control the default error message of Spring Boot
Order of processing in the program
Image processing: Let's play with the image
Filter the result of BindingResult [Spring]
How to access Socket directly with the TCP function of Spring Integration
Comparison of Alibaba's open source Sentinel Java flow control project with Hystrix
Processing at application startup with Spring Boot
The story of encountering Spring custom annotation
Create a simple on-demand batch with Spring Batch
[Swift] Vaguely grasp the flow of Delegate
I investigated the internal processing of Retrofit
About the initial display of Spring Framework
Check the processing contents with [rails] binding.pry
Play with the Processing libraries "ControlP5", "Fisica"
Implement the box ball system with Processing
About the treatment of BigDecimal (with reflection)
Asynchronous processing with Spring Boot using @Async
Format the contents of LocalDate with DateTimeFormatter
[Java] [Spring] Test the behavior of the logger
Part 2: Understand (roughly) the process flow of OAuth 2.0 Login supported by Spring Security 5
I want to control the start / stop of servers and databases with Alexa
Organize the differences in behavior of @NotBlank, @NotEmpty, @NotNull with Spring Boot + Thymeleaf
Resource handler settings when delivering SPA with the static resource function of Spring Boot
Part 3: Understand (deeply) the process flow of OAuth 2.0 Login supported by Spring Security 5
Asynchronous processing with regular execution in Spring Boot
Verify the contents of the argument object with Mockito
About the official start guide of Spring Framework
[LeJOS] Let's control the EV3 motor with Java
Use the Mac menu bar with Processing 3 apps
Manage the version of Ruby itself with rbenv
Overwrite the contents of config with Spring-boot + JUnit5
The story of raising Spring Boot 1.5 series to 2.1 series
The story of tuning android apps with libGDX
Let's check the feel of Spring Boot + Swagger 2.0
Periodically update DB with Spring Batch and MyBatis
Calculate the similarity score of strings with JAVA
Collective handling of Spring validation errors with @ControllerAdvice
[Rails] Implementation of batch processing using whenever (gem)
Matches annotations on the interface with Spring AOP
Prepare the environment of CentOS 8 with Sakura VPS
NLP4J [006-032] 100 language processing with NLP4J Knock # 32 Prototype of verb
Specify the default value with @Builder of Lombok
Measure the distance of the maze with breadth-first search
The official name of Spring MVC is Spring Web MVC
Summary of what I learned in Spring Batch
I checked the number of taxis with Ruby
A memo that I was addicted to when making batch processing with Spring Boot
[Spring Boot] The story that the bean of the class with ConfigurationProperties annotation was not found