As a continuation of "[Procedure for creating a simple camel app using Apache Camel Spring Boot starters](/HMMNRST/items/db9cb387ecfde0944496)", this article implements a **batch that processes tasks stored in MySQL**.
The real goal is to **experiment with exclusive control** when processing in parallel on multiple machines, but I can't do that yet because my understanding of SQL, Camel, and Java is still shaky. This app is meant as a sample that leads toward that goal.
I won't repeat the explanations from last time, but this article includes every necessary file so that the app can be run without referring back.
File list
path/to/app/
├── src/main/
│   ├── java/org/example/mycamelapp/
│   │   ├── db/
│   │   │   ├── TaskSql.java
│   │   │   └── TaskTable.java
│   │   ├── processors/
│   │   │   └── PseudoWorkProcessor.java
│   │   ├── routes/
│   │   │   └── TaskExecutionRoute.java
│   │   └── MyCamelApplication.java
│   └── resources/
│       └── application.yml
└── pom.xml
The layout is the same as last time, except for the files added for the DB.
Create a `task` table. The following two columns model "task processing".
`status`: Indicates the processing stage of the task. Each batch (only one this time) picks out the tasks it is in charge of and advances the status when it completes them.
`executor`: Records which host processed the task; the route below writes its (dummy) host name here.
The assumption behind the exclusive control I want to experiment with is that different tasks may be processed in parallel in any order, but the same task must never be executed twice.
CREATE DATABASE IF NOT EXISTS my_camel_app;
CREATE TABLE IF NOT EXISTS my_camel_app.task (
    task_id int(11) NOT NULL AUTO_INCREMENT,
    status int(11) NOT NULL DEFAULT 0,
    executor varchar(255) DEFAULT NULL,
    created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (task_id)
);
You can add records with `INSERT INTO my_camel_app.task () VALUES ();`. It should also be possible to add them automatically using the timer from the previous article.
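The requirement stated earlier (different tasks may run in parallel, the same task must not run twice) can be pictured with a small in-memory sketch. This is only an analogy under my own assumptions: it does not touch the DB, and `ClaimDemo`, `tryClaim`, and `ownerOf` are hypothetical names that do not appear in the app. It shows the property to aim for: only the first executor that claims a given `task_id` wins.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory analogy only (hypothetical class, not part of the app).
class ClaimDemo {
    // task_id -> executor that claimed the task
    private final ConcurrentMap<Integer, String> owners = new ConcurrentHashMap<>();

    // Returns true only for the first caller per task_id; putIfAbsent is atomic.
    boolean tryClaim(int taskId, String executor) {
        return owners.putIfAbsent(taskId, executor) == null;
    }

    String ownerOf(int taskId) {
        return owners.get(taskId);
    }

    public static void main(String[] args) {
        ClaimDemo demo = new ClaimDemo();
        System.out.println(demo.tryClaim(1, "host-a")); // first claim on task 1
        System.out.println(demo.tryClaim(1, "host-b")); // second claim on the same task
        System.out.println(demo.tryClaim(2, "host-b")); // a different task
        System.out.println(demo.ownerOf(1));
    }
}
```

How to obtain the same guarantee across machines through MySQL is exactly what remains to be experimented with; this sketch says nothing about that.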
pom.xml
pom.xml (mycamelapp)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>mycamelapp</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
    </parent>

    <!-- https://camel.apache.org/camel-spring-boot/latest/index.html -->
    <dependencyManagement>
        <dependencies>
            <!-- Camel BOM -->
            <dependency>
                <groupId>org.apache.camel.springboot</groupId>
                <artifactId>camel-spring-boot-dependencies</artifactId>
                <version>3.2.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- ... other BOMs or dependencies ... -->
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Camel Starter -->
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-sql-starter</artifactId>
        </dependency>
        <!-- ... other dependencies ... -->

        <!-- JDBC -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- Utils -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
Changes from last time:
The Spring Boot version `2.2.6.RELEASE` is chosen to match the Camel version.
Conveniently, anything already listed in dependencyManagement (including the parent project's) needs no explicit version here. That covers mysql-connector-java and lombok as well.
Let's create a route that "acquires multiple tasks from the DB at once, processes them in parallel, and updates the DB".
The status of a processed task changes from `UNEXECUTED` to `SUCCEEDED`. (Numeric codes are assigned to each status later.)
Route
The detailed settings and processing are defined separately; first I create only the skeleton of the route. (This file depends on classes defined later in the article, so if you are copying the code it is better to create this one last.)
src/main/java/org/example/mycamelapp/routes/TaskExecutionRoute.java
package org.example.mycamelapp.routes;

import java.security.SecureRandom;

import org.apache.camel.builder.RouteBuilder;
import org.example.mycamelapp.db.TaskSql;
import org.example.mycamelapp.db.TaskTable;
import org.example.mycamelapp.processors.PseudoWorkProcessor;
import org.springframework.stereotype.Component;

@Component
public class TaskExecutionRoute extends RouteBuilder {
    // dummy hostname
    public static final String HOSTNAME = "host-" + Integer.toHexString(new SecureRandom().nextInt());

    @Override
    public void configure() throws Exception {
        from(TaskSql.selectTasks(TaskTable.Status.UNEXECUTED, 5))
                // exchange.getIn().getBody() is List<Map<String, Object>> of records
                .log("${body.size} rows selected.")
                .split(body()).parallelProcessing()
                    // exchange.getIn().getBody() is Map<String, Object> of a record
                    .process(new PseudoWorkProcessor())
                    .to(TaskSql.updateTask(TaskTable.Status.SUCCEEDED, HOSTNAME))
                .end()
                .log("all tasks finished.")
        ;
    }
}
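- `from()`: specifies where the data comes from, as a URI string.
- `${expr}`: embeds an expression in a string.
- `split()`: splits the body and processes each element.
- `parallelProcessing()`: processes the split elements in parallel.
- `process()`: runs a processor (here `PseudoWorkProcessor`) on each message.
- `to()`: specifies where the data is sent, as a URI string.

In the Java DSL, forgetting `end()` rarely causes a syntax error, so there is a risk that the route structure ends up different from what you imagined.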
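As a rough plain-Java analogy for the `split(body()).parallelProcessing() ... end()` part of the route, the sketch below hands every element of a list to a worker thread and waits until all of them have finished before continuing. It does not use Camel: Camel manages its own thread pool, and the class name `ParallelSplitDemo`, the method `processAll`, and the pool size are assumptions made only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java analogy (hypothetical class, not Camel code).
class ParallelSplitDemo {
    // Processes every element on a worker thread and returns once all are done.
    static List<String> processAll(List<Integer> taskIds, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, taskIds.size()));
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Integer id : taskIds) {
                futures.add(pool.submit(() -> {
                    Thread.sleep(sleepMillis); // stand-in for the slow work
                    return "task " + id + " done";
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // blocks until that task has finished
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = processAll(Arrays.asList(1, 2, 3, 4, 5), 200);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results);
        // Five 200 ms tasks run side by side, so this is far below the sequential 1000 ms.
        System.out.println("elapsed: " + elapsedMs + " ms");
    }
}
```

The code after the loop over the futures corresponds to what follows `end()` in the route: it only runs once every split element has been processed.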
As the names `RouteBuilder` and `configure()` indicate, this method is not called every time data comes in, but only once when the app starts. Therefore, a breakpoint set in it cannot be used to debug the data processing itself.
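The difference between "building the route once" and "running it for every message" can be pictured with a toy model. The sketch below is not Camel; `BuildOnceDemo`, `configure`, and `send` are hypothetical stand-ins. The builder method only records the steps, and the recorded steps are what actually run for each message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model (hypothetical, not Camel): definition time versus run time.
class BuildOnceDemo {
    static int configureCalls = 0;
    static int stepCalls = 0;

    // Plays the role of configure(): called once, it only assembles the pipeline.
    static List<UnaryOperator<String>> configure() {
        configureCalls++;
        List<UnaryOperator<String>> steps = new ArrayList<>();
        steps.add(body -> { stepCalls++; return body.trim(); });
        steps.add(body -> { stepCalls++; return body.toUpperCase(); });
        return steps;
    }

    // Plays the role of the running route: called for every incoming message.
    static String send(List<UnaryOperator<String>> steps, String body) {
        for (UnaryOperator<String> step : steps) {
            body = step.apply(body);
        }
        return body;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> route = configure();
        System.out.println(send(route, " a "));
        System.out.println(send(route, " b "));
        System.out.println("configure calls: " + configureCalls + ", step calls: " + stepCalls);
    }
}
```

A breakpoint inside `configure()` here would be hit once, while a breakpoint inside one of the lambdas would be hit for every message, which mirrors the situation with processors in a real route.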
src/main/java/org/example/mycamelapp/db/TaskTable.java
package org.example.mycamelapp.db;

import lombok.AllArgsConstructor;
import lombok.Getter;

public interface TaskTable {
    String TABLE_NAME = "task";

    // column names
    String TASK_ID = "task_id";
    String STATUS = "status";
    String EXECUTOR = "executor";
    String CREATED_AT = "created_at";
    String UPDATED_AT = "updated_at";

    @AllArgsConstructor
    @Getter
    enum Status {
        UNEXECUTED(0),
        SUCCEEDED(10),
        FAILED(-1),
        ;

        // value stored in the status column
        private final int code;
    }
}
src/main/java/org/example/mycamelapp/db/TaskSql.java
package org.example.mycamelapp.db;

public class TaskSql implements TaskTable {
    public static String insertTask() {
        return "sql:INSERT INTO " + TABLE_NAME + " () VALUES ()";
    }

    public static String selectTasks(Status status, int limit) {
        return "sql:SELECT * FROM " + TABLE_NAME
                + " WHERE " + STATUS + " = " + status.getCode()
                + " LIMIT " + limit
                + "?useIterator=false" // List<Map<String, Object>>
        ;
    }

    public static String updateTask(Status nextStatus, String hostname) {
        return "sql:UPDATE " + TABLE_NAME
                + " SET " + STATUS + " = " + nextStatus.getCode()
                + ", " + EXECUTOR + " = " + quote(hostname)
                + " WHERE " + TASK_ID + " = " + ref(TASK_ID)
        ;
    }

    // Wraps a value in single quotes; does no escaping, so pass trusted values only.
    private static String quote(String value) {
        if (value == null) return "NULL";
        return "'" + value + "'";
    }

    // Named parameter that Camel resolves at run time.
    private static String ref(String key) {
        return ":#" + key;
    }
}
The string passed to `from()` and `to()` is a URI, and options can be specified after `?`. Here `useIterator=false` is used so that the rows fetched from the DB are sent into the route together as one `List<Map<>>`, instead of row by row as `Map<>`.
The string `:#task_id` (← `ref(TASK_ID)`) appears in the query. Camel looks up the value for that key in the body or headers of the exchange and embeds it.
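To see what the `TaskSql` helpers actually hand to Camel, the stand-alone sketch below repeats the same string concatenation without any Camel dependency and prints the resulting URIs. `SqlUriDemo` is a hypothetical class for inspection only, and the host name `host-1a2b` is a made-up value.

```java
// Stand-alone sketch (hypothetical class): rebuilds the URIs produced by TaskSql.
class SqlUriDemo {
    static final String TABLE_NAME = "task";
    static final String TASK_ID = "task_id";
    static final String STATUS = "status";
    static final String EXECUTOR = "executor";

    static String selectTasks(int statusCode, int limit) {
        return "sql:SELECT * FROM " + TABLE_NAME
                + " WHERE " + STATUS + " = " + statusCode
                + " LIMIT " + limit
                + "?useIterator=false";
    }

    static String updateTask(int nextStatusCode, String hostname) {
        return "sql:UPDATE " + TABLE_NAME
                + " SET " + STATUS + " = " + nextStatusCode
                + ", " + EXECUTOR + " = '" + hostname + "'"
                + " WHERE " + TASK_ID + " = :#" + TASK_ID;
    }

    public static void main(String[] args) {
        // prints: sql:SELECT * FROM task WHERE status = 0 LIMIT 5?useIterator=false
        System.out.println(selectTasks(0, 5));
        // prints: sql:UPDATE task SET status = 10, executor = 'host-1a2b' WHERE task_id = :#task_id
        System.out.println(updateTask(10, "host-1a2b"));
    }
}
```

Everything except `:#task_id` is fixed when the route is built; only that placeholder is filled in per message, from the `task_id` entry of the `Map` body that `split()` produces.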
Process
To simulate work that "takes a long time", the processor sleeps for a random 1 to 3 seconds.
src/main/java/org/example/mycamelapp/processors/PseudoWorkProcessor.java
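package org.example.mycamelapp.processors;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.example.mycamelapp.db.TaskTable;

@Slf4j
public class PseudoWorkProcessor implements Processor {
    @Override
    @SuppressWarnings("unchecked") // the body is one record: column name -> value
    public void process(Exchange exchange) throws Exception {
        Map<String, Object> task = exchange.getIn().getBody(Map.class);
        // random duration in milliseconds, from 1000 (inclusive) to 3000 (exclusive)
        int processingTime = ThreadLocalRandom.current().nextInt(1000, 3000);
        String infoMsg = "task_id = " + task.get(TaskTable.TASK_ID) + ", time = " + processingTime + "[ms]";
        log.info("start working :: " + infoMsg);
        Thread.sleep(processingTime);
        log.info("finish working :: " + infoMsg);
    }
}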
This is called every time data comes in, so you can set breakpoints and debug.
Add the connection settings for MySQL. They are written in YAML here, but they can also be given through environment variables or `-D` options (the priority is determined by Spring Boot; see 2.2.6.RELEASE/reference/html/spring-boot-features.html#boot-features-external-config). Writing them out separately was tedious, so I put the user name and password in the URL as well.
src/main/resources/application.yml
# to keep the JVM running
camel:
  springboot:
    main-run-controller: true
spring:
  datasource:
    url: jdbc:mysql://user:password@localhost:3306/my_camel_app
The starting point of the app is the same as last time.
src/main/java/org/example/mycamelapp/MyCamelApplication.java
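package org.example.mycamelapp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MyCamelApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyCamelApplication.class, args);
    }
}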
terminal
cd path/to/app
mvn spring-boot:run