In diesem Eintrag erklären wir das Transaktionsmanagement im Apache Camel-Framework.
Um ein wenig darüber zu erklären, was eine Transaktion überhaupt ist, kann gesagt werden, dass mehrere Prozesse zu einem zusammengefasst werden (ich denke, es ist unnötig zu sagen). Dieser Mehrfachprozess ist untrennbar miteinander verbunden und führt entweder zu einer Verarbeitung oder zu einer Nichtverarbeitung. Dies ist die Atomizität der ACID-Eigenschaft der bekannten Transaktion.
--Atomizität: Die gesamte Verarbeitung wird vollständig oder gar nicht ausgeführt. --Konsistenz: Es gibt keine Inkonsistenz in den Daten, unabhängig vom Endzustand der Transaktion (Konsistenz). --Isolation: Wenn mehrere Transaktionen ausgeführt werden, stören Sie andere Transaktionen nicht.
Wenn beim Ausführen mehrerer Prozesse ein Fehler auftritt, wird der Status vor der Ausführung der Transaktion wiederhergestellt. Dies wird als Rollback bezeichnet. Wenn mehrere Prozesse fehlerfrei erfolgreich sind, wird das Verarbeitungsergebnis durch Festschreiben wiedergegeben.
Von Apache Camel abgewickelte Transaktionen sind JDBC oder JMS, aber hier werden wir für JDBC erklären. Camel basiert auf dem Transaktionsmanagement von Spring. Wenn Sie also mit Spring gearbeitet haben, sind die Transaktionen von Camel leicht zu verstehen.
Spring-Transaktionen werden von einem sogenannten Transaction Manager verwaltet. Es stehen mehrere Transaktionsmanager zur Verfügung. Verwenden Sie im Fall von JDBC "org.springframework.jdbc.datasource.DataSourceTransactionManager". Es gibt andere Transaktionsmanager für JTA, JMS und Hibername (obwohl ich sie nicht verwendet habe).
Ebenso verwaltet Camel Transaktionen mit dem Transaktionsmanager von Spring. Lassen Sie uns nun das Transaktionsmanagement von Camel für JDBC erläutern.
Ich werde anhand eines einfachen Beispielprogramms erklären, das Transaktionen in Camel abwickelt. Das Beispielprogramm verwendet die SQL-Komponente von Camel, um auf die Datenbank zuzugreifen.
Definieren Sie zunächst die vom Transaktionsmanager verwaltete Datenquelle. Die Ziel-DB ist PostgreSQL, eine Datenquelle, die HikariCP für das Verbindungspooling verwendet.
<bean id="hikariConfig" class="com.zaxxer.hikari.HikariConfig">
<property name="jdbcUrl" value="jdbc:postgresql://192.168.20.71:5432/testdb" />
<property name="driverClassName" value="org.postgresql.Driver" />
<property name="username" value="postgres" />
<property name="password" value="postgres" />
<property name="autoCommit" value="false" />
</bean>
<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource"
destroy-method="close">
<constructor-arg ref="hikariConfig" />
</bean>
Definiert einen Transaktionsmanager, der die definierte Datenquelle verwaltet. Die zuvor in (1) erstellte Datenquelle wird in der Eigenschaft angegeben.
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" /><!-- (1) -->
</bean>
Erstens können Sie es nicht ohne Tabelle versuchen. Erstellen Sie daher eine einfache Tabelle (sample_test).
create table sample_test (
id varchar(10) not null primary key,
name varchar(255) not null,
count integer not null,
insert_date timestamp not null);
Verwenden Sie SQL-Komponenten, um SQL in der Datenbank auszuführen. Geben Sie die Datenquelle an und definieren Sie sie, die die SQL-Komponente verwendet, wie in (1) unten gezeigt.
<bean id="sqlComponent"
class="org.apache.camel.component.sql.SqlComponent">
<property name="dataSource" ref="dataSource" /><!-- (1) -->
</bean>
Sie können auch die Datenquelle angeben, die im URI des Endpunkts verwendet werden soll. Wenn Sie sie jedoch auf diese Weise vorab angeben, wird der URI vereinfacht.
Geben Sie beim Ausführen von SQL die ID der zuvor angegebenen SQL-Komponente und die auszuführende DML im URI des Endpunkts an.
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
In SQL sind: #id,: #name,: #count Parameter und werden automatisch gebunden, wenn Sie im Nachrichtenkopf eine Eigenschaft mit demselben Namen festlegen. Obwohl SQL direkt auf den Endpunkt geschrieben wird, ist es besser, es aus der Eigenschaftendatei zu lesen.
Nachdem die SQL-Komponenten fertig sind, erstellen wir ein einfaches Beispielprogramm.
Im Beispielprogramm wird ein Datensatz in die Tabelle sample_test eingefügt und dieser Datensatz aktualisiert.
Der Stamm wird vom Transaktionsmanager gesteuert, indem "\
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted /><!-- (1) -->
<setHeader headerName="id"><constant>id001</constant></setHeader><!-- (2) -->
<setHeader headerName="name"><constant>"testuser"</constant></setHeader><!-- (2) -->
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader><!-- (2) -->
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
<to uri="log:insertLog?showHeaders=true" />
</route>
Im vorherigen Beispiel ist das Ergebnis mit oder ohne Transaktionsmanager dasselbe. Versuchen wir also ein Beispiel, bei dem nach dem Einfügen eines Datensatzes eine Ausnahme ausgelöst und ein Rollback durchgeführt wird.
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted />
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>"testuser"</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<throwException exceptionType="java.lang.Exception" message="throw new Exception" /><!-- (1) -->
<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
<to uri="log:insertLog?showHeaders=true" />
</route>
ThrowException (1) löst zwangsweise eine Ausnahme aus. Dadurch wird die zuvor ausgeführte INSERT-Anweisung zurückgesetzt. Da eine Ausnahme aufgetreten ist, endet die Route außerdem, ohne nachfolgende UPDATE-Anweisungen auszuführen. Das Laufzeitprotokoll wird wie folgt ausgegeben.
[2019-03-08 08:38:20.599], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1). Exhausted after delivery attempt: 1 caught: java.lang.Exception: throw new Exception
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[main_route ] [main_route ] [timer://trigger?repeatCount=1 ] [ 80]
[main_route ] [transacted1 ] [transacted ] [ 0]
[main_route ] [setHeader1 ] [setHeader[id] ] [ 0]
[main_route ] [setHeader2 ] [setHeader[name] ] [ 0]
[main_route ] [setHeader3 ] [setHeader[count] ] [ 1]
[main_route ] [to1 ] [sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#i] [ 34]
[main_route ] [to2 ] [log:insertLog?showHeaders=true ] [ 6]
[main_route ] [throwException1 ] [throwException[ref:null] ] [ 0]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.Exception: throw new Exception
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_172]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_172]
~ Ausgelassen ~
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_172]
[2019-03-08 08:38:20.612], [WARN ], o.a.c.s.s.TransactionErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.spring.spi.TransactionErrorHandler, Transaction rollback (0xb112b13) redelivered(false) for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1) caught: java.lang.Exception: throw new Exception
In der letzten Zeile sehen Sie, dass der Thread von "TransactionErrorHandler" das Protokoll "Transaction Rollback" ausgibt und es zurückgesetzt wurde. Die Daten in der tatsächlichen Tabelle wurden aufgrund eines Rollbacks nicht eingefügt.
Im vorherigen Beispiel wurden mehrere Prozesse (DML) für eine Route definiert, die Transaktion wurde gestartet, als die Route gestartet wurde, und die Transaktion wurde beendet, als die Route endete.
Wenn Sie mehrere Routen haben, können Sie angeben, wie dieselbe Transaktion zwischen den Routen verwendet werden soll, eine andere Transaktion erstellen usw. Dies kann mit der Option PROPAGATION eingestellt werden. Die Transaktionsweitergabe (PROPAGATION) verwendet Spring-Transaktionen anstelle der eigenen Spezifikationen von Apache Camel.
Die Option PROPAGATION nimmt die folgenden Einstellungen an und verhält sich mit und ohne Transaktionen unterschiedlich.
Transaktionsausbreitungsattribut | Wenn es eine Transaktion gibt | Wenn es keine Transaktion gibt |
---|---|---|
PROPAGATION_REQUIRED | Innerhalb einer vorhandenen Transaktion ausführen. | Starten Sie eine neue Transaktion. |
PROPAGATION_REQUIRES_NEW | Starten Sie eine neue Transaktion getrennt von der vorhandenen. | Starten Sie eine neue Transaktion. |
PROPAGATION_MANDATORY | Innerhalb einer vorhandenen Transaktion ausführen. | Eine Ausnahme auslösen. |
PROPAGATION_SUPPORTS | Innerhalb einer vorhandenen Transaktion ausführen. | Ohne Transaktionen ausführen. |
PROPAGATION_NOT_SUPPORTED | Stoppen Sie eine vorhandene Transaktion und führen Sie sie ohne Transaktion aus. | Ohne Transaktionen ausführen. |
PROPAGATION_NEVER | Eine Ausnahme auslösen. | Ohne Transaktionen ausführen. |
PROPAGATION_NESTED | Es verwendet eine vorhandene Transaktion und wird nur in diesem Teil wie eine verschachtelte Transaktion verarbeitet. | Starten Sie eine neue Transaktion. |
Wenn Sie PROPAGATION mit Camel verwenden möchten, definieren Sie die zu verwendende Transaktionsweitergabe wie folgt: Es wurde im vorherigen Beispiel nicht verwendet, aber wenn es nicht angegeben ist, wird standardmäßig PROPAGATION_REQUIRED angegeben.
Im Folgenden wird die Weitergabe von drei Transaktionen definiert: PROPAGATION_REQUIRED (1), PROPAGATION_REQUIRES_NEW (2) und PROPAGATION_MANDATORY (3).
<bean id="PROPAGATION_REQUIRED"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_REQUIRED" /><!-- (1) -->
</bean>
<bean id="PROPAGATION_REQUIRES_NEW"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_REQUIRES_NEW" /><!-- (2) -->
</bean>
<bean id="PROPAGATION_MANDATORY"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_MANDATORY" /><!-- (3) -->
</bean>
Um die definierte Transaktionsweitergabe zu verwenden, beschreiben Sie als "ref =" PROPAGATION_REQUIRED "" (1) in übersetzt. Das folgende Beispiel definiert drei Routen, main_route, tran1_route und tran2_route, von denen jede die Weitergabe der für die Übersetzung verwendeten Transaktion angibt.
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted ref="PROPAGATION_REQUIRED" /><!-- (1) -->
<to uri="direct:tran1" />
<to uri="direct:tran2" />
</route>
<route id="tran1_route">
<from uri="direct:tran1" />
<transacted ref="PROPAGATION_REQUIRES_NEW" /><!-- (1) -->
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>testuser</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
</route>
<route id="tran2_route">
<from uri="direct:tran2" />
<transacted ref="PROPAGATION_MANDATORY" /><!-- (1) -->
<setHeader headerName="id"><constant>id002</constant></setHeader>
<setHeader headerName="name"><constant>testuser2</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
</route>
Außerdem wird Spring standardmäßig zurückgesetzt, wenn eine nicht aktivierte Ausnahme (RuntimeException und ihre Unterklassen) auftritt, wird jedoch ohne Zurücksetzen festgeschrieben, wenn eine aktivierte Ausnahme auftritt. Andererseits rollt Camel auch mit ungeprüften Ausnahmen (und Ausnahmen) zurück.
Camel wird automatisch zurückgesetzt, wenn eine Ausnahme (nicht aktivierte Ausnahme) auftritt. Sie können sie jedoch auch explizit zurücksetzen.
Geben Sie zum expliziten Zurücksetzen "\
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted />
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>testuser</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<rollback markRollbackOnlyLast="true" />
</route>
Obwohl diesmal nicht erläutert, ist es möglich, Transaktionen zu verarbeiten, die auf mehrere Ressourcen wie mehrere JMS und JDBC abzielen. Dies wird als globale Transaktion bezeichnet. Dieses Mal haben wir es nur mit JDBC zu tun, das als lokale Transaktion im Gegensatz zu einer globalen Transaktion bezeichnet wird.
Recommended Posts