[JAVA] Exemple de traitement par lots de données sur DB avec des démarreurs Apache Camel Spring Boot

Dans la suite de "[Procédure de création d'une application chameau simple à l'aide des démarreurs Apache Camel Spring Boot](/ HMMNRST / items / db9cb387ecfde0944496)", implémentez ** Batch pour traiter les tâches stockées dans MySQL **.

En fait, le but est de ** expérimenter le contrôle exclusif ** lors du traitement en parallèle sur plusieurs machines, mais je ne peux pas encore le faire car je comprends à moitié SQL, camel et Java. Cette application se positionne comme un exemple qui semble conduire à cela.


Je n'expliquerai pas la dernière partie, mais j'écrirai tous les fichiers nécessaires dans cet article afin que je puisse les déplacer sans regarder en arrière.

Liste des fichiers


path/to/app/
  ├── src/main/
  │     ├── java/org/example/mycamelapp/
  │     │     ├── db/
  │     │     │     ├── TaskSql.java
  │     │     │     └── TaskTable.java
  │     │     ├── processors/
  │     │     │     └── PseudoWorkProcessor.java
  │     │     ├── routes/
  │     │     │     └── TaskExecutionRoute.java
  │     │     └── MyCamelApplication.java
  │     └── resources/
  │           └── application.yml
  └── pom.xml

environnement

Identique à la dernière fois sauf que DB a été ajouté.

Définition de la table DB

Créez une table de tâches. Les deux colonnes suivantes sont basées sur l'image du "traitement des tâches".

L'hypothèse de contrôle exclusif que je souhaite expérimenter est que différentes tâches peuvent être traitées en parallèle sans se soucier de l'ordre, mais la même tâche ne doit pas être exécutée deux fois.

CREATE DATABASE IF NOT EXISTS my_camel_app;

CREATE TABLE IF NOT EXISTS my_camel_app.task (
	task_id    int(11)      NOT NULL AUTO_INCREMENT,
	status     int(11)      NOT NULL DEFAULT 0,
	executor   varchar(255)          DEFAULT NULL,
	created_at datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
	updated_at datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (task_id)
);

Vous pouvez ajouter des enregistrements comme ʻINSERT INTO my_camel_app.task () VALUES (); `. Il peut être possible de l'augmenter automatiquement en utilisant la minuterie précédente.

pom.xml

<détails>

Texte intégral (cliquez pour développer) </ summary>

pom.xml&#x20;(mycamelapp)


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.example</groupId>
	<artifactId>mycamelapp</artifactId>
	<version>1.0-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
	</parent>

	<!-- https://camel.apache.org/camel-spring-boot/latest/index.html -->
	<dependencyManagement>
		<dependencies>
			<!-- Camel BOM -->
			<dependency>
				<groupId>org.apache.camel.springboot</groupId>
				<artifactId>camel-spring-boot-dependencies</artifactId>
				<version>3.2.0</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
			<!-- ... other BOMs or dependencies ... -->
		</dependencies>
	</dependencyManagement>

	<dependencies>
		<!-- Camel Starter -->
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-sql-starter</artifactId>
		</dependency>
		<!-- ... other dependencies ... -->

		<!-- JDBC -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency>

		<!-- Utils -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<scope>provided</scope>
		</dependency>
	</dependencies>
</project>

point de changement:

  • Spécifiez spring-boot-starter-parent pour le projet parent
  • Si vous ajoutez des dépendances spring-boot à dependencyManagement, les plugins ne sont pas hérités et doivent être explicitement indiqués.
  • 2.2.6.RELEASE est pour la version chameau
  • Ajouter aux dépendances
    • camel-sql-starter
    • spring-boot-starter-jdbc
    • mysql-connector-java
  • lombok (pour la génération automatique des getters, utilisé au moment de la compilation)
  • Autre
  • Supprimez toute la construction (car les plugins utilisent le paramètre spring-boot-dependencies)

Il est pratique que vous n'ayez pas besoin d'écrire une nouvelle version de ce qui est écrit dans dependencyManagement (y compris le projet parent). Il y avait aussi mysql-connector-java et lombok.

Implémentation du traitement des tâches

Créons une route qui "acquiert plusieurs tâches de la base de données à la fois, les traite en parallèle et met à jour la base de données".

camel-sample-route.png

Le statut de la tâche à traiter est «NON EXÉCUTÉ» → «RÉUSSI». (Attribuez les numéros appropriés à chaque statut plus tard)

Route Je définirai les paramètres détaillés et le traitement séparément, et je créerai d'abord uniquement la structure comme indiqué dans l'image. (Si vous copiez le code, vous devez importer d'autres classes, donc la dernière est bonne)

src/main/java/org/example/mycamelapp/routes/TaskExecutionRoute.java


@Component
public class TaskExecutionRoute extends RouteBuilder {

	// dummy hostname
	static public final String HOSTNAME = "host-" + Integer.toHexString(new SecureRandom().nextInt());

	@Override
	public void configure() throws Exception {
		from(TaskSql.selectTasks(TaskTable.Status.UNEXECUTED, 5))
				// exchange.getIn().getBody() is List<Map<String, Object>> of records
				.log("${body.size} rows selected.")
				.split(body()).parallelProcessing()
					// exchange.getIn().getBody() is Map<String, Object> of a record
					.process(new PseudoWorkProcessor())
					.to(TaskSql.updateTask(TaskTable.Status.SUCCEEDED, HOSTNAME))
				.end()
				.log("all tasks finished.")
		;
	}
}
  • Ecrivez le paramètre pour recevoir les données dans from () comme une chaîne URI.
  • camel peut souvent écrire une extension d'expression sous la forme $ {expr} dans une chaîne.
  • Divisez les données avec split () et traitez chacune.
  • Les données divisées seront emballées dans un nouvel échange.
  • Cette fois, je me fiche de l'ordre, alors spécifiez parallelProcessing () pour traiter en parallèle.
  • Vous pouvez opérer l'échange de différentes manières avec la chaîne de méthodes, mais cette fois, éditez l'échange avec votre propre classe avec process ().
  • Ecrivez le paramètre pour transmettre les données à to () sous forme de chaîne URI.
  • ʻEnd () `indique la fin de la structure imbriquée.

