[JAVA] Transaktionsmanagement des integrierten Frameworks "Apache Camel"

Transaktion

In diesem Eintrag erklären wir das Transaktionsmanagement im Apache Camel-Framework.

Um ein wenig darüber zu erklären, was eine Transaktion überhaupt ist, kann gesagt werden, dass mehrere Prozesse zu einem zusammengefasst werden (ich denke, es ist unnötig zu sagen). Dieser Mehrfachprozess ist untrennbar miteinander verbunden und führt entweder zu einer Verarbeitung oder zu einer Nichtverarbeitung. Dies ist die Atomizität der ACID-Eigenschaft der bekannten Transaktion.

--Atomizität: Die gesamte Verarbeitung wird vollständig oder gar nicht ausgeführt. --Konsistenz: Es gibt keine Inkonsistenz in den Daten, unabhängig vom Endzustand der Transaktion (Konsistenz). --Isolation: Wenn mehrere Transaktionen ausgeführt werden, stören Sie andere Transaktionen nicht.

Wenn beim Ausführen mehrerer Prozesse ein Fehler auftritt, wird der Status vor der Ausführung der Transaktion wiederhergestellt. Dies wird als Rollback bezeichnet. Wenn mehrere Prozesse fehlerfrei erfolgreich sind, wird das Verarbeitungsergebnis durch Festschreiben wiedergegeben.

Transaktionen mit Apache Camel

Von Apache Camel abgewickelte Transaktionen sind JDBC oder JMS, aber hier werden wir für JDBC erklären. Camel basiert auf dem Transaktionsmanagement von Spring. Wenn Sie also mit Spring gearbeitet haben, sind die Transaktionen von Camel leicht zu verstehen.

Spring-Transaktionen werden von einem sogenannten Transaction Manager verwaltet. Es stehen mehrere Transaktionsmanager zur Verfügung. Verwenden Sie im Fall von JDBC "org.springframework.jdbc.datasource.DataSourceTransactionManager". Es gibt andere Transaktionsmanager für JTA, JMS und Hibername (obwohl ich sie nicht verwendet habe).

Ebenso verwaltet Camel Transaktionen mit dem Transaktionsmanager von Spring. Lassen Sie uns nun das Transaktionsmanagement von Camel für JDBC erläutern.

Beispieltransaktion in Camel

Ich werde anhand eines einfachen Beispielprogramms erklären, das Transaktionen in Camel abwickelt. Das Beispielprogramm verwendet die SQL-Komponente von Camel, um auf die Datenbank zuzugreifen.

Definieren Sie zunächst die vom Transaktionsmanager verwaltete Datenquelle. Die Ziel-DB ist PostgreSQL, eine Datenquelle, die HikariCP für das Verbindungspooling verwendet.

	<bean id="hikariConfig" class="com.zaxxer.hikari.HikariConfig">
		<property name="jdbcUrl" value="jdbc:postgresql://192.168.20.71:5432/testdb" />
		<property name="driverClassName" value="org.postgresql.Driver" />
		<property name="username" value="postgres" />
		<property name="password" value="postgres" />
		<property name="autoCommit" value="false" />
	</bean>

	<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource"
		destroy-method="close">
		<constructor-arg ref="hikariConfig" />
	</bean>

Definiert einen Transaktionsmanager, der die definierte Datenquelle verwaltet. Die zuvor in (1) erstellte Datenquelle wird in der Eigenschaft angegeben.

	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" /><!-- (1) -->
	</bean>

Erstellen Sie eine Testtabelle

Erstens können Sie es nicht ohne Tabelle versuchen. Erstellen Sie daher eine einfache Tabelle (sample_test).

create table sample_test (
    id varchar(10) not null primary key,
    name varchar(255) not null,
    count integer not null, 
    insert_date timestamp not null);

SQL-Komponentendefinition

