[JAVA] Beispiel für Batch-Prozessdaten in der Datenbank mit Apache Camel Spring Boot-Startern

Implementieren Sie in der Fortsetzung von "[Verfahren zum Erstellen einer einfachen Kamel-App mit Apache Camel Spring Boot-Startern](/ HMMNRST / items / db9cb387ecfde0944496) ** Batch, um in MySQL gespeicherte Aufgaben zu verarbeiten **.

Eigentlich besteht der Zweck darin, bei der parallelen Verarbeitung auf mehreren Computern mit exklusiver Kontrolle zu experimentieren, aber das kann ich noch nicht, da ich SQL, Camel und Java auf halbem Weg verstehe. Diese App ist als Beispiel positioniert, das dazu zu führen scheint.


Ich werde den letzten Teil nicht erklären, aber ich werde alle notwendigen Dateien in diesem Artikel schreiben, damit ich sie verschieben kann, ohne zurückzublicken.

Dateiliste


path/to/app/
  ├── src/main/
  │     ├── java/org/example/mycamelapp/
  │     │     ├── db/
  │     │     │     ├── TaskSql.java
  │     │     │     └── TaskTable.java
  │     │     ├── processors/
  │     │     │     └── PseudoWorkProcessor.java
  │     │     ├── routes/
  │     │     │     └── TaskExecutionRoute.java
  │     │     └── MyCamelApplication.java
  │     └── resources/
  │           └── application.yml
  └── pom.xml

Umgebung

Wie beim letzten Mal, außer dass die Datenbank hinzugefügt wurde.

DB-Tabellendefinition

Erstellen Sie eine Task-Tabelle. Die folgenden zwei Spalten basieren auf dem Bild der "Aufgabenverarbeitung".

Die Annahme der exklusiven Kontrolle, mit der ich experimentieren möchte, ist, dass verschiedene Aufgaben parallel verarbeitet werden können, ohne sich um die Reihenfolge zu kümmern, aber dieselbe Aufgabe sollte nicht zweimal ausgeführt werden.

CREATE DATABASE IF NOT EXISTS my_camel_app;

CREATE TABLE IF NOT EXISTS my_camel_app.task (
	task_id    int(11)      NOT NULL AUTO_INCREMENT,
	status     int(11)      NOT NULL DEFAULT 0,
	executor   varchar(255)          DEFAULT NULL,
	created_at datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
	updated_at datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (task_id)
);

Sie können Datensätze wie "INSERT INTO my_camel_app.task () VALUES ();" hinzufügen. Möglicherweise kann es mit dem vorherigen Timer automatisch erhöht werden.

pom.xml

Volltext (zum Vergrößern anklicken)

pom.xml (mycamelapp)


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.example</groupId>
	<artifactId>mycamelapp</artifactId>
	<version>1.0-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
	</parent>

	<!-- https://camel.apache.org/camel-spring-boot/latest/index.html -->
	<dependencyManagement>
		<dependencies>
			<!-- Camel BOM -->
			<dependency>
				<groupId>org.apache.camel.springboot</groupId>
				<artifactId>camel-spring-boot-dependencies</artifactId>
				<version>3.2.0</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
			<!-- ... other BOMs or dependencies ... -->
		</dependencies>
	</dependencyManagement>

	<dependencies>
		<!-- Camel Starter -->
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-sql-starter</artifactId>
		</dependency>
		<!-- ... other dependencies ... -->

		<!-- JDBC -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency>

		<!-- Utils -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<scope>provided</scope>
		</dependency>
	</dependencies>
</project>

Wechselpunkt:

Es ist praktisch, dass Sie keine neue Version von dem schreiben müssen, was in dependencyManagement (einschließlich des übergeordneten Projekts) geschrieben ist. Es gab auch MySQL-Connector-Java und Lombok.

Implementierung der Aufgabenverarbeitung

Erstellen wir eine Route, die "mehrere Aufgaben gleichzeitig von der Datenbank erfasst, parallel verarbeitet und die Datenbank aktualisiert".

camel-sample-route.png

Der Status der zu verarbeitenden Aufgabe lautet "UNEXECUTED" → "SUCCEEDED". (Weisen Sie später jedem Status die entsprechenden Nummern zu.)

