[JAVA] Gestion des transactions du framework intégré "Apache Camel"

transaction

Dans cette entrée, nous expliquerons la gestion des transactions dans le framework Apache Camel.

Pour expliquer un peu ce qu'est une transaction en premier lieu, on peut dire que plusieurs processus sont combinés en un seul (je pense qu'il est inutile de le dire). Ce processus multiple est inséparable et se traduit par un traitement ou un non-traitement. Il s'agit de l'atomicité de la propriété ACID de la transaction bien connue.

--Atomicité: tout le traitement est complètement effectué ou aucun n'est effectué. --Cohérence: il n'y a aucune incohérence dans les données quel que soit l'état final de la transaction (cohérence). --Isolation: si plusieurs transactions sont en cours d'exécution, n'interférez pas avec d'autres transactions.

S'il y a une erreur lors de l'exécution de plusieurs processus, il revient à l'état avant l'exécution de la transaction. C'est ce qu'on appelle la restauration. Si plusieurs processus réussissent sans aucune erreur, le résultat du traitement sera reflété par la validation.

Transactions avec Apache Camel

Les transactions gérées par Apache Camel sont JDBC ou JMS, mais nous expliquerons ici pour JDBC. Camel est basé sur la gestion des transactions de Spring, donc si vous avez travaillé avec Spring, vous trouverez les transactions de Camel faciles à comprendre.

Les transactions Spring sont gérées par ce qu'on appelle un gestionnaire de transactions. Il existe plusieurs gestionnaires de transactions disponibles, et dans le cas de JDBC, utilisez "org.springframework.jdbc.datasource.DataSourceTransactionManager". Il existe d'autres gestionnaires de transactions pour JTA, JMS et Hibername (bien que je ne les ai pas utilisés)

De même, Camel gère les transactions à l'aide du gestionnaire de transactions de Spring. Maintenant, expliquons la gestion des transactions de Camel pour JDBC.

Exemple de transaction à Camel

Je vais vous expliquer en utilisant un exemple de programme simple qui gère les transactions dans Camel. L'exemple de programme utilise le composant SQL de Camel pour accéder à la base de données.

Tout d'abord, définissez la source de données gérée par le gestionnaire de transactions. La base de données cible est PostgreSQL, qui est une source de données qui utilise HikariCP pour le regroupement de connexions.

	<bean id="hikariConfig" class="com.zaxxer.hikari.HikariConfig">
		<property name="jdbcUrl" value="jdbc:postgresql://192.168.20.71:5432/testdb" />
		<property name="driverClassName" value="org.postgresql.Driver" />
		<property name="username" value="postgres" />
		<property name="password" value="postgres" />
		<property name="autoCommit" value="false" />
	</bean>

	<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource"
		destroy-method="close">
		<constructor-arg ref="hikariConfig" />
	</bean>

Définit un gestionnaire de transactions qui gère la source de données définie. La source de données créée précédemment dans (1) est spécifiée dans la propriété.

	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" /><!-- (1) -->
	</bean>

Créer une table de test

Tout d'abord, vous ne pouvez pas essayer sans table, alors créez une table simple (sample_test).

create table sample_test (
    id varchar(10) not null primary key,
    name varchar(255) not null,
    count integer not null, 
    insert_date timestamp not null);

Définition du composant SQL

Utilisez des composants SQL pour exécuter SQL sur la base de données. Spécifiez et définissez la source de données qui utilise le composant SQL comme indiqué dans (1) ci-dessous.

	<bean id="sqlComponent"
		class="org.apache.camel.component.sql.SqlComponent">
		<property name="dataSource" ref="dataSource" /><!-- (1) -->
	</bean>

Vous pouvez également spécifier la source de données à utiliser dans l'URI du point de terminaison, mais la pré-spécification de cette manière simplifie l'URI.

Lors de l'exécution de SQL, spécifiez l'ID du composant SQL spécifié précédemment et le DML à exécuter dans l'URI du noeud final.

			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />

En SQL ,: #id ,: #name ,: #count sont des paramètres et seront automatiquement liés si vous définissez une propriété avec le même nom dans l'en-tête du message. Bien que SQL soit écrit directement sur le noeud final, il est préférable de le lire à partir du fichier de propriétés.

Exemple de programme simple

Maintenant que les composants SQL sont prêts, créons un exemple de programme simple.

