Implementieren Sie in der Fortsetzung von "[Verfahren zum Erstellen einer einfachen Kamel-App mit Apache Camel Spring Boot-Startern](/ HMMNRST / items / db9cb387ecfde0944496) ** Batch, um in MySQL gespeicherte Aufgaben zu verarbeiten **.
Eigentlich besteht der Zweck darin, bei der parallelen Verarbeitung auf mehreren Computern mit exklusiver Kontrolle zu experimentieren, aber das kann ich noch nicht, da ich SQL, Camel und Java auf halbem Weg verstehe. Diese App ist als Beispiel positioniert, das dazu zu führen scheint.
Ich werde den letzten Teil nicht erklären, aber ich werde alle notwendigen Dateien in diesem Artikel schreiben, damit ich sie verschieben kann, ohne zurückzublicken.
Dateiliste
path/to/app/
├── src/main/
│ ├── java/org/example/mycamelapp/
│ │ ├── db/
│ │ │ ├── TaskSql.java
│ │ │ └── TaskTable.java
│ │ ├── processors/
│ │ │ └── PseudoWorkProcessor.java
│ │ ├── routes/
│ │ │ └── TaskExecutionRoute.java
│ │ └── MyCamelApplication.java
│ └── resources/
│ └── application.yml
└── pom.xml
Wie beim letzten Mal, außer dass die Datenbank hinzugefügt wurde.
Erstellen Sie eine Task-Tabelle. Die folgenden zwei Spalten basieren auf dem Bild der "Aufgabenverarbeitung".
status
: Zeigt die Verarbeitungsstufe der Aufgabe an. Extrahieren Sie für jede Charge (diesmal nur eine) die Aufgaben in der Phase, für die Sie verantwortlich sind, und erweitern Sie den Status, wenn Sie fertig sind.executor
: Zeichnet den Host auf, der die Aufgabe verarbeitet / ausgeführt hat. Es kann zur exklusiven Steuerung verwendet werden, wenn eine App parallel auf mehreren Computern ausgeführt wird.Die Annahme der exklusiven Kontrolle, mit der ich experimentieren möchte, ist, dass verschiedene Aufgaben parallel verarbeitet werden können, ohne sich um die Reihenfolge zu kümmern, aber dieselbe Aufgabe sollte nicht zweimal ausgeführt werden.
CREATE DATABASE IF NOT EXISTS my_camel_app;
CREATE TABLE IF NOT EXISTS my_camel_app.task (
task_id int(11) NOT NULL AUTO_INCREMENT,
status int(11) NOT NULL DEFAULT 0,
executor varchar(255) DEFAULT NULL,
created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (task_id)
);
Sie können Datensätze wie "INSERT INTO my_camel_app.task () VALUES ();" hinzufügen. Möglicherweise kann es mit dem vorherigen Timer automatisch erhöht werden.
pom.xml
pom.xml (mycamelapp)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>mycamelapp</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
</parent>
<!-- https://camel.apache.org/camel-spring-boot/latest/index.html -->
<dependencyManagement>
<dependencies>
<!-- Camel BOM -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-dependencies</artifactId>
<version>3.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- ... other BOMs or dependencies ... -->
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Camel Starter -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-sql-starter</artifactId>
</dependency>
<!-- ... other dependencies ... -->
<!-- JDBC -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Utils -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Wechselpunkt:
2.2.6.RELEASE
ist für die KamelversionEs ist praktisch, dass Sie keine neue Version von dem schreiben müssen, was in dependencyManagement (einschließlich des übergeordneten Projekts) geschrieben ist. Es gab auch MySQL-Connector-Java und Lombok.
Erstellen wir eine Route, die "mehrere Aufgaben gleichzeitig von der Datenbank erfasst, parallel verarbeitet und die Datenbank aktualisiert".
Der Status der zu verarbeitenden Aufgabe lautet "UNEXECUTED" → "SUCCEEDED". (Weisen Sie später jedem Status die entsprechenden Nummern zu.)
Route Ich werde die detaillierten Einstellungen und die Verarbeitung separat definieren und zunächst nur die im Bild gezeigte Struktur erstellen. (Wenn Sie den Code kopieren, müssen Sie andere Klassen importieren, damit die letzte gut ist.)
src/main/java/org/example/mycamelapp/routes/TaskExecutionRoute.java
@Component
public class TaskExecutionRoute extends RouteBuilder {
// dummy hostname
static public final String HOSTNAME = "host-" + Integer.toHexString(new SecureRandom().nextInt());
@Override
public void configure() throws Exception {
from(TaskSql.selectTasks(TaskTable.Status.UNEXECUTED, 5))
// exchange.getIn().getBody() is List<Map<String, Object>> of records
.log("${body.size} rows selected.")
.split(body()).parallelProcessing()
// exchange.getIn().getBody() is Map<String, Object> of a record
.process(new PseudoWorkProcessor())
.to(TaskSql.updateTask(TaskTable.Status.SUCCEEDED, HOSTNAME))
.end()
.log("all tasks finished.")
;
}
}
from ()
als URI-Zeichenfolge.process ()
.to ()
als URI-Zeichenfolge.end ()
gibt das Ende der verschachtelten Struktur an.Selbst wenn Sie bei Java DSL vergessen, "end ()" einzugeben, wird selten ein Syntaxfehler angezeigt, und es besteht das Risiko, dass sich die Routenstruktur von Ihrer Vorstellung unterscheidet.
Wie die Namen "RouteBuilder" und "configure ()" anzeigen, wird diese Methode nicht jedes Mal aufgerufen, wenn Daten eingehen, sondern nur einmal, wenn die App gestartet wird. Selbst wenn ein Haltepunkt festgelegt ist, kann die Datenverarbeitung daher nicht debuggt werden.
src/main/java/org/example/mycamelapp/db/TaskTable.java
public interface TaskTable {
String TABLE_NAME = "task";
String TASK_ID = "task_id";
String STATUS = "status";
String EXECUTOR = "executor";
String CREATED_AT = "created_at";
String UPDATED_AT = "updated_at";
@AllArgsConstructor
@Getter
enum Status {
UNEXECUTED(0),
SUCCEEDED(10),
FAILED(-1),
;
private final int code;
}
}
src/main/java/org/example/mycamelapp/db/TaskSql.java
public class TaskSql implements TaskTable {
public static String insertTask() {
return "sql:INSERT INTO " + TABLE_NAME + " () VALUES ()";
}
public static String selectTasks(Status status, int limit) {
return "sql:SELECT * FROM " + TABLE_NAME
+ " WHERE " + STATUS + " = " + status.getCode()
+ " LIMIT " + limit
+ "?useIterator=false" // List<Map<String, Object>>
;
}
public static String updateTask(Status nextStatus, String hostname) {
return "sql:UPDATE " + TABLE_NAME
+ " SET " + STATUS + " = " + nextStatus.getCode()
+ ", " + EXECUTOR + " = " + quote(hostname)
+ " WHERE " + TASK_ID + " = " + ref(TASK_ID)
;
}
private static String quote(String value) {
if (value == null) return "NULL";
return "'" + value + "'";
}
private static String ref(String key) {
return ":#" + key;
}
}
Die für "from ()" und "to ()" angegebene Zeichenfolge ist eine URI, und Optionen können nach "?" Angegeben werden. Dieses Mal wird useIterator = false
verwendet und anstatt die von der DB erfassten Daten Zeile für Zeile mit Map <>
an die Route zu senden, wird sie zusammen mitList <Map <>>
gesendet.
Die Zeichenfolge : # task_id
(←ref (TASK_ID)
) wird in der Abfrage angezeigt. Auf diese Weise kann das Kamel den dem Schlüssel entsprechenden Wert aus dem Text oder der Kopfzeile des Austauschs abrufen und einbetten.
Process Legen Sie nach dem Zufallsprinzip 1 bis 3 Sekunden lang einen Schlaf an, unter der Annahme, dass "es lange dauert".
src/main/java/org/example/mycamelapp/processors/PseudoWorkProcessor.java
@Slf4j
public class PseudoWorkProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<String, Object> task = exchange.getIn().getBody(Map.class);
int processingTime = ThreadLocalRandom.current().nextInt(1000, 3000);
String infoMsg = "task_id = " + task.get(TaskTable.TASK_ID) + ", time = " + processingTime + "[ms]";
log.info("start working :: " + infoMsg);
Thread.sleep(processingTime);
log.info("finish working :: " + infoMsg);
}
}
Dies wird jedes Mal aufgerufen, wenn Daten eingehen, sodass Sie Haltepunkte festlegen und Fehler beheben können.
Fügen Sie MySQL Verbindungseinstellungen hinzu. Obwohl es hier in yaml geschrieben ist, kann es durch die Umgebungsvariable oder die Option -D angegeben werden (Priorität wird durch Spring Boot festgelegt. 2.2.6.RELEASE / reference / html / spring-boot-features.html # boot-features-external-config)). Das Schreiben war lange Zeit mühsam, daher habe ich auch den Benutzernamen und das Passwort in die URL eingefügt.
src/main/resources/application.yml
# to keep the JVM running
camel:
springboot:
main-run-controller: true
spring:
data-source:
url: jdbc:mysql://user:password@localhost:3306/my_camel_app
Der Startpunkt der App ist der gleiche wie beim letzten Mal.
src/main/java/org/example/mycamelapp/MyCamelApplication.java
@SpringBootApplication
public class MyCamelApplication {
public static void main(String[] args) {
SpringApplication.run(MyCamelApplication.class, args);
}
}
terminal
cd path/to/app
mvn spring-boot:run
Recommended Posts