Route Ich werde die detaillierten Einstellungen und die Verarbeitung separat definieren und zunächst nur die im Bild gezeigte Struktur erstellen. (Wenn Sie den Code kopieren, müssen Sie andere Klassen importieren, damit die letzte gut ist.)

src/main/java/org/example/mycamelapp/routes/TaskExecutionRoute.java


@Component
public class TaskExecutionRoute extends RouteBuilder {

	// dummy hostname
	static public final String HOSTNAME = "host-" + Integer.toHexString(new SecureRandom().nextInt());

	@Override
	public void configure() throws Exception {
		from(TaskSql.selectTasks(TaskTable.Status.UNEXECUTED, 5))
				// exchange.getIn().getBody() is List<Map<String, Object>> of records
				.log("${body.size} rows selected.")
				.split(body()).parallelProcessing()
					// exchange.getIn().getBody() is Map<String, Object> of a record
					.process(new PseudoWorkProcessor())
					.to(TaskSql.updateTask(TaskTable.Status.SUCCEEDED, HOSTNAME))
				.end()
				.log("all tasks finished.")
		;
	}
}

Selbst wenn Sie bei Java DSL vergessen, "end ()" einzugeben, wird selten ein Syntaxfehler angezeigt, und es besteht das Risiko, dass sich die Routenstruktur von Ihrer Vorstellung unterscheidet.

Wie die Namen "RouteBuilder" und "configure ()" anzeigen, wird diese Methode nicht jedes Mal aufgerufen, wenn Daten eingehen, sondern nur einmal, wenn die App gestartet wird. Selbst wenn ein Haltepunkt festgelegt ist, kann die Datenverarbeitung daher nicht debuggt werden.

Tabellen und Abfragen

src/main/java/org/example/mycamelapp/db/TaskTable.java


public interface TaskTable {

	String TABLE_NAME = "task";

	String TASK_ID = "task_id";
	String STATUS = "status";
	String EXECUTOR = "executor";
	String CREATED_AT = "created_at";
	String UPDATED_AT = "updated_at";

	@AllArgsConstructor
	@Getter
	enum Status {

		UNEXECUTED(0),
		SUCCEEDED(10),
		FAILED(-1),
		;

		private final int code;
	}
}

src/main/java/org/example/mycamelapp/db/TaskSql.java


public class TaskSql implements TaskTable {

	public static String insertTask() {
		return "sql:INSERT INTO " + TABLE_NAME + " () VALUES ()";
	}

	public static String selectTasks(Status status, int limit) {
		return "sql:SELECT * FROM " + TABLE_NAME
				+ " WHERE " + STATUS + " = " + status.getCode()
				+ " LIMIT " + limit
				+ "?useIterator=false" // List<Map<String, Object>>
				;
	}

	public static String updateTask(Status nextStatus, String hostname) {
		return "sql:UPDATE " + TABLE_NAME
				+ " SET " + STATUS + " = " + nextStatus.getCode()
				+ ", " + EXECUTOR + " = " + quote(hostname)
				+ " WHERE " + TASK_ID + " = " + ref(TASK_ID)
				;
	}

	private static String quote(String value) {
		if (value == null) return "NULL";
		return "'" + value + "'";
	}

	private static String ref(String key) {
		return ":#" + key;
	}
}

Die für "from ()" und "to ()" angegebene Zeichenfolge ist eine URI, und Optionen können nach "?" Angegeben werden. Dieses Mal wird useIterator = false verwendet und anstatt die von der DB erfassten Daten Zeile für Zeile mit Map <> an die Route zu senden, wird sie zusammen mitList <Map <>>gesendet.

Die Zeichenfolge : # task_id (←ref (TASK_ID)) wird in der Abfrage angezeigt. Auf diese Weise kann das Kamel den dem Schlüssel entsprechenden Wert aus dem Text oder der Kopfzeile des Austauschs abrufen und einbetten.

Process Legen Sie nach dem Zufallsprinzip 1 bis 3 Sekunden lang einen Schlaf an, unter der Annahme, dass "es lange dauert".

src/main/java/org/example/mycamelapp/processors/PseudoWorkProcessor.java


@Slf4j
public class PseudoWorkProcessor implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		Map<String, Object> task = exchange.getIn().getBody(Map.class);
		int processingTime = ThreadLocalRandom.current().nextInt(1000, 3000);
		String infoMsg = "task_id = " + task.get(TaskTable.TASK_ID) + ", time = " + processingTime + "[ms]";

		log.info("start  working :: " + infoMsg);
		Thread.sleep(processingTime);
		log.info("finish working :: " + infoMsg);
	}
}