Verwenden Sie SQL-Komponenten, um SQL in der Datenbank auszuführen. Geben Sie die Datenquelle an und definieren Sie sie, die die SQL-Komponente verwendet, wie in (1) unten gezeigt.

	<bean id="sqlComponent"
		class="org.apache.camel.component.sql.SqlComponent">
		<property name="dataSource" ref="dataSource" /><!-- (1) -->
	</bean>

Sie können auch die Datenquelle angeben, die im URI des Endpunkts verwendet werden soll. Wenn Sie sie jedoch auf diese Weise vorab angeben, wird der URI vereinfacht.

Geben Sie beim Ausführen von SQL die ID der zuvor angegebenen SQL-Komponente und die auszuführende DML im URI des Endpunkts an.

			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />

In SQL sind: #id,: #name,: #count Parameter und werden automatisch gebunden, wenn Sie im Nachrichtenkopf eine Eigenschaft mit demselben Namen festlegen. Obwohl SQL direkt auf den Endpunkt geschrieben wird, ist es besser, es aus der Eigenschaftendatei zu lesen.

Einfaches Beispielprogramm

Nachdem die SQL-Komponenten fertig sind, erstellen wir ein einfaches Beispielprogramm.

Im Beispielprogramm wird ein Datensatz in die Tabelle sample_test eingefügt und dieser Datensatz aktualisiert. Der Stamm wird vom Transaktionsmanager gesteuert, indem "\ " (1) für die Transaktion geschrieben wird. Wenn die Route startet, wird die Transaktion gestartet, und wenn die Route fehlerfrei abgeschlossen wird, schreibt der Transaktionsmanager automatisch fest. Außerdem können Sie sehen, dass die Werte von Parametern wie: #id,: #name,: #count in SQL zuvor von setHeader (2) angegeben wurden.

		<route id="main_route">
			<from uri="timer:trigger?repeatCount=1" />
			<transacted /><!-- (1) -->
			<setHeader headerName="id"><constant>id001</constant></setHeader><!-- (2) -->
			<setHeader headerName="name"><constant>"testuser"</constant></setHeader><!-- (2) -->
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader><!-- (2) -->
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
			<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
			<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
			<to uri="log:insertLog?showHeaders=true" />
		</route>

Im vorherigen Beispiel ist das Ergebnis mit oder ohne Transaktionsmanager dasselbe. Versuchen wir also ein Beispiel, bei dem nach dem Einfügen eines Datensatzes eine Ausnahme ausgelöst und ein Rollback durchgeführt wird.

		<route id="main_route">
			<from uri="timer:trigger?repeatCount=1" />
			<transacted />
			<setHeader headerName="id"><constant>id001</constant></setHeader>
			<setHeader headerName="name"><constant>"testuser"</constant></setHeader>
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
			<throwException exceptionType="java.lang.Exception" message="throw new Exception" /><!-- (1) -->
			<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
			<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
			<to uri="log:insertLog?showHeaders=true" />
		</route>

ThrowException (1) löst zwangsweise eine Ausnahme aus. Dadurch wird die zuvor ausgeführte INSERT-Anweisung zurückgesetzt. Da eine Ausnahme aufgetreten ist, endet die Route außerdem, ohne nachfolgende UPDATE-Anweisungen auszuführen. Das Laufzeitprotokoll wird wie folgt ausgegeben.

[2019-03-08 08:38:20.599], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1). Exhausted after delivery attempt: 1 caught: java.lang.Exception: throw new Exception

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[main_route        ] [main_route        ] [timer://trigger?repeatCount=1                                                 ] [        80]
[main_route        ] [transacted1       ] [transacted                                                                    ] [         0]
[main_route        ] [setHeader1        ] [setHeader[id]                                                                 ] [         0]
[main_route        ] [setHeader2        ] [setHeader[name]                                                               ] [         0]
[main_route        ] [setHeader3        ] [setHeader[count]                                                              ] [         1]
[main_route        ] [to1               ] [sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#i] [        34]
[main_route        ] [to2               ] [log:insertLog?showHeaders=true                                                ] [         6]
[main_route        ] [throwException1   ] [throwException[ref:null]                                                      ] [         0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.Exception: throw new Exception
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_172]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_172]
~ Ausgelassen ~
	at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_172]
