This entry describes transaction management in the Apache Camel framework.
To explain a little about what a transaction is in the first place, it can be said that multiple processes are combined into one (I think it is needless to say). This multiple process is inseparable and results in either processed or unprocessed. This is the Atomicity of the ACID properties of well-known transactions.
--Atomicity: All processing is completely performed or none is performed. --Consistency: There is no inconsistency in the data regardless of the transaction end state (consistency). --Isolation: If multiple transactions have been executed, do not interfere with other transactions. --Durability: The result of successfully completed transaction processing is not lost.
If there is an error while executing multiple processes, it returns to the state before transaction execution. This is called rollback. If multiple processes succeed without any error, the processing result will be reflected by committing.
Transactions handled by Apache Camel are JDBC or JMS, but here we will explain for JDBC. Camel is based on Spring transaction management, so if you have worked with transactions in Spring, you should be able to understand Camel transactions easily.
Spring transactions are managed by what is called a Transaction Manager. There are several transaction managers available. For JDBC, use "org.springframework.jdbc.datasource.DataSourceTransactionManager". There are other transaction managers for JTA, JMS and Hibername (although I haven't used them)
Transactions are managed in Camel using Spring's transaction manager as well. Now, let's explain Camel's transaction management for JDBC.
I will explain using a simple sample program that handles transactions in Camel. In the sample program, I will use Camel's SQL component to access the DB.
First, define the data source managed by the transaction manager. The target DB is PostgreSQL, which is a data source that uses HikariCP for connection pooling.
<bean id="hikariConfig" class="com.zaxxer.hikari.HikariConfig">
<property name="jdbcUrl" value="jdbc:postgresql://192.168.20.71:5432/testdb" />
<property name="driverClassName" value="org.postgresql.Driver" />
<property name="username" value="postgres" />
<property name="password" value="postgres" />
<property name="autoCommit" value="false" />
</bean>
<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource"
destroy-method="close">
<constructor-arg ref="hikariConfig" />
</bean>
Defines a transaction manager that manages the defined data source. The data source created earlier in (1) is specified in the property.
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" /><!-- (1) -->
</bean>
First, you can't try without a table, so create a simple table (sample_test).
create table sample_test (
id varchar(10) not null primary key,
name varchar(255) not null,
count integer not null,
insert_date timestamp not null);
Use SQL components to execute SQL into the database. Specify and define the data source that uses the SQL component as shown in (1) below.
<bean id="sqlComponent"
class="org.apache.camel.component.sql.SqlComponent">
<property name="dataSource" ref="dataSource" /><!-- (1) -->
</bean>
You can also specify the data source to use in the endpoint's URI, but specifying it in advance makes the URI simpler.
When executing SQL, specify the ID of the SQL component specified earlier and the DML to be executed in the URI of the endpoint.
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
In SQL,: # id,: # name,: # count are parameters, and if you set the property with the same name in the header of the message, it will be automatically bound. Although SQL is written directly to the endpoint, it is better to read it from the property file.
Now that the SQL components are ready, let's create a simple sample program.
In the sample program, one record is inserted into the sample_test table and that record is updated.
Transactions are placed under the control of the transaction manager by writing "\
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted /><!-- (1) -->
<setHeader headerName="id"><constant>id001</constant></setHeader><!-- (2) -->
<setHeader headerName="name"><constant>"testuser"</constant></setHeader><!-- (2) -->
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader><!-- (2) -->
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
<to uri="log:insertLog?showHeaders=true" />
</route>
In the previous example, the result is the same with or without the transaction manager, so let's try an example where an exception is raised after inserting one record and it is rolled back.
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted />
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>"testuser"</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<throwException exceptionType="java.lang.Exception" message="throw new Exception" /><!-- (1) -->
<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
<to uri="log:insertLog?showHeaders=true" />
</route>
ThrowException (1) forcibly raises an exception. This rolls back the INSERT statement that was executed before it. Also, since an exception has occurred, the route ends without executing the subsequent UPDATE statement. The run-time log is output as follows.
[2019-03-08 08:38:20.599], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1). Exhausted after delivery attempt: 1 caught: java.lang.Exception: throw new Exception
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[main_route ] [main_route ] [timer://trigger?repeatCount=1 ] [ 80]
[main_route ] [transacted1 ] [transacted ] [ 0]
[main_route ] [setHeader1 ] [setHeader[id] ] [ 0]
[main_route ] [setHeader2 ] [setHeader[name] ] [ 0]
[main_route ] [setHeader3 ] [setHeader[count] ] [ 1]
[main_route ] [to1 ] [sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#i] [ 34]
[main_route ] [to2 ] [log:insertLog?showHeaders=true ] [ 6]
[main_route ] [throwException1 ] [throwException[ref:null] ] [ 0]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.Exception: throw new Exception
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_172]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_172]
~ Omitted ~
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_172]
[2019-03-08 08:38:20.612], [WARN ], o.a.c.s.s.TransactionErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.spring.spi.TransactionErrorHandler, Transaction rollback (0xb112b13) redelivered(false) for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1) caught: java.lang.Exception: throw new Exception
In the last line, you can see that the thread of "TransactionErrorHandler" is outputting the log "Transaction rollback" and it was rolled back. No records have been inserted by rollback for the data on the actual table.
In the previous example, multiple processes (DML) were defined for one route, the transaction started when the route started, and the transaction ended when the route ended.
When there are multiple routes, you can specify the treatment such as using the same transaction between routes or creating another transaction. This can be set with the transaction propagation (PROPAGATION) option. Transaction propagation (PROPAGATION) uses Spring transactions rather than Apache Camel's own specifications.
The transaction propagation (PROPAGATION) option takes the following settings and behaves differently with and without transactions.
Transaction propagation attribute | If there is a transaction | If there is no transaction |
---|---|---|
PROPAGATION_REQUIRED | Execute within an existing transaction. | Start a new transaction. |
PROPAGATION_REQUIRES_NEW | Start a new transaction separately from the existing transaction. | Start a new transaction. |
PROPAGATION_MANDATORY | Execute within an existing transaction. | Throw an exception. |
PROPAGATION_SUPPORTS | Execute within an existing transaction. | Run without a transaction. |
PROPAGATION_NOT_SUPPORTED | Stop an existing transaction and run it without a transaction. | Run without a transaction. |
PROPAGATION_NEVER | Throw an exception. | Run without a transaction. |
PROPAGATION_NESTED | An existing transaction is used, and only that part is processed like a nested transaction. | Start a new transaction. |
When using PROPAGATION in Camel, define the transaction propagation to use as follows: It was not used in the example before, but if it is not specified, PROPAGATION_REQUIRED is specified by default.
The following defines the propagation of three transactions: PROPAGATION_REQUIRED (1), PROPAGATION_REQUIRES_NEW (2), and PROPAGATION_MANDATORY (3).
<bean id="PROPAGATION_REQUIRED"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_REQUIRED" /><!-- (1) -->
</bean>
<bean id="PROPAGATION_REQUIRES_NEW"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_REQUIRES_NEW" /><!-- (2) -->
</bean>
<bean id="PROPAGATION_MANDATORY"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_MANDATORY" /><!-- (3) -->
</bean>
To use the defined transaction propagation, describe as "ref =" PROPAGATION_REQUIRED "" (1) in transacted. The following example defines three routes, main_route, tran1_route, and tran2_route, each of which specifies the transaction propagation to be used for translated.
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted ref="PROPAGATION_REQUIRED" /><!-- (1) -->
<to uri="direct:tran1" />
<to uri="direct:tran2" />
</route>
<route id="tran1_route">
<from uri="direct:tran1" />
<transacted ref="PROPAGATION_REQUIRES_NEW" /><!-- (1) -->
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>testuser</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
</route>
<route id="tran2_route">
<from uri="direct:tran2" />
<transacted ref="PROPAGATION_MANDATORY" /><!-- (1) -->
<setHeader headerName="id"><constant>id002</constant></setHeader>
<setHeader headerName="name"><constant>testuser2</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
</route>
Also, in Spring, by default, if an unchecked exception (RuntimeException and its subclass) occurs, it will be rolled back, but if a checked exception occurs, it will be committed without being rolled back. On the other hand, Camel also rolls back with unchecked exceptions (and Exceptions).
Camel automatically rolls back when an exception (unchecked exception) occurs, but you can also explicitly roll it back.
To explicitly rollback, specify "\
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted />
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>testuser</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<rollback markRollbackOnlyLast="true" />
</route>
I didn't explain it this time, but you can also handle transactions that target multiple resources such as multiple JMS and JDBC. This is called a global transaction. This time we are only dealing with JDBC, which is called a local transaction as opposed to a global transaction.
-SQL Component (official site)
-Transactional Client (official site)
-TransactionErrorHandler (official site)
-Transaction management with Spring
-Understanding how to share Spring DB connection (DB transaction)
-Spring Framework 6. Transaction Management by TECHSCORE
--Write an example of Java DSL. --Since the rollback example is simple, change it to a more complicated and meaningful example. --Since there are few explanations of SQL components, the explanation is supplemented. But since this article is about transactions, I'll consider whether to write another article focusing on SQL components. --Include the transaction where FROM is JMS and TO is JDBC. (If JDBC fails, JMS will not be committed and will be resent)
Recommended Posts