Dies wird jedes Mal aufgerufen, wenn Daten eingehen, sodass Sie Haltepunkte festlegen und Fehler beheben können.

Anlaufen

Aufbau

Fügen Sie MySQL Verbindungseinstellungen hinzu. Obwohl es hier in yaml geschrieben ist, kann es durch die Umgebungsvariable oder die Option -D angegeben werden (Priorität wird durch Spring Boot festgelegt. 2.2.6.RELEASE / reference / html / spring-boot-features.html # boot-features-external-config)). Das Schreiben war lange Zeit mühsam, daher habe ich auch den Benutzernamen und das Passwort in die URL eingefügt.

src/main/resources/application.yml


# to keep the JVM running
camel:
  springboot:
    main-run-controller: true

spring:
  data-source:
    url: jdbc:mysql://user:password@localhost:3306/my_camel_app

Der Startpunkt der App ist der gleiche wie beim letzten Mal.

src/main/java/org/example/mycamelapp/MyCamelApplication.java


@SpringBootApplication
public class MyCamelApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyCamelApplication.class, args);
	}
}

Lauf

terminal


cd path/to/app

mvn spring-boot:run

Referenz

Recommended Posts

Beispiel für Batch-Prozessdaten in der Datenbank mit Apache Camel Spring Boot-Startern
Schritte zum Erstellen einer einfachen Kamel-App mit Apache Camel Spring Boot-Startern
Hallo Welt (REST API) mit Apache Camel + Spring Boot 2
Hallo Welt (Konsolen-App) mit Apache Camel + Spring Boot 2
Fluss bis zur Ausgabe von Tabellendaten, die mit Spring Boot angezeigt werden sollen
So führen Sie UT mit Excel als Testdaten mit Spring Boot + JUnit5 + DBUnit durch
Beispielcode für die DB-Steuerung durch deklarative Transaktion mit Spring Boot + Spring Data JPA
Bis zur Datenerfassung mit Spring Boot + MyBatis + PostgreSQL
Verwendung von MyBatis2 (iBatis) mit Spring Boot 1.4 (Spring 4)
Verwendung des eingebauten h2db mit Federstiefel
Versuchen Sie, die Anmeldefunktion mit Spring Boot zu implementieren
Aktualisieren Sie die Datenbank regelmäßig mit Spring Batch und My Batis
Einführung in Spring Boot + In-Memory Data Grid
Versuchen Sie, die Migration mit Spring Boot Flyway zu automatisieren
Ich wollte Spring Boot in einem Multiprojekt gradle
[Einführung in Spring Boot] Authentifizierungsfunktion mit Spring Security
[Spring Batch] Gibt Tabellendaten in eine CSV-Datei aus
Beachten Sie, dass ich süchtig nach Stapelverarbeitung mit Spring Boot war
Einstellungen für die Verbindung zu MySQL mit Spring Boot + Spring JDBC
Ordnen Sie DTO automatisch Entitäten mit der Spring Boot-API zu
Ich habe versucht, mit Spring Data JPA zu beginnen
Verwendung von CommandLineRunner im Spring Batch von Spring Boot
Stellen Sie das Spring Boot-Projekt in XAMPP für Tomcat bereit
Booten nach Umgebung mit Spring Boot of Maven
Versuch, SSR Vue.js mit Spring Boot und GraalJS zu verwenden
Mit Spring Boot herunterladen
Ich habe das Spring Boot-Einführungshandbuch [Zugriff auf Daten mit JPA] ausprobiert.
Schneiden Sie SQL in die Eigenschaftendatei mit jdbcTemplate von Spring Boot aus
Stellen Sie mit spring boot + spring jpa eine Verbindung zur Datenbank her und führen Sie die CRUD-Operation durch
Spring Boot-Anwendung, die DB-Verbindungseinstellungen mit Parametern angibt
Ich habe versucht, mit Swagger mit Spring Boot zu beginnen
8 Dinge, die mit Spring Boot und JPA in die DB eingefügt werden müssen
[Java] Beispielprojekt zum Entwickeln von Webanwendungen mit Spring Boot