Sie können den Verarbeitungsablauf von Spring Batch sowohl mit JavaConfig als auch mit XML steuern. Verwenden Sie taskletbasierte Schritte als Beispiel, um sie nach Verarbeitungsmustern zu organisieren.
Dies ist das einfachste Verarbeitungsmuster, und es ist in Ordnung, wenn Sie die Schritte in der Reihenfolge in jobBuilder registrieren.
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Task1 task1;
@Autowired
private Task2 task2;
@Autowired
private Task3 task3;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").tasklet(task1).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").tasklet(task2).build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3").tasklet(task3).build();
}
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> step2 ->Es wird in der Reihenfolge von Schritt 3 ausgeführt.
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1)
.next(step2)
.next(step3)
.build();
}
}
Alternativ wird jeder Schritt zu einem Verarbeitungsablauf zusammengefasst und in jobBuilder registriert.
......
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> step2 ->Erstellen Sie einen Flow für Schritt 3
Flow flow = new FlowBuilder<Flow>("flow")
.from(step1)
.next(step2)
.next(step3)
.build();
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(flow)
.end()
.build();
}
Sie können das Ergebnis der Aufgabenverarbeitung registrieren, indem Sie während der Aufgabenverarbeitung unter Schrittbeitrag den Status Beenden festlegen.
@Component
public class Task1 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
if (isCheckOK()) {
//Erfolg
contribution.setExitStatus(ExitStatus.COMPLETED);
} else {
//Fehler
contribution.setExitStatus(ExitStatus.FAILED);
}
return RepeatStatus.FINISHED;
}
......
}
Holen Sie sich das Ergebnis der Aufgabe und des bedingten Zweigs mit on.
.....
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> OK -> step2
// NG -> step3
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
.from(step1).on(ExitStatus.FAILED.getExitCode()).to(step3)
.end()
.build();
}
Wenn es je nach Bedingung keine nachfolgende Aufgabe gibt, können Sie den Vorgang nicht beenden.
.....
@Bean
public Job job(Step step1, Step step2) throws Exception {
// step1 -> OK -> step2
// NG -> end
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
.from(step2).on(ExitStatus.FAILED.getExitCode()).fail()
.end()
.build();
}
Dies ist ein asynchrones Verarbeitungsmuster. Verwenden Sie die Aufteilung des Flusses wie folgt.
......
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
//Registrieren Sie Schritt1 bis Flow1
Flow flow1 = new FlowBuilder<Flow>("flow1").start(new FlowBuilder<Flow>("step1").from(step1).end()).build();
//Registrieren Sie Schritt 2 und Schritt 3 der Parallelverarbeitung in Fluss 2
Flow flow2 = new FlowBuilder<Flow>("flow2").start(new FlowBuilder<Flow>("step2").from(step2).end())
.split(new SimpleAsyncTaskExecutor()).add(new FlowBuilder<Flow>("step3").from(step3).end()).build();
// flow1 ->Registrieren Sie sich bei jobBuilder in der Reihenfolge flow2
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(flow1)
.next(flow2)
.end()
.build();
}
Im Gegensatz zu Split kann die Partition nicht für jeden Thread eine andere Verarbeitung beschreiben. Die parallele Verarbeitung wird zu einem Schritt, und dieser Schritt wird entsprechend dem Verarbeitungsaufwand dupliziert und in mehreren Threads verarbeitet. Notwendige Dinge:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DemoPartitioner demoPartitioner;
@Autowired
private SlaveTask slaveTask;
@Bean
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep").tasklet(slaveTask).build();
}
@Bean
public Step masterStep() {
//Stellen Sie Slave, Handler und Partitionierer im Master ein
return stepBuilderFactory.get("masterStep").partitioner(slaveStep().getName(), demoPartitioner)
.partitionHandler(handler()).build();
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(masterStep())
.build();
}
@Bean
public PartitionHandler handler() {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setGridSize(10);
handler.setTaskExecutor(taskExecutor());
handler.setStep(slaveStep());
try {
handler.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return handler;
}
@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
Legen Sie in der Partitioniererklasse die Eingabeinformationen für jeden Thread-Prozess in ExecutionContext fest. Beispiel unten: Thread1 ist 1 ~ 10, Thread ist 11 ~ 20 ...
@Component
public class DemoPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();
int range = 10;
int from = 1;
int to = range;
for (int i = 1; i <= gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putString("name", "thread" + i);
context.putInt("from", from);
context.putInt("to", to);
map.put("partition" + i, context);
from = to + 1;
to += range;
}
return map;
}
}
Rufen Sie in der Slave-Task die Eingabeinformationen des Prozesses von ExecutionContext ab.
@Component
public class SlaveTask implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
String name = (String)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("name");
int fromId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("from");
int toId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("to");
System.out.println(name + ":" + fromId + "~" + toId);
return RepeatStatus.FINISHED;
}
}
Das obige Ausführungsergebnis
thread1:1~10
thread4:31~40
thread7:61~70
thread6:51~60
thread3:21~30
thread10:91~100
thread9:81~90
thread2:11~20
thread8:71~80
thread5:41~50
Referenz: https://sites.google.com/site/soracane/home/springnitsuite/spring-batch
Recommended Posts