[2019-03-08 08:38:20.612], [WARN ], o.a.c.s.s.TransactionErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.spring.spi.TransactionErrorHandler, Transaction rollback (0xb112b13) redelivered(false) for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1) caught: java.lang.Exception: throw new Exception

In der letzten Zeile sehen Sie, dass der Thread von "TransactionErrorHandler" das Protokoll "Transaction Rollback" ausgibt und es zurückgesetzt wurde. Die Daten in der tatsächlichen Tabelle wurden aufgrund eines Rollbacks nicht eingefügt.

Transaktionsausbreitung (PROPAGATION)

Im vorherigen Beispiel wurden mehrere Prozesse (DML) für eine Route definiert, die Transaktion wurde gestartet, als die Route gestartet wurde, und die Transaktion wurde beendet, als die Route endete.

Wenn Sie mehrere Routen haben, können Sie angeben, wie dieselbe Transaktion zwischen den Routen verwendet werden soll, eine andere Transaktion erstellen usw. Dies kann mit der Option PROPAGATION eingestellt werden. Die Transaktionsweitergabe (PROPAGATION) verwendet Spring-Transaktionen anstelle der eigenen Spezifikationen von Apache Camel.

Die Option PROPAGATION nimmt die folgenden Einstellungen an und verhält sich mit und ohne Transaktionen unterschiedlich.

Transaktionsausbreitungsattribut Wenn es eine Transaktion gibt Wenn es keine Transaktion gibt
PROPAGATION_REQUIRED Innerhalb einer vorhandenen Transaktion ausführen. Starten Sie eine neue Transaktion.
PROPAGATION_REQUIRES_NEW Starten Sie eine neue Transaktion getrennt von der vorhandenen. Starten Sie eine neue Transaktion.
PROPAGATION_MANDATORY Innerhalb einer vorhandenen Transaktion ausführen. Eine Ausnahme auslösen.
PROPAGATION_SUPPORTS Innerhalb einer vorhandenen Transaktion ausführen. Ohne Transaktionen ausführen.
PROPAGATION_NOT_SUPPORTED Stoppen Sie eine vorhandene Transaktion und führen Sie sie ohne Transaktion aus. Ohne Transaktionen ausführen.
PROPAGATION_NEVER Eine Ausnahme auslösen. Ohne Transaktionen ausführen.
PROPAGATION_NESTED Es verwendet eine vorhandene Transaktion und wird nur in diesem Teil wie eine verschachtelte Transaktion verarbeitet. Starten Sie eine neue Transaktion.

Wenn Sie PROPAGATION mit Camel verwenden möchten, definieren Sie die zu verwendende Transaktionsweitergabe wie folgt: Es wurde im vorherigen Beispiel nicht verwendet, aber wenn es nicht angegeben ist, wird standardmäßig PROPAGATION_REQUIRED angegeben.

Im Folgenden wird die Weitergabe von drei Transaktionen definiert: PROPAGATION_REQUIRED (1), PROPAGATION_REQUIRES_NEW (2) und PROPAGATION_MANDATORY (3).

	<bean id="PROPAGATION_REQUIRED"
		class="org.apache.camel.spring.spi.SpringTransactionPolicy">
		<property name="transactionManager" ref="transactionManager" />
		<property name="propagationBehaviorName"
			value="PROPAGATION_REQUIRED" /><!-- (1) -->
	</bean>

	<bean id="PROPAGATION_REQUIRES_NEW"
		class="org.apache.camel.spring.spi.SpringTransactionPolicy">
		<property name="transactionManager" ref="transactionManager" />
		<property name="propagationBehaviorName"
			value="PROPAGATION_REQUIRES_NEW" /><!-- (2) -->
	</bean>

	<bean id="PROPAGATION_MANDATORY"
		class="org.apache.camel.spring.spi.SpringTransactionPolicy">
		<property name="transactionManager" ref="transactionManager" />
		<property name="propagationBehaviorName"
			value="PROPAGATION_MANDATORY" /><!-- (3) -->
	</bean>

