Contrôlez le flux de traitement Spring Batch avec JavaConfig.

Vous pouvez contrôler le flux de traitement de Spring Batch avec JavaConfig ainsi que XML. Utilisez les étapes basées sur les tâches comme exemple pour organiser par modèle de traitement.

Modèle à traiter dans l'ordre

2017-11-18_144958.jpg Il s'agit du modèle de traitement le plus simple, et c'est OK si vous enregistrez les étapes dans l'ordre dans jobBuilder.

@Configuration
@EnableBatchProcessing
public class BatchConfig {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private Task1 task1;
	
	@Autowired
	private Task2 task2;
	
	@Autowired
	private Task3 task3;
	
	@Bean
	public Step step1() {
		return stepBuilderFactory.get("step1").tasklet(task1).build();
	}

	@Bean
	public Step step2() {
		return stepBuilderFactory.get("step2").tasklet(task2).build();
	}
	
	@Bean
	public Step step3() {
		return stepBuilderFactory.get("step3").tasklet(task3).build();
	}

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1 -> step2 ->Il est exécuté dans l'ordre de l'étape 3.
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(step1)
				.next(step2)
				.next(step3)
				.build();
	}
}

Alternativement, chaque étape est combinée dans un flux de traitement et enregistrée dans jobBuilder.

    ......

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1 -> step2 ->Créer un flux pour l'étape 3
		Flow flow = new FlowBuilder<Flow>("flow")
				.from(step1)
				.next(step2)
				.next(step3)
				.build();
		
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(flow)
				.end()
				.build();
	}

Modèle de branchement conditionnel

2017-11-18_151605.jpg

Vous pouvez enregistrer le résultat du traitement de la tâche en définissant le statut de sortie dans la contribution d'étape pendant le traitement de la tâche.

@Component
public class Task1 implements Tasklet {

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		
		if (isCheckOK()) {
			//Succès
			contribution.setExitStatus(ExitStatus.COMPLETED);
		} else {
			//Échec
			contribution.setExitStatus(ExitStatus.FAILED);
		}
		
		return RepeatStatus.FINISHED;
	}

    ......
}

Obtenez le résultat de la tâche et de la branche conditionnelle avec on.

	.....

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1 -> OK -> step2
		//          NG -> step3
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
				.from(step1).on(ExitStatus.FAILED.getExitCode()).to(step3)
				.end()
				.build();
	}

S'il n'y a pas de tâche ultérieure en fonction de la condition, vous pouvez utiliser échouer pour terminer le processus.

	.....

	@Bean
	public Job job(Step step1, Step step2) throws Exception {
		// step1 -> OK -> step2
		//          NG -> end
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
				.from(step2).on(ExitStatus.FAILED.getExitCode()).fail()
				.end()
				.build();
	}

Motif à traiter en parallèle (fractionné)

2017-11-18_145234.jpg

Il s'agit d'un modèle de traitement asynchrone. Utilisez la répartition du flux comme suit.

	......

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		//Enregistrez l'étape 1 dans le flux 1
		Flow flow1 = new FlowBuilder<Flow>("flow1").start(new FlowBuilder<Flow>("step1").from(step1).end()).build();
		//Enregistrer les étapes 2 et 3 du traitement parallèle dans le flux 2
		Flow flow2 = new FlowBuilder<Flow>("flow2").start(new FlowBuilder<Flow>("step2").from(step2).end())
				.split(new SimpleAsyncTaskExecutor()).add(new FlowBuilder<Flow>("step3").from(step3).end()).build();
		
		// flow1 ->Inscrivez-vous à jobBuilder dans l'ordre du flux2
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(flow1)
				.next(flow2)
				.end()
				.build();
	}

Pattern à traiter en parallèle (partition)

2017-11-18_182539.jpg

Contrairement à la division, la partition ne peut pas décrire un traitement différent pour chaque thread. Le traitement parallèle devient une étape, et cette étape est dupliquée en fonction de la quantité de traitement, et elle est traitée dans plusieurs threads. Choses nécessaires:

    1. tâche maître: boîte de tâches esclave
  1. tâche esclave: tâche à dupliquer
    1. handler: contrôle le nombre de threads esclaves créés
  2. partitionneur: définir les informations d'entrée pour le traitement esclave (exemple: plage de traitement)
@Configuration
@EnableBatchProcessing
public class BatchConfig {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;
	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private DemoPartitioner demoPartitioner;
	
	@Autowired
	private SlaveTask slaveTask;

	@Bean
	public Step slaveStep() {
		return stepBuilderFactory.get("slaveStep").tasklet(slaveTask).build();
	}

	
	@Bean
	public Step masterStep() {
		//Définir l'esclave, le gestionnaire, le partitionneur dans le maître
		return stepBuilderFactory.get("masterStep").partitioner(slaveStep().getName(), demoPartitioner)
				.partitionHandler(handler()).build();
	}
	
	@Bean
	public Job job() {
		return jobBuilderFactory.get("job")
			.incrementer(new RunIdIncrementer())
			.start(masterStep())
			.build();
	}

	@Bean
	public PartitionHandler handler() {
		TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
		handler.setGridSize(10);
		handler.setTaskExecutor(taskExecutor());
		handler.setStep(slaveStep());
		try {
			handler.afterPropertiesSet();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return handler;
	}

	@Bean
	public SimpleAsyncTaskExecutor taskExecutor() {
		return new SimpleAsyncTaskExecutor();
	}
}

Dans la classe de partitionnement, définissez les informations d'entrée pour chaque processus de thread dans ExecutionContext. Exemple ci-dessous: Thread1 est 1 ~ 10, Thread est 11 ~ 20 ...

@Component
public class DemoPartitioner implements Partitioner {