Avec Java DSL, même si vous oubliez d'entrer ʻend () `, vous obtiendrez rarement une erreur de syntaxe, et il y a un risque que la structure de la route soit différente de ce que vous avez imaginé.

Comme les noms RouteBuilder et configure () l'indiquent, cette méthode n'est pas appelée à chaque fois que des données arrivent, mais une seule fois lorsque l'application démarre. Par conséquent, même si un point d'arrêt est défini, le traitement des données ne peut pas être débogué.

Tables et requêtes

src/main/java/org/example/mycamelapp/db/TaskTable.java


public interface TaskTable {

	String TABLE_NAME = "task";

	String TASK_ID = "task_id";
	String STATUS = "status";
	String EXECUTOR = "executor";
	String CREATED_AT = "created_at";
	String UPDATED_AT = "updated_at";

	@AllArgsConstructor
	@Getter
	enum Status {

		UNEXECUTED(0),
		SUCCEEDED(10),
		FAILED(-1),
		;

		private final int code;
	}
}

src/main/java/org/example/mycamelapp/db/TaskSql.java


public class TaskSql implements TaskTable {

	public static String insertTask() {
		return "sql:INSERT INTO " + TABLE_NAME + " () VALUES ()";
	}

	public static String selectTasks(Status status, int limit) {
		return "sql:SELECT * FROM " + TABLE_NAME
				+ " WHERE " + STATUS + " = " + status.getCode()
				+ " LIMIT " + limit
				+ "?useIterator=false" // List<Map<String, Object>>
				;
	}

	public static String updateTask(Status nextStatus, String hostname) {
		return "sql:UPDATE " + TABLE_NAME
				+ " SET " + STATUS + " = " + nextStatus.getCode()
				+ ", " + EXECUTOR + " = " + quote(hostname)
				+ " WHERE " + TASK_ID + " = " + ref(TASK_ID)
				;
	}

	private static String quote(String value) {
		if (value == null) return "NULL";
		return "'" + value + "'";
	}

	private static String ref(String key) {
		return ":#" + key;
	}
}

La chaîne de caractères spécifiée pour «from ()» et «to ()» est un URI, et les options peuvent être spécifiées après «?». Cette fois, ʻuseIterator = falseest utilisé, et au lieu d'envoyer les données acquises de la base de données à la route ligne par ligne avecMap <>, elles sont envoyées avec List <Map <>>`.

La chaîne : # task_id (←ref (TASK_ID)) apparaît dans la requête. Cela permet au chameau de récupérer la valeur correspondant à la clé du corps ou de l'en-tête dans l'échange et de l'intégrer.