Um die definierte Transaktionsweitergabe zu verwenden, beschreiben Sie als "ref =" PROPAGATION_REQUIRED "" (1) in übersetzt. Das folgende Beispiel definiert drei Routen, main_route, tran1_route und tran2_route, von denen jede die Weitergabe der für die Übersetzung verwendeten Transaktion angibt.

		<route id="main_route">
			<from uri="timer:trigger?repeatCount=1" />
			<transacted ref="PROPAGATION_REQUIRED" /><!-- (1) -->
			<to uri="direct:tran1" />
			<to uri="direct:tran2" />
		</route>

		<route id="tran1_route">
			<from uri="direct:tran1" />
			<transacted ref="PROPAGATION_REQUIRES_NEW" /><!-- (1) -->
			<setHeader headerName="id"><constant>id001</constant></setHeader>
			<setHeader headerName="name"><constant>testuser</constant></setHeader>
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
		</route>

		<route id="tran2_route">
			<from uri="direct:tran2" />
			<transacted ref="PROPAGATION_MANDATORY" /><!-- (1) -->
			<setHeader headerName="id"><constant>id002</constant></setHeader>
			<setHeader headerName="name"><constant>testuser2</constant></setHeader>
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
		</route>

Außerdem wird Spring standardmäßig zurückgesetzt, wenn eine nicht aktivierte Ausnahme (RuntimeException und ihre Unterklassen) auftritt, wird jedoch ohne Zurücksetzen festgeschrieben, wenn eine aktivierte Ausnahme auftritt. Andererseits rollt Camel auch mit ungeprüften Ausnahmen (und Ausnahmen) zurück.

Führen Sie explizit einen Rollback durch

Camel wird automatisch zurückgesetzt, wenn eine Ausnahme (nicht aktivierte Ausnahme) auftritt. Sie können sie jedoch auch explizit zurücksetzen.

Geben Sie zum expliziten Zurücksetzen "\ " an. Dies markiert die aktuelle Transaktion für das Rollback, ohne eine Ausnahme auszulösen. Die nachfolgende Verarbeitung wird nicht ausgeführt. Wenn also eine Verarbeitung wie die Ausgabe eines Protokolls vor dem Rollback erforderlich ist, führen Sie es vor dem Rollback-Element aus.

		<route id="main_route">
			<from uri="timer:trigger?repeatCount=1" />
			<transacted />
			<setHeader headerName="id"><constant>id001</constant></setHeader>
			<setHeader headerName="name"><constant>testuser</constant></setHeader>
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
			<rollback markRollbackOnlyLast="true" />
		</route>

Schließlich

Obwohl diesmal nicht erläutert, ist es möglich, Transaktionen zu verarbeiten, die auf mehrere Ressourcen wie mehrere JMS und JDBC abzielen. Dies wird als globale Transaktion bezeichnet. Dieses Mal haben wir es nur mit JDBC zu tun, das als lokale Transaktion im Gegensatz zu einer globalen Transaktion bezeichnet wird.

Referenz

TODO für mich

Recommended Posts

Transaktionsmanagement des integrierten Frameworks "Apache Camel"
Integriertes Framework Grundlagen zur Weiterleitung von Apache Camel-Nachrichten
Senden und Empfangen einfacher Nachrichten mit der Kafka-Komponente des Apache Camel-Frameworks (Java DSL Edition)
Apache Camel in der Cloud-Ära
Informationen zur ersten Anzeige von Spring Framework
Untersuchen Sie das Verhalten des JPA-Transaktionszeitlimits
[Apache Camel] Einfacher Durchsatz in einem Protokoll ausgeben
Über den offiziellen Startleitfaden für Spring Framework
Eine Übersicht über die Spring Framework Resource-Oberfläche
So laden Sie eine ältere Version von Apache Tomcat herunter
Eine Aufzeichnung über das Studium des Spring Framework von Grund auf neu
Eine Übersicht über das native Java-Framework Quarkus von Kubernetes