Dans l'exemple de programme, un enregistrement est inséré dans la table sample_test et cet enregistrement est mis à jour. L'itinéraire est sous le contrôle du gestionnaire de transactions en écrivant "\ " (1) pour la transaction. Lorsque la route démarre, la transaction démarre, et si la route se termine sans erreur, le gestionnaire de transactions valide automatiquement. En outre, vous pouvez voir que les valeurs de paramètres tels que: #id ,: #name ,: #count en SQL sont spécifiées par setHeader (2) avant cela.

		<route id="main_route">
			<from uri="timer:trigger?repeatCount=1" />
			<transacted /><!-- (1) -->
			<setHeader headerName="id"><constant>id001</constant></setHeader><!-- (2) -->
			<setHeader headerName="name"><constant>"testuser"</constant></setHeader><!-- (2) -->
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader><!-- (2) -->
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
			<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
			<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
			<to uri="log:insertLog?showHeaders=true" />
		</route>

Dans l'exemple précédent, le résultat est le même avec ou sans le gestionnaire de transactions, essayons donc un exemple où une exception est déclenchée après l'insertion d'un enregistrement et elle est annulée.

		<route id="main_route">
			<from uri="timer:trigger?repeatCount=1" />
			<transacted />
			<setHeader headerName="id"><constant>id001</constant></setHeader>
			<setHeader headerName="name"><constant>"testuser"</constant></setHeader>
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
			<throwException exceptionType="java.lang.Exception" message="throw new Exception" /><!-- (1) -->
			<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
			<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
			<to uri="log:insertLog?showHeaders=true" />
		</route>

ThrowException (1) lève de force une exception. Cela annule l'instruction INSERT qui a été exécutée avant elle. De plus, puisqu'une exception s'est produite, l'itinéraire se termine sans exécuter les instructions UPDATE suivantes. Le journal d'exécution est généré comme suit.

[2019-03-08 08:38:20.599], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1). Exhausted after delivery attempt: 1 caught: java.lang.Exception: throw new Exception

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[main_route        ] [main_route        ] [timer://trigger?repeatCount=1                                                 ] [        80]
[main_route        ] [transacted1       ] [transacted                                                                    ] [         0]
[main_route        ] [setHeader1        ] [setHeader[id]                                                                 ] [         0]
[main_route        ] [setHeader2        ] [setHeader[name]                                                               ] [         0]
[main_route        ] [setHeader3        ] [setHeader[count]                                                              ] [         1]
[main_route        ] [to1               ] [sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#i] [        34]
[main_route        ] [to2               ] [log:insertLog?showHeaders=true                                                ] [         6]
[main_route        ] [throwException1   ] [throwException[ref:null]                                                      ] [         0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.Exception: throw new Exception
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_172]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_172]
~ Omis ~
	at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_172]
[2019-03-08 08:38:20.612], [WARN ], o.a.c.s.s.TransactionErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.spring.spi.TransactionErrorHandler, Transaction rollback (0xb112b13) redelivered(false) for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1) caught: java.lang.Exception: throw new Exception

Dans la dernière ligne, vous pouvez voir que le thread de "TransactionErrorHandler" sort le journal "Transaction rollback" et qu'il a été annulé. Les données de la table réelle n'ont pas été insérées en raison de la restauration.

Propagation de transaction (PROPAGATION)

Dans l'exemple précédent, plusieurs processus (DML) ont été définis pour une route, la transaction a démarré lorsque la route a commencé et la transaction s'est terminée lorsque la route s'est terminée.

Si vous avez plusieurs itinéraires, vous pouvez spécifier comment utiliser la même transaction entre les itinéraires, créer une autre transaction, etc. Cela peut être défini avec l'option PROPAGATION. La propagation des transactions (PROPAGATION) utilise les transactions Spring plutôt que les propres spécifications d'Apache Camel.

L'option PROPAGATION prend les paramètres suivants et se comporte différemment avec et sans transactions.

Attribut de propagation de transaction S'il y a une transaction S'il n'y a pas de transaction
PROPAGATION_REQUIRED Exécuter dans une transaction existante. Démarrez une nouvelle transaction.
PROPAGATION_REQUIRES_NEW Démarrez une nouvelle transaction séparément de celle existante. Démarrez une nouvelle transaction.
PROPAGATION_MANDATORY Exécuter dans une transaction existante. Lancez une exception.
PROPAGATION_SUPPORTS Exécuter dans une transaction existante. Exécutez sans transactions.
PROPAGATION_NOT_SUPPORTED Arrêtez une transaction existante et exécutez-la sans transaction. Exécutez sans transactions.
PROPAGATION_NEVER Lancez une exception. Exécutez sans transactions.
PROPAGATION_NESTED Il utilise une transaction existante et est traité comme une transaction imbriquée uniquement dans cette partie. Démarrez une nouvelle transaction.

Si vous souhaitez utiliser PROPAGATION avec Camel, définissez la propagation de transaction à utiliser comme suit: Il n'a pas été utilisé dans l'exemple précédent, mais s'il n'est pas spécifié, PROPAGATION_REQUIRED est spécifié par défaut.