Process Mettre au hasard un sommeil pendant 1 à 3 secondes en supposant que "cela prend beaucoup de temps".

src/main/java/org/example/mycamelapp/processors/PseudoWorkProcessor.java


@Slf4j
public class PseudoWorkProcessor implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		Map<String, Object> task = exchange.getIn().getBody(Map.class);
		int processingTime = ThreadLocalRandom.current().nextInt(1000, 3000);
		String infoMsg = "task_id = " + task.get(TaskTable.TASK_ID) + ", time = " + processingTime + "[ms]";

		log.info("start  working :: " + infoMsg);
		Thread.sleep(processingTime);
		log.info("finish working :: " + infoMsg);
	}
}

Ceci est appelé chaque fois que les données arrivent, vous pouvez donc définir des points d'arrêt et déboguer.

Commencez

Réglage

Ajoutez des paramètres de connexion à MySQL. Bien qu'il soit écrit ici en yaml, il peut être donné comme une variable d'environnement ou l'option -D (La priorité est définie par Spring Boot. 2.2.6.RELEASE / reference / html / spring-boot-features.html # boot-features-external-config)). C'était difficile à écrire pendant longtemps, j'ai donc également mis le nom d'utilisateur et le mot de passe dans l'URL.

src/main/resources/application.yml


# to keep the JVM running
camel:
  springboot:
    main-run-controller: true

spring:
  data-source:
    url: jdbc:mysql://user:password@localhost:3306/my_camel_app

Le point de départ de l'application est le même que la dernière fois.

src/main/java/org/example/mycamelapp/MyCamelApplication.java


@SpringBootApplication
public class MyCamelApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyCamelApplication.class, args);
	}
}

Courir

terminal


cd path/to/app

mvn spring-boot:run

référence

Recommended Posts

Exemple de traitement par lots de données sur DB avec des démarreurs Apache Camel Spring Boot
Étapes pour créer une application chameau simple avec les démarreurs Apache Camel Spring Boot
Hello World (API REST) avec Apache Camel + Spring Boot 2
Hello World (application console) avec Apache Camel + Spring Boot 2
Flux jusqu'à la sortie des données de la table à afficher avec Spring Boot
Comment effectuer UT avec Excel en tant que données de test avec Spring Boot + JUnit5 + DBUnit
Exemple de code pour le contrôle de la base de données par transaction déclarative avec Spring Boot + Spring Data JPA
Jusqu'à l'acquisition de données avec Spring Boot + MyBatis + PostgreSQL
Comment utiliser MyBatis2 (iBatis) avec Spring Boot 1.4 (Spring 4)
Comment utiliser h2db intégré avec Spring Boot
Essayez d'implémenter la fonction de connexion avec Spring Boot
Mettre à jour périodiquement la base de données avec Spring Batch et My Batis
Introduction à Spring Boot + In-Memory Data Grid
Essayez d'automatiser la migration avec Spring Boot Flyway
Je voulais classer la botte à ressort dans un multi-projet
[Introduction à Spring Boot] Fonction d'authentification avec Spring Security
[Spring Batch] Données de la table de sortie dans un fichier CSV
Notez que j'étais accro au traitement par lots avec Spring Boot
Paramètres de connexion à MySQL avec Spring Boot + Spring JDBC
Mappez automatiquement DTO aux entités avec l'API Spring Boot
J'ai essayé de démarrer avec Spring Data JPA
Comment utiliser CommandLineRunner dans Spring Batch of Spring Boot
Déployer le projet Spring Boot sur Tomcat dans XAMPP
Comment démarrer par environnement avec Spring Boot de Maven
Tentative de SSR Vue.js avec Spring Boot et GraalJS
Télécharger avec Spring Boot
J'ai essayé le guide d'introduction de Spring Boot [Accès aux données avec JPA]
Découpez SQL en fichier de propriété avec jdbcTemplate of spring boot
Connectez-vous à la base de données avec spring boot + spring jpa et effectuez l'opération CRUD
Application Spring Boot qui spécifie les paramètres de connexion à la base de données avec des paramètres
J'ai essayé de démarrer avec Swagger en utilisant Spring Boot
8 choses à insérer dans DB en utilisant Spring Boot et JPA
[Java] Exemple de projet de développement d'applications Web avec Spring Boot