	@Override
	public Map<String, ExecutionContext> partition(int gridSize) {

		Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();

		int range = 10;
		int from = 1;
		int to = range;

		for (int i = 1; i <= gridSize; i++) {
			ExecutionContext context = new ExecutionContext();

			context.putString("name", "thread" + i);
			context.putInt("from", from);
			context.putInt("to", to);

			map.put("partition" + i, context);

			from = to + 1;
			to += range;

		}
		return map;
	}
}

Dans la tâche esclave, récupérez les informations d'entrée du processus à partir d'ExecutionContext.

@Component
public class SlaveTask implements Tasklet {

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		String name = (String)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("name");
		int fromId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("from");
		int toId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("to");
		
		System.out.println(name + ":" + fromId + "~" + toId);
		return RepeatStatus.FINISHED;
	}
}

Le résultat d'exécution ci-dessus

thread1:1~10
thread4:31~40
thread7:61~70
thread6:51~60
thread3:21~30
thread10:91~100
thread9:81~90
thread2:11~20
thread8:71~80
thread5:41~50

Référence: https://sites.google.com/site/soracane/home/springnitsuite/spring-batch

Recommended Posts

Contrôlez le flux de traitement Spring Batch avec JavaConfig.
Je veux comprendre le flux des paramètres de demande de traitement Spring
J'ai examiné le flux de communication TCP avec Spring Integration (édition client)
J'ai examiné le flux de communication TCP avec Spring Integration (édition serveur)
Accédez au h2db intégré de Spring Boot avec jdbcTemplate
À peu près le flux de développement d'applications Web avec Rails.
Expérimentons le flux d'octroi de code d'autorisation avec Spring Security OAuth-Part 1: Review of OAuth 2.0
Une histoire remplie des bases de Spring Boot (résolu)
Viser une compréhension de base du flux de traitement récursif
[Rails] Implémentation de la fonction coupon (avec fonction de suppression automatique par traitement par lots)
Voir le comportement des mises à jour d'entités avec Spring Boot + Spring Data JPA
Flux de traitement de base de Java Stream
Je veux contrôler le message d'erreur par défaut de Spring Boot
Ordre de traitement dans le programme
Traitement d'image: jouons avec l'image
Filtrer le résultat de BindingResult [Spring]
Comment accéder directement à Socket avec la fonction TCP de Spring Integration
Comparaison du projet Open Source Sentinel Java Flow Control d'Alibaba et Hystrix
Traitement lors du démarrage d'une application avec Spring Boot
L'histoire de la rencontre avec l'annotation personnalisée Spring
Créez un lot à la demande simple avec Spring Batch
J'ai étudié le traitement interne de Retrofit
À propos de l'affichage initial de Spring Framework
Vérifiez le contenu du traitement avec [rails] binding.pry
Implémentez le système de box ball avec Processing
À propos du traitement de BigDecimal (avec réflexion)
Traitement asynchrone avec Spring Boot en utilisant @Async
Mettre en forme le contenu de LocalDate avec DateTimeFormatter
[Java] [Spring] Tester le comportement de l'enregistreur
Partie 2: Comprendre (approximativement) le flux de processus de la connexion OAuth 2.0 prise en charge par Spring Security 5
Organisez les différences de comportement de @NotBlank, @NotEmpty et @NotNull avec Spring Boot + Thymeleaf
Paramètres du gestionnaire de ressources lors de la livraison du SPA avec la fonction de ressource statique de Spring Boot
Partie 3: Comprendre (en profondeur) le flux de processus de la connexion OAuth 2.0 prise en charge par Spring Security 5
Traitement asynchrone avec exécution régulière dans Spring Boot
Vérifiez le contenu de l'objet argument avec Mockito
À propos du guide de démarrage officiel de Spring Framework
[LeJOS] Contrôlons le moteur EV3 avec Java
Utilisez la barre de menus Mac avec les applications Processing 3
Gérez la version de Ruby elle-même avec rbenv
Écraser le contenu de la configuration avec Spring-boot + JUnit5
L'histoire de la montée de la série Spring Boot 1.5 à la série 2.1
L'histoire du réglage de l'application Android avec libGDX
Vérifions la sensation de Spring Boot + Swagger 2.0
Mettre à jour périodiquement la base de données avec Spring Batch et My Batis
Calculer le score de similarité des chaînes de caractères avec JAVA
Gestion collective des erreurs de validation Spring avec @ControllerAdvice
Correspond aux annotations sur l'interface avec Spring AOP
Préparez l'environnement CentOS 8 avec Sakura VPS
NLP4J [006-032] 100 traitements linguistiques avec NLP4J Knock # 32 Prototype de verbe
Spécifiez la valeur par défaut avec @Builder of Lombok
Mesurez la distance du labyrinthe avec la recherche de priorité de largeur
Le nom officiel de Spring MVC est Spring Web MVC
Résumé de ce que j'ai appris dans Spring Batch
J'ai vérifié le nombre de taxis avec Ruby
Notez que j'étais accro au traitement par lots avec Spring Boot
[Spring Boot] L'histoire selon laquelle le bean de la classe avec l'annotation ConfigurationProperties n'a pas été trouvé