Dans la suite de "[Procédure de création d'une application chameau simple à l'aide des démarreurs Apache Camel Spring Boot](/ HMMNRST / items / db9cb387ecfde0944496)", implémentez ** Batch pour traiter les tâches stockées dans MySQL **.
En fait, le but est de ** expérimenter le contrôle exclusif ** lors du traitement en parallèle sur plusieurs machines, mais je ne peux pas encore le faire car je comprends à moitié SQL, camel et Java. Cette application se positionne comme un exemple qui semble conduire à cela.
Je n'expliquerai pas la dernière partie, mais j'écrirai tous les fichiers nécessaires dans cet article afin que je puisse les déplacer sans regarder en arrière.
Liste des fichiers
path/to/app/
├── src/main/
│ ├── java/org/example/mycamelapp/
│ │ ├── db/
│ │ │ ├── TaskSql.java
│ │ │ └── TaskTable.java
│ │ ├── processors/
│ │ │ └── PseudoWorkProcessor.java
│ │ ├── routes/
│ │ │ └── TaskExecutionRoute.java
│ │ └── MyCamelApplication.java
│ └── resources/
│ └── application.yml
└── pom.xml
Identique à la dernière fois sauf que DB a été ajouté.
Créez une table de tâches. Les deux colonnes suivantes sont basées sur l'image du "traitement des tâches".
status
: indique l'étape de traitement de la tâche. Pour chaque lot (une seule cette fois), extrayez les tâches à l'étape dont vous êtes en charge et avancez le statut une fois terminé.L'hypothèse de contrôle exclusif que je souhaite expérimenter est que différentes tâches peuvent être traitées en parallèle sans se soucier de l'ordre, mais la même tâche ne doit pas être exécutée deux fois.
CREATE DATABASE IF NOT EXISTS my_camel_app;
CREATE TABLE IF NOT EXISTS my_camel_app.task (
task_id int(11) NOT NULL AUTO_INCREMENT,
status int(11) NOT NULL DEFAULT 0,
executor varchar(255) DEFAULT NULL,
created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (task_id)
);
Vous pouvez ajouter des enregistrements comme ʻINSERT INTO my_camel_app.task () VALUES (); `. Il peut être possible de l'augmenter automatiquement en utilisant la minuterie précédente.
pom.xml
<détails> point de changement: Il est pratique que vous n'ayez pas besoin d'écrire une nouvelle version de ce qui est écrit dans dependencyManagement (y compris le projet parent). Il y avait aussi mysql-connector-java et lombok. Créons une route qui "acquiert plusieurs tâches de la base de données à la fois, les traite en parallèle et met à jour la base de données". Le statut de la tâche à traiter est «NON EXÉCUTÉ» → «RÉUSSI». (Attribuez les numéros appropriés à chaque statut plus tard) Route
Je définirai les paramètres détaillés et le traitement séparément, et je créerai d'abord uniquement la structure comme indiqué dans l'image. (Si vous copiez le code, vous devez importer d'autres classes, donc la dernière est bonne) Avec Java DSL, même si vous oubliez d'entrer ʻend () `, vous obtiendrez rarement une erreur de syntaxe, et il y a un risque que la structure de la route soit différente de ce que vous avez imaginé. Comme les noms La chaîne de caractères spécifiée pour «from ()» et «to ()» est un URI, et les options peuvent être spécifiées après «?». Cette fois, ʻuseIterator = false La chaîne Process
Mettre au hasard un sommeil pendant 1 à 3 secondes en supposant que "cela prend beaucoup de temps". Ceci est appelé chaque fois que les données arrivent, vous pouvez donc définir des points d'arrêt et déboguer. Ajoutez des paramètres de connexion à MySQL. Bien qu'il soit écrit ici en yaml, il peut être donné comme une variable d'environnement ou l'option Le point de départ de l'application est le même que la dernière fois.
Recommended Posts
pom.xml (mycamelapp)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>mycamelapp</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
</parent>
<!-- https://camel.apache.org/camel-spring-boot/latest/index.html -->
<dependencyManagement>
<dependencies>
<!-- Camel BOM -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-dependencies</artifactId>
<version>3.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- ... other BOMs or dependencies ... -->
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Camel Starter -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-sql-starter</artifactId>
</dependency>
<!-- ... other dependencies ... -->
<!-- JDBC -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Utils -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
2.2.6.RELEASE
est pour la version chameau
Implémentation du traitement des tâches
src/main/java/org/example/mycamelapp/routes/TaskExecutionRoute.java
@Component
public class TaskExecutionRoute extends RouteBuilder {
// dummy hostname
static public final String HOSTNAME = "host-" + Integer.toHexString(new SecureRandom().nextInt());
@Override
public void configure() throws Exception {
from(TaskSql.selectTasks(TaskTable.Status.UNEXECUTED, 5))
// exchange.getIn().getBody() is List<Map<String, Object>> of records
.log("${body.size} rows selected.")
.split(body()).parallelProcessing()
// exchange.getIn().getBody() is Map<String, Object> of a record
.process(new PseudoWorkProcessor())
.to(TaskSql.updateTask(TaskTable.Status.SUCCEEDED, HOSTNAME))
.end()
.log("all tasks finished.")
;
}
}
from ()
comme une chaîne URI.$ {expr}
dans une chaîne.split ()
et traitez chacune.parallelProcessing ()
pour traiter en parallèle.process ()
.to ()
sous forme de chaîne URI.RouteBuilder
et configure ()
l'indiquent, cette méthode n'est pas appelée à chaque fois que des données arrivent, mais une seule fois lorsque l'application démarre. Par conséquent, même si un point d'arrêt est défini, le traitement des données ne peut pas être débogué.Tables et requêtes
src/main/java/org/example/mycamelapp/db/TaskTable.java
public interface TaskTable {
String TABLE_NAME = "task";
String TASK_ID = "task_id";
String STATUS = "status";
String EXECUTOR = "executor";
String CREATED_AT = "created_at";
String UPDATED_AT = "updated_at";
@AllArgsConstructor
@Getter
enum Status {
UNEXECUTED(0),
SUCCEEDED(10),
FAILED(-1),
;
private final int code;
}
}
src/main/java/org/example/mycamelapp/db/TaskSql.java
public class TaskSql implements TaskTable {
public static String insertTask() {
return "sql:INSERT INTO " + TABLE_NAME + " () VALUES ()";
}
public static String selectTasks(Status status, int limit) {
return "sql:SELECT * FROM " + TABLE_NAME
+ " WHERE " + STATUS + " = " + status.getCode()
+ " LIMIT " + limit
+ "?useIterator=false" // List<Map<String, Object>>
;
}
public static String updateTask(Status nextStatus, String hostname) {
return "sql:UPDATE " + TABLE_NAME
+ " SET " + STATUS + " = " + nextStatus.getCode()
+ ", " + EXECUTOR + " = " + quote(hostname)
+ " WHERE " + TASK_ID + " = " + ref(TASK_ID)
;
}
private static String quote(String value) {
if (value == null) return "NULL";
return "'" + value + "'";
}
private static String ref(String key) {
return ":#" + key;
}
}
est utilisé, et au lieu d'envoyer les données acquises de la base de données à la route ligne par ligne avec
Map <>, elles sont envoyées avec
List <Map <>>`.: # task_id
(←ref (TASK_ID)
) apparaît dans la requête. Cela permet au chameau de récupérer la valeur correspondant à la clé du corps ou de l'en-tête dans l'échange et de l'intégrer.src/main/java/org/example/mycamelapp/processors/PseudoWorkProcessor.java
@Slf4j
public class PseudoWorkProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<String, Object> task = exchange.getIn().getBody(Map.class);
int processingTime = ThreadLocalRandom.current().nextInt(1000, 3000);
String infoMsg = "task_id = " + task.get(TaskTable.TASK_ID) + ", time = " + processingTime + "[ms]";
log.info("start working :: " + infoMsg);
Thread.sleep(processingTime);
log.info("finish working :: " + infoMsg);
}
}
Commencez
Réglage
-D
(La priorité est définie par Spring Boot. 2.2.6.RELEASE / reference / html / spring-boot-features.html # boot-features-external-config)). C'était difficile à écrire pendant longtemps, j'ai donc également mis le nom d'utilisateur et le mot de passe dans l'URL.src/main/resources/application.yml
# to keep the JVM running
camel:
springboot:
main-run-controller: true
spring:
data-source:
url: jdbc:mysql://user:password@localhost:3306/my_camel_app
src/main/java/org/example/mycamelapp/MyCamelApplication.java
@SpringBootApplication
public class MyCamelApplication {
public static void main(String[] args) {
SpringApplication.run(MyCamelApplication.class, args);
}
}
Courir
terminal
cd path/to/app
mvn spring-boot:run
référence