Vous pouvez contrôler le flux de traitement de Spring Batch avec JavaConfig ainsi que XML. Utilisez les étapes basées sur les tâches comme exemple pour organiser par modèle de traitement.
Il s'agit du modèle de traitement le plus simple, et c'est OK si vous enregistrez les étapes dans l'ordre dans jobBuilder.
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Task1 task1;
@Autowired
private Task2 task2;
@Autowired
private Task3 task3;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").tasklet(task1).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").tasklet(task2).build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3").tasklet(task3).build();
}
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> step2 ->Il est exécuté dans l'ordre de l'étape 3.
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1)
.next(step2)
.next(step3)
.build();
}
}
Alternativement, chaque étape est combinée dans un flux de traitement et enregistrée dans jobBuilder.
......
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> step2 ->Créer un flux pour l'étape 3
Flow flow = new FlowBuilder<Flow>("flow")
.from(step1)
.next(step2)
.next(step3)
.build();
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(flow)
.end()
.build();
}
Vous pouvez enregistrer le résultat du traitement de la tâche en définissant le statut de sortie dans la contribution d'étape pendant le traitement de la tâche.
@Component
public class Task1 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
if (isCheckOK()) {
//Succès
contribution.setExitStatus(ExitStatus.COMPLETED);
} else {
//Échec
contribution.setExitStatus(ExitStatus.FAILED);
}
return RepeatStatus.FINISHED;
}
......
}
Obtenez le résultat de la tâche et de la branche conditionnelle avec on.
.....
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> OK -> step2
// NG -> step3
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
.from(step1).on(ExitStatus.FAILED.getExitCode()).to(step3)
.end()
.build();
}
S'il n'y a pas de tâche ultérieure en fonction de la condition, vous pouvez utiliser échouer pour terminer le processus.
.....
@Bean
public Job job(Step step1, Step step2) throws Exception {
// step1 -> OK -> step2
// NG -> end
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
.from(step2).on(ExitStatus.FAILED.getExitCode()).fail()
.end()
.build();
}
Il s'agit d'un modèle de traitement asynchrone. Utilisez la répartition du flux comme suit.
......
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
//Enregistrez l'étape 1 dans le flux 1
Flow flow1 = new FlowBuilder<Flow>("flow1").start(new FlowBuilder<Flow>("step1").from(step1).end()).build();
//Enregistrer les étapes 2 et 3 du traitement parallèle dans le flux 2
Flow flow2 = new FlowBuilder<Flow>("flow2").start(new FlowBuilder<Flow>("step2").from(step2).end())
.split(new SimpleAsyncTaskExecutor()).add(new FlowBuilder<Flow>("step3").from(step3).end()).build();
// flow1 ->Inscrivez-vous à jobBuilder dans l'ordre du flux2
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(flow1)
.next(flow2)
.end()
.build();
}
Contrairement à la division, la partition ne peut pas décrire un traitement différent pour chaque thread. Le traitement parallèle devient une étape, et cette étape est dupliquée en fonction de la quantité de traitement, et elle est traitée dans plusieurs threads. Choses nécessaires:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DemoPartitioner demoPartitioner;
@Autowired
private SlaveTask slaveTask;
@Bean
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep").tasklet(slaveTask).build();
}
@Bean
public Step masterStep() {
//Définir l'esclave, le gestionnaire, le partitionneur dans le maître
return stepBuilderFactory.get("masterStep").partitioner(slaveStep().getName(), demoPartitioner)
.partitionHandler(handler()).build();
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(masterStep())
.build();
}
@Bean
public PartitionHandler handler() {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setGridSize(10);
handler.setTaskExecutor(taskExecutor());
handler.setStep(slaveStep());
try {
handler.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return handler;
}
@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
Dans la classe de partitionnement, définissez les informations d'entrée pour chaque processus de thread dans ExecutionContext. Exemple ci-dessous: Thread1 est 1 ~ 10, Thread est 11 ~ 20 ...
@Component
public class DemoPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();
int range = 10;
int from = 1;
int to = range;
for (int i = 1; i <= gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putString("name", "thread" + i);
context.putInt("from", from);
context.putInt("to", to);
map.put("partition" + i, context);
from = to + 1;
to += range;
}
return map;
}
}
Dans la tâche esclave, récupérez les informations d'entrée du processus à partir d'ExecutionContext.
@Component
public class SlaveTask implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
String name = (String)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("name");
int fromId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("from");
int toId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("to");
System.out.println(name + ":" + fromId + "~" + toId);
return RepeatStatus.FINISHED;
}
}
Le résultat d'exécution ci-dessus
thread1:1~10
thread4:31~40
thread7:61~70
thread6:51~60
thread3:21~30
thread10:91~100
thread9:81~90
thread2:11~20
thread8:71~80
thread5:41~50
Référence: https://sites.google.com/site/soracane/home/springnitsuite/spring-batch
Recommended Posts