Ce qui suit définit la propagation de trois transactions: PROPAGATION_REQUIRED (1), PROPAGATION_REQUIRES_NEW (2) et PROPAGATION_MANDATORY (3).

	<bean id="PROPAGATION_REQUIRED"
		class="org.apache.camel.spring.spi.SpringTransactionPolicy">
		<property name="transactionManager" ref="transactionManager" />
		<property name="propagationBehaviorName"
			value="PROPAGATION_REQUIRED" /><!-- (1) -->
	</bean>

	<bean id="PROPAGATION_REQUIRES_NEW"
		class="org.apache.camel.spring.spi.SpringTransactionPolicy">
		<property name="transactionManager" ref="transactionManager" />
		<property name="propagationBehaviorName"
			value="PROPAGATION_REQUIRES_NEW" /><!-- (2) -->
	</bean>

	<bean id="PROPAGATION_MANDATORY"
		class="org.apache.camel.spring.spi.SpringTransactionPolicy">
		<property name="transactionManager" ref="transactionManager" />
		<property name="propagationBehaviorName"
			value="PROPAGATION_MANDATORY" /><!-- (3) -->
	</bean>

Pour utiliser la propagation de transaction définie, écrivez "ref =" PROPAGATION_REQUIRED "" (1) en traduction. L'exemple suivant définit trois routes, main_route, tran1_route et tran2_route, chacune spécifiant la propagation de la transaction utilisée pour la traduction.

		<route id="main_route">
			<from uri="timer:trigger?repeatCount=1" />
			<transacted ref="PROPAGATION_REQUIRED" /><!-- (1) -->
			<to uri="direct:tran1" />
			<to uri="direct:tran2" />
		</route>

		<route id="tran1_route">
			<from uri="direct:tran1" />
			<transacted ref="PROPAGATION_REQUIRES_NEW" /><!-- (1) -->
			<setHeader headerName="id"><constant>id001</constant></setHeader>
			<setHeader headerName="name"><constant>testuser</constant></setHeader>
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
		</route>

		<route id="tran2_route">
			<from uri="direct:tran2" />
			<transacted ref="PROPAGATION_MANDATORY" /><!-- (1) -->
			<setHeader headerName="id"><constant>id002</constant></setHeader>
			<setHeader headerName="name"><constant>testuser2</constant></setHeader>
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
		</route>

De plus, par défaut, Spring annule si une exception non vérifiée (RuntimeException et ses sous-classes) se produit, mais se valide sans revenir en arrière si une exception cochée se produit. D'autre part, Camel revient également avec des exceptions non vérifiées (et des exceptions).

Effectuer explicitement une restauration

Camel annule automatiquement lorsqu'une exception (exception non cochée) se produit, mais vous pouvez également l'annuler explicitement.

Pour annuler explicitement, spécifiez "\ ". Cela marque la transaction en cours pour la restauration sans lever d'exception. Le traitement suivant n'est pas exécuté, donc si un traitement tel que la sortie d'un journal avant la restauration est requis, exécutez-le avant l'élément de restauration.

		<route id="main_route">
			<from uri="timer:trigger?repeatCount=1" />
			<transacted />
			<setHeader headerName="id"><constant>id001</constant></setHeader>
			<setHeader headerName="name"><constant>testuser</constant></setHeader>
			<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
			<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
			<to uri="log:insertLog?showHeaders=true" />
			<rollback markRollbackOnlyLast="true" />
		</route>

finalement

Bien que cela ne soit pas expliqué cette fois, il est possible de gérer des transactions ciblant plusieurs ressources telles que plusieurs JMS et JDBC. C'est ce qu'on appelle une transaction globale. Cette fois, nous n'avons affaire qu'à JDBC, qui est appelée une transaction locale par opposition à une transaction globale.

référence

TODO pour moi

Recommended Posts

Gestion des transactions du framework intégré "Apache Camel"
Framework intégré Principes de base du routage des messages Apache Camel
Envoi et réception de messages simples à l'aide du composant Kafka du framework Apache Camel (édition Java DSL)
Apache Camel à l'ère du cloud natif
À propos de l'affichage initial de Spring Framework
Examiner le comportement du délai d'expiration des transactions JPA
[Apache Camel] Sortie facile du débit dans un journal
À propos du guide de démarrage officiel de Spring Framework
Une note de révision de l'interface Spring Framework Resource
Comment télécharger une ancienne version d'Apache Tomcat
Un record d'étude du Spring Framework à partir de zéro
Une vue d'ensemble du framework Java natif de Kubernetes Quarkus