Dans cette entrée, nous expliquerons la gestion des transactions dans le framework Apache Camel.
Pour expliquer un peu ce qu'est une transaction en premier lieu, on peut dire que plusieurs processus sont combinés en un seul (je pense qu'il est inutile de le dire). Ce processus multiple est inséparable et se traduit par un traitement ou un non-traitement. Il s'agit de l'atomicité de la propriété ACID de la transaction bien connue.
--Atomicité: tout le traitement est complètement effectué ou aucun n'est effectué. --Cohérence: il n'y a aucune incohérence dans les données quel que soit l'état final de la transaction (cohérence). --Isolation: si plusieurs transactions sont en cours d'exécution, n'interférez pas avec d'autres transactions.
S'il y a une erreur lors de l'exécution de plusieurs processus, il revient à l'état avant l'exécution de la transaction. C'est ce qu'on appelle la restauration. Si plusieurs processus réussissent sans aucune erreur, le résultat du traitement sera reflété par la validation.
Les transactions gérées par Apache Camel sont JDBC ou JMS, mais nous expliquerons ici pour JDBC. Camel est basé sur la gestion des transactions de Spring, donc si vous avez travaillé avec Spring, vous trouverez les transactions de Camel faciles à comprendre.
Les transactions Spring sont gérées par ce qu'on appelle un gestionnaire de transactions. Il existe plusieurs gestionnaires de transactions disponibles, et dans le cas de JDBC, utilisez "org.springframework.jdbc.datasource.DataSourceTransactionManager". Il existe d'autres gestionnaires de transactions pour JTA, JMS et Hibername (bien que je ne les ai pas utilisés)
De même, Camel gère les transactions à l'aide du gestionnaire de transactions de Spring. Maintenant, expliquons la gestion des transactions de Camel pour JDBC.
Je vais vous expliquer en utilisant un exemple de programme simple qui gère les transactions dans Camel. L'exemple de programme utilise le composant SQL de Camel pour accéder à la base de données.
Tout d'abord, définissez la source de données gérée par le gestionnaire de transactions. La base de données cible est PostgreSQL, qui est une source de données qui utilise HikariCP pour le regroupement de connexions.
<bean id="hikariConfig" class="com.zaxxer.hikari.HikariConfig">
<property name="jdbcUrl" value="jdbc:postgresql://192.168.20.71:5432/testdb" />
<property name="driverClassName" value="org.postgresql.Driver" />
<property name="username" value="postgres" />
<property name="password" value="postgres" />
<property name="autoCommit" value="false" />
</bean>
<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource"
destroy-method="close">
<constructor-arg ref="hikariConfig" />
</bean>
Définit un gestionnaire de transactions qui gère la source de données définie. La source de données créée précédemment dans (1) est spécifiée dans la propriété.
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" /><!-- (1) -->
</bean>
Tout d'abord, vous ne pouvez pas essayer sans table, alors créez une table simple (sample_test).
create table sample_test (
id varchar(10) not null primary key,
name varchar(255) not null,
count integer not null,
insert_date timestamp not null);
Utilisez des composants SQL pour exécuter SQL sur la base de données. Spécifiez et définissez la source de données qui utilise le composant SQL comme indiqué dans (1) ci-dessous.
<bean id="sqlComponent"
class="org.apache.camel.component.sql.SqlComponent">
<property name="dataSource" ref="dataSource" /><!-- (1) -->
</bean>
Vous pouvez également spécifier la source de données à utiliser dans l'URI du point de terminaison, mais la pré-spécification de cette manière simplifie l'URI.
Lors de l'exécution de SQL, spécifiez l'ID du composant SQL spécifié précédemment et le DML à exécuter dans l'URI du noeud final.
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
En SQL ,: #id ,: #name ,: #count sont des paramètres et seront automatiquement liés si vous définissez une propriété avec le même nom dans l'en-tête du message. Bien que SQL soit écrit directement sur le noeud final, il est préférable de le lire à partir du fichier de propriétés.
Maintenant que les composants SQL sont prêts, créons un exemple de programme simple.
Dans l'exemple de programme, un enregistrement est inséré dans la table sample_test et cet enregistrement est mis à jour.
L'itinéraire est sous le contrôle du gestionnaire de transactions en écrivant "\
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted /><!-- (1) -->
<setHeader headerName="id"><constant>id001</constant></setHeader><!-- (2) -->
<setHeader headerName="name"><constant>"testuser"</constant></setHeader><!-- (2) -->
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader><!-- (2) -->
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
<to uri="log:insertLog?showHeaders=true" />
</route>
Dans l'exemple précédent, le résultat est le même avec ou sans le gestionnaire de transactions, essayons donc un exemple où une exception est déclenchée après l'insertion d'un enregistrement et elle est annulée.
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted />
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>"testuser"</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<throwException exceptionType="java.lang.Exception" message="throw new Exception" /><!-- (1) -->
<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
<to uri="log:insertLog?showHeaders=true" />
</route>
ThrowException (1) lève de force une exception. Cela annule l'instruction INSERT qui a été exécutée avant elle. De plus, puisqu'une exception s'est produite, l'itinéraire se termine sans exécuter les instructions UPDATE suivantes. Le journal d'exécution est généré comme suit.
[2019-03-08 08:38:20.599], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1). Exhausted after delivery attempt: 1 caught: java.lang.Exception: throw new Exception
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[main_route ] [main_route ] [timer://trigger?repeatCount=1 ] [ 80]
[main_route ] [transacted1 ] [transacted ] [ 0]
[main_route ] [setHeader1 ] [setHeader[id] ] [ 0]
[main_route ] [setHeader2 ] [setHeader[name] ] [ 0]
[main_route ] [setHeader3 ] [setHeader[count] ] [ 1]
[main_route ] [to1 ] [sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#i] [ 34]
[main_route ] [to2 ] [log:insertLog?showHeaders=true ] [ 6]
[main_route ] [throwException1 ] [throwException[ref:null] ] [ 0]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.Exception: throw new Exception
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_172]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_172]
~ Omis ~
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_172]
[2019-03-08 08:38:20.612], [WARN ], o.a.c.s.s.TransactionErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.spring.spi.TransactionErrorHandler, Transaction rollback (0xb112b13) redelivered(false) for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1) caught: java.lang.Exception: throw new Exception
Dans la dernière ligne, vous pouvez voir que le thread de "TransactionErrorHandler" sort le journal "Transaction rollback" et qu'il a été annulé. Les données de la table réelle n'ont pas été insérées en raison de la restauration.
Dans l'exemple précédent, plusieurs processus (DML) ont été définis pour une route, la transaction a démarré lorsque la route a commencé et la transaction s'est terminée lorsque la route s'est terminée.
Si vous avez plusieurs itinéraires, vous pouvez spécifier comment utiliser la même transaction entre les itinéraires, créer une autre transaction, etc. Cela peut être défini avec l'option PROPAGATION. La propagation des transactions (PROPAGATION) utilise les transactions Spring plutôt que les propres spécifications d'Apache Camel.
L'option PROPAGATION prend les paramètres suivants et se comporte différemment avec et sans transactions.
Attribut de propagation de transaction | S'il y a une transaction | S'il n'y a pas de transaction |
---|---|---|
PROPAGATION_REQUIRED | Exécuter dans une transaction existante. | Démarrez une nouvelle transaction. |
PROPAGATION_REQUIRES_NEW | Démarrez une nouvelle transaction séparément de celle existante. | Démarrez une nouvelle transaction. |
PROPAGATION_MANDATORY | Exécuter dans une transaction existante. | Lancez une exception. |
PROPAGATION_SUPPORTS | Exécuter dans une transaction existante. | Exécutez sans transactions. |
PROPAGATION_NOT_SUPPORTED | Arrêtez une transaction existante et exécutez-la sans transaction. | Exécutez sans transactions. |
PROPAGATION_NEVER | Lancez une exception. | Exécutez sans transactions. |
PROPAGATION_NESTED | Il utilise une transaction existante et est traité comme une transaction imbriquée uniquement dans cette partie. | Démarrez une nouvelle transaction. |
Si vous souhaitez utiliser PROPAGATION avec Camel, définissez la propagation de transaction à utiliser comme suit: Il n'a pas été utilisé dans l'exemple précédent, mais s'il n'est pas spécifié, PROPAGATION_REQUIRED est spécifié par défaut.
Ce qui suit définit la propagation de trois transactions: PROPAGATION_REQUIRED (1), PROPAGATION_REQUIRES_NEW (2) et PROPAGATION_MANDATORY (3).
<bean id="PROPAGATION_REQUIRED"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_REQUIRED" /><!-- (1) -->
</bean>
<bean id="PROPAGATION_REQUIRES_NEW"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_REQUIRES_NEW" /><!-- (2) -->
</bean>
<bean id="PROPAGATION_MANDATORY"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_MANDATORY" /><!-- (3) -->
</bean>
Pour utiliser la propagation de transaction définie, écrivez "ref =" PROPAGATION_REQUIRED "" (1) en traduction. L'exemple suivant définit trois routes, main_route, tran1_route et tran2_route, chacune spécifiant la propagation de la transaction utilisée pour la traduction.
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted ref="PROPAGATION_REQUIRED" /><!-- (1) -->
<to uri="direct:tran1" />
<to uri="direct:tran2" />
</route>
<route id="tran1_route">
<from uri="direct:tran1" />
<transacted ref="PROPAGATION_REQUIRES_NEW" /><!-- (1) -->
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>testuser</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
</route>
<route id="tran2_route">
<from uri="direct:tran2" />
<transacted ref="PROPAGATION_MANDATORY" /><!-- (1) -->
<setHeader headerName="id"><constant>id002</constant></setHeader>
<setHeader headerName="name"><constant>testuser2</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
</route>
De plus, par défaut, Spring annule si une exception non vérifiée (RuntimeException et ses sous-classes) se produit, mais se valide sans revenir en arrière si une exception cochée se produit. D'autre part, Camel revient également avec des exceptions non vérifiées (et des exceptions).
Camel annule automatiquement lorsqu'une exception (exception non cochée) se produit, mais vous pouvez également l'annuler explicitement.
Pour annuler explicitement, spécifiez "\
<route id="main_route">
<from uri="timer:trigger?repeatCount=1" />
<transacted />
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>testuser</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<rollback markRollbackOnlyLast="true" />
</route>
Bien que cela ne soit pas expliqué cette fois, il est possible de gérer des transactions ciblant plusieurs ressources telles que plusieurs JMS et JDBC. C'est ce qu'on appelle une transaction globale. Cette fois, nous n'avons affaire qu'à JDBC, qui est appelée une transaction locale par opposition à une transaction globale.
Recommended Posts