[JAVA] Sample to batch process data on DB with Apache Camel Spring Boot starters

Continuing from "[Procedure for creating a simple camel app using Apache Camel Spring Boot starters](/HMMNRST/items/db9cb387ecfde0944496)", this article implements **a batch that processes tasks stored in MySQL**.

The real goal is to **experiment with exclusive control** when processing in parallel on multiple machines, but I can't do that yet because my understanding of SQL, Camel, and Java is still shaky. This app is a sample meant to lead up to that.


I won't re-explain what was covered last time, but this article includes every file needed, so you can run the app without referring back.

File list


path/to/app/
  ├── src/main/
  │     ├── java/org/example/mycamelapp/
  │     │     ├── db/
  │     │     │     ├── TaskSql.java
  │     │     │     └── TaskTable.java
  │     │     ├── processors/
  │     │     │     └── PseudoWorkProcessor.java
  │     │     ├── routes/
  │     │     │     └── TaskExecutionRoute.java
  │     │     └── MyCamelApplication.java
  │     └── resources/
  │           └── application.yml
  └── pom.xml

Environment

Same as last time, except that a MySQL database has been added.

DB table definition

Create a `task` table. The `status` and `executor` columns are the ones that represent "task processing".

The exclusive-control scenario I want to experiment with assumes that different tasks can be processed in parallel in any order, but the same task must never be executed twice.

CREATE DATABASE IF NOT EXISTS my_camel_app;

CREATE TABLE IF NOT EXISTS my_camel_app.task (
	task_id    int(11)      NOT NULL AUTO_INCREMENT,
	status     int(11)      NOT NULL DEFAULT 0,
	executor   varchar(255)          DEFAULT NULL,
	created_at datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
	updated_at datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (task_id)
);

You can add records with `INSERT INTO my_camel_app.task () VALUES ();`. It should also be possible to add them automatically with the timer from the previous article.
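The requirement above, that different tasks may run in parallel but no task may run twice, comes down to claiming each task atomically. The following plain-JDK sketch illustrates only that invariant: an `AtomicInteger` per task stands in for the `status` column, and both the `ClaimSketch` class and its `RUNNING` status are hypothetical (the article's `Status` enum has no such value, and nothing here touches MySQL).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory stand-in for the task table; illustrates the invariant only, not DB code.
class ClaimSketch {
    static final int UNEXECUTED = 0;
    static final int RUNNING = 1;     // hypothetical "claimed" status, not in the article's enum
    static final int SUCCEEDED = 10;

    /** Every worker scans every task; returns how many times each task index was executed. */
    static Map<Integer, AtomicInteger> run(int taskCount, int workers) {
        List<AtomicInteger> status = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) status.add(new AtomicInteger(UNEXECUTED));
        Map<Integer, AtomicInteger> executions = new ConcurrentHashMap<>();

        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                for (int id = 0; id < taskCount; id++) {
                    // Atomic check-and-set: only one worker can move a task out of UNEXECUTED
                    if (status.get(id).compareAndSet(UNEXECUTED, RUNNING)) {
                        executions.computeIfAbsent(id, k -> new AtomicInteger()).incrementAndGet();
                        status.get(id).set(SUCCEEDED);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executions;
    }
}
```

However many workers scan the list, each task ends up executed exactly once. One common way to express the same idea in SQL is a conditional `UPDATE ... WHERE task_id = ? AND status = 0` whose affected-row count tells a worker whether it won the claim; the route in this article does not do that yet.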

pom.xml


pom.xml (mycamelapp)


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.example</groupId>
	<artifactId>mycamelapp</artifactId>
	<version>1.0-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
	</parent>

	<!-- https://camel.apache.org/camel-spring-boot/latest/index.html -->
	<dependencyManagement>
		<dependencies>
			<!-- Camel BOM -->
			<dependency>
				<groupId>org.apache.camel.springboot</groupId>
				<artifactId>camel-spring-boot-dependencies</artifactId>
				<version>3.2.0</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
			<!-- ... other BOMs or dependencies ... -->
		</dependencies>
	</dependencyManagement>

	<dependencies>
		<!-- Camel Starter -->
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-sql-starter</artifactId>
		</dependency>
		<!-- ... other dependencies ... -->

		<!-- JDBC -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency>

		<!-- Utils -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<scope>provided</scope>
		</dependency>
	</dependencies>
</project>

Changes:

Conveniently, artifacts whose versions are managed in dependencyManagement (including the parent project's) can be declared without a version. That turned out to cover mysql-connector-java and lombok as well.

Implementation of task processing

Let's create a route that "acquires multiple tasks from the DB at once, processes them in parallel, and updates the DB".

(Figure: route diagram, camel-sample-route.png)

A processed task's status changes from `UNEXECUTED` to `SUCCEEDED`. (Numeric codes are assigned to each status later.)
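Before the Camel version, here is a hedged plain-JDK sketch of the same flow, assuming an in-memory list of `Map<String, Object>` rows in place of the `task` table and a fixed 4-thread pool in place of the thread pool Camel uses for `parallelProcessing()`. `BatchFlowSketch` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK analogue of the route: select a batch, split it across threads,
// "process" and update each record, then continue once all records are done.
class BatchFlowSketch {
    static final int UNEXECUTED = 0;
    static final int SUCCEEDED = 10;

    /** Builds an in-memory "task" table with task_id 1..n, all UNEXECUTED. */
    static List<Map<String, Object>> newTable(int n) {
        List<Map<String, Object>> table = new ArrayList<>();
        for (int id = 1; id <= n; id++) {
            Map<String, Object> row = new HashMap<>();
            row.put("task_id", id);
            row.put("status", UNEXECUTED);
            row.put("executor", null);
            table.add(row);
        }
        return table;
    }

    /** Like "SELECT * FROM task WHERE status = 0 LIMIT limit" with useIterator=false. */
    static List<Map<String, Object>> selectTasks(List<Map<String, Object>> table, int limit) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (Map<String, Object> row : table) {
            if (rows.size() == limit) break;
            if ((Integer) row.get("status") == UNEXECUTED) rows.add(row);
        }
        return rows;
    }

    static void runOnce(List<Map<String, Object>> table, int limit, String hostname) {
        List<Map<String, Object>> rows = selectTasks(table, limit);   // whole batch at once
        System.out.println(rows.size() + " rows selected.");

        ExecutorService pool = Executors.newFixedThreadPool(4);      // roughly parallelProcessing()
        List<Future<?>> futures = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            futures.add(pool.submit(() -> {
                // pseudo work would run here, followed by the per-record UPDATE
                row.put("status", SUCCEEDED);
                row.put("executor", hostname);
            }));
        }
        try {
            for (Future<?> f : futures) f.get();                     // roughly end(): wait for all records
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        System.out.println("all tasks finished.");
    }
}
```

Calling `runOnce` twice on a 7-row table processes rows 1 to 5 first and rows 6 and 7 on the second call, which mirrors how the SQL consumer polls again and picks up the remaining `UNEXECUTED` tasks.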

Route

The detailed settings and processing are defined separately; first, create only the structure shown in the figure. (This code depends on classes defined later in this article, so if you are copying the code, it is easiest to add this file last.)

src/main/java/org/example/mycamelapp/routes/TaskExecutionRoute.java


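package org.example.mycamelapp.routes;

import java.security.SecureRandom;

import org.apache.camel.builder.RouteBuilder;
import org.example.mycamelapp.db.TaskSql;
import org.example.mycamelapp.db.TaskTable;
import org.example.mycamelapp.processors.PseudoWorkProcessor;
import org.springframework.stereotype.Component;

@Component
public class TaskExecutionRoute extends RouteBuilder {

	// dummy hostname, stored in the "executor" column
	public static final String HOSTNAME = "host-" + Integer.toHexString(new SecureRandom().nextInt());

	@Override
	public void configure() throws Exception {
		from(TaskSql.selectTasks(TaskTable.Status.UNEXECUTED, 5))
				// exchange.getIn().getBody() is List<Map<String, Object>> of records
				.log("${body.size} rows selected.")
				.split(body()).parallelProcessing()
					// exchange.getIn().getBody() is Map<String, Object> of a record
					.process(new PseudoWorkProcessor())
					.to(TaskSql.updateTask(TaskTable.Status.SUCCEEDED, HOSTNAME))
				.end() // closes split(); without it, the next log would run once per record
				.log("all tasks finished.")
		;
	}
}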

In the Java DSL, forgetting `end()` rarely causes a compile error, so the route structure can silently differ from what you intended.

As the names `RouteBuilder` and `configure()` suggest, this method is not called every time data arrives; it runs only once, when the app starts, to build the route. Therefore, a breakpoint set here cannot be used to debug data processing.
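To make the once-versus-every-message distinction concrete, here is a toy sketch. The `MiniRoute` class is hypothetical and is not how Camel is implemented: `configure()` only registers a step, and the lambda registered there is what runs for each message. A breakpoint in `configure()` would be hit once; one inside the lambda would be hit per message, just as one inside a `Processor` is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical miniature of the RouteBuilder idea: configure() only assembles steps.
class MiniRoute {
    int configureCalls = 0;
    int processCalls = 0;
    private final List<UnaryOperator<String>> steps = new ArrayList<>();

    // Like RouteBuilder.configure(): runs once, registers steps, touches no data
    void configure() {
        configureCalls++;
        steps.add(body -> {        // like a Processor: runs for every message
            processCalls++;
            return body.toUpperCase();
        });
    }

    // Pushes one message through all registered steps
    String send(String body) {
        for (UnaryOperator<String> step : steps) body = step.apply(body);
        return body;
    }
}
```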

Tables and queries

src/main/java/org/example/mycamelapp/db/TaskTable.java


package org.example.mycamelapp.db;

import lombok.AllArgsConstructor;
import lombok.Getter;

public interface TaskTable {

	String TABLE_NAME = "task";

	String TASK_ID = "task_id";
	String STATUS = "status";
	String EXECUTOR = "executor";
	String CREATED_AT = "created_at";
	String UPDATED_AT = "updated_at";

	@AllArgsConstructor
	@Getter
	enum Status {

		UNEXECUTED(0),
		SUCCEEDED(10),
		FAILED(-1),
		;

		private final int code;
	}
}
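When a record is read back, the integer in the `status` column sometimes has to be turned into a `Status` again. A minimal sketch, assuming a hypothetical `fromCode` helper that is not part of the article; to keep it self-contained it uses a standalone `TaskStatus` enum with a hand-written constructor and getter instead of Lombok.

```java
// Standalone copy of the Status codes with a hypothetical reverse-lookup helper.
enum TaskStatus {
    UNEXECUTED(0),
    SUCCEEDED(10),
    FAILED(-1);

    private final int code;

    TaskStatus(int code) {
        this.code = code;
    }

    int getCode() {
        return code;
    }

    /** Hypothetical helper: maps a value read from the status column back to the enum. */
    static TaskStatus fromCode(int code) {
        for (TaskStatus s : values()) {
            if (s.code == code) return s;
        }
        throw new IllegalArgumentException("unknown status code: " + code);
    }
}
```

If the same method were added to `TaskTable.Status`, a processor could call `TaskTable.Status.fromCode((Integer) task.get(TaskTable.STATUS))`, assuming the driver returns the `int` column as an `Integer`.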

src/main/java/org/example/mycamelapp/db/TaskSql.java


package org.example.mycamelapp.db;

public class TaskSql implements TaskTable {

	public static String insertTask() {
		return "sql:INSERT INTO " + TABLE_NAME + " () VALUES ()";
	}

	public static String selectTasks(Status status, int limit) {
		return "sql:SELECT * FROM " + TABLE_NAME
				+ " WHERE " + STATUS + " = " + status.getCode()
				+ " LIMIT " + limit
				+ "?useIterator=false" // List<Map<String, Object>>
				;
	}

	public static String updateTask(Status nextStatus, String hostname) {
		return "sql:UPDATE " + TABLE_NAME
				+ " SET " + STATUS + " = " + nextStatus.getCode()
				+ ", " + EXECUTOR + " = " + quote(hostname)
				+ " WHERE " + TASK_ID + " = " + ref(TASK_ID)
				;
	}

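	// Wraps a value in single quotes without escaping; acceptable for the
	// generated hostname, but not for arbitrary input
	private static String quote(String value) {
		if (value == null) return "NULL";
		return "'" + value + "'";
	}

	// Camel SQL named parameter: ":#key" is filled from the body Map or headers
	private static String ref(String key) {
		return ":#" + key;
	}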
}

The strings passed to `from()` and `to()` are URIs, and options can be appended after `?`. Here, `useIterator=false` is specified, so instead of sending the rows fetched from the DB into the route one at a time as `Map<>`, they are sent all together as a `List<Map<>>`.
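To make the split between query and options visible, here is a small hypothetical helper (`SqlUriParts`, not Camel's URI parser) that cuts an endpoint string at the first `?`. This is also why the SQL component uses `#` rather than `?` as its positional-parameter placeholder: a literal `?` in the query would be read as the start of the options.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, NOT Camel's parser: shows how an "sql:" endpoint string
// divides into the query (before the first '?') and endpoint options (after it).
class SqlUriParts {
    final String query;
    final Map<String, String> options = new LinkedHashMap<>();

    SqlUriParts(String uri) {
        String rest = uri.startsWith("sql:") ? uri.substring(4) : uri;
        int q = rest.indexOf('?');
        query = q < 0 ? rest : rest.substring(0, q);
        if (q >= 0) {
            // options are key=value pairs separated by '&'
            for (String pair : rest.substring(q + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
    }
}
```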

The query contains the string `:#task_id` (produced by `ref(TASK_ID)`). This is a named parameter: Camel looks up the value for that key in the exchange's body (here, a record `Map`) or headers and binds it into the query.
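Conceptually, each named parameter becomes an ordinary JDBC `?` placeholder bound to the value looked up by key. The sketch below is a simplified, hypothetical illustration of that idea (`NamedParamSketch` is not a Camel class, and camel-sql's real implementation differs in detail); it checks a body `Map` first and falls back to the headers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified, hypothetical illustration of named-parameter binding.
class NamedParamSketch {
    private static final Pattern NAMED = Pattern.compile(":#(\\w+)");

    final String jdbcSql;                          // query with each :#name replaced by '?'
    final List<Object> params = new ArrayList<>(); // bound values, in placeholder order

    NamedParamSketch(String query, Map<String, Object> body, Map<String, Object> headers) {
        Matcher m = NAMED.matcher(query);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            // in this sketch: look in the body Map first, then in the headers
            Object value = body.containsKey(key) ? body.get(key) : headers.get(key);
            params.add(value);
            m.appendReplacement(sb, "?");
        }
        m.appendTail(sb);
        jdbcSql = sb.toString();
    }
}
```

For the route above, the body of each split exchange is one record, so `:#task_id` resolves to that record's `task_id`.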

Process

To simulate a task that "takes a long time", the processor sleeps for a random 1 to 3 seconds.

src/main/java/org/example/mycamelapp/processors/PseudoWorkProcessor.java


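package org.example.mycamelapp.processors;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.example.mycamelapp.db.TaskTable;

@Slf4j
public class PseudoWorkProcessor implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		// the body is one record (Map<String, Object>) produced by split()
		@SuppressWarnings("unchecked")
		Map<String, Object> task = exchange.getIn().getBody(Map.class);
		// 1000 to 2999 ms (the upper bound is exclusive)
		int processingTime = ThreadLocalRandom.current().nextInt(1000, 3000);
		String infoMsg = "task_id = " + task.get(TaskTable.TASK_ID) + ", time = " + processingTime + "[ms]";

		log.info("start  working :: " + infoMsg);
		Thread.sleep(processingTime);
		log.info("finish working :: " + infoMsg);
	}
}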

Unlike `configure()`, `process()` is called every time data arrives, so you can set breakpoints here and debug.

Start-up

Settings

Add the MySQL connection settings. They are written in YAML here, but they can also be given as environment variables or `-D` options (the order of precedence is defined in the Spring Boot 2.2.6.RELEASE reference documentation, spring-boot-features.html#boot-features-external-config). To keep things short, I also put the user name and password in the URL.

src/main/resources/application.yml


# to keep the JVM running
camel:
  springboot:
    main-run-controller: true

spring:
  data-source:
    url: jdbc:mysql://user:password@localhost:3306/my_camel_app
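When the URL is supplied through an environment variable instead of `application.yml`, Spring Boot's relaxed binding expects the property name in upper case, with dots replaced by underscores and dashes removed. A tiny sketch of that documented rule; the `EnvVarName` helper is hypothetical.

```java
import java.util.Locale;

// Sketch of Spring Boot's env-var naming rule for a property name:
// replace '.' with '_', drop '-', convert to upper case.
class EnvVarName {
    static String of(String propertyName) {
        return propertyName.replace('.', '_').replace("-", "").toUpperCase(Locale.ROOT);
    }
}
```

By this rule, the URL property above corresponds to the environment variable `SPRING_DATASOURCE_URL`.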

The app's entry point is the same as last time.

src/main/java/org/example/mycamelapp/MyCamelApplication.java


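package org.example.mycamelapp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication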
public class MyCamelApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyCamelApplication.class, args);
	}
}

Run

terminal


cd path/to/app

mvn spring-boot:run
