I will explain the basics of message routing in Apache Camel.
Before that, we need to know the basic concepts of Camel such as routes, so I will explain it.
The basic element of Camel is the root. At the beginning of the route, Camel receives a message from one endpoint and sends a message to another endpoint at the end of the route. Defining this route is the basis of development using Camel, creating multiple routes within one application and connecting to the endpoint.
Endpoints represent other external systems, etc. as seen by Camel. Endpoints are local file systems, web services, FTP servers, JMS queues, mail servers, etc., and are created by the components.
Components are used to connect to endpoints. With over 200 components, Camel allows you to send and receive data simply by finding and using the components you need, without having to write code to send and receive data to and from endpoints.
This component is divided into producer and consumer. The component for writing something is called a producer. For example, writing to a file, sending data to a web service, uploading to an FTP server, etc. are components of the producer. Second, the component for reading something is called the consumer. For example, reading a file, receiving a request from a web service, subscribing from a queue, etc. are consumer components.
Exchange
The data received by the consumer is called Exchange. In Camel, Exchange flows through the route, processed by the processing Processor, and passed to the producer.
Let's look at an example of a simple route definition. Java DSLs and Spring XML DSLs are the two most commonly used route definitions.
The following source is an example of a route defined in a Java DSL.
from("file:data/input?noop=true&fileName=file1.txt")
.log("contents: ${body}")
.marshal().zipFile()
.to("file:data/output");
from("file:data/input?noop=true&fileName=file1.txt")
The from method represents the producer. The component used by the producer is "file", which reads file1.txt in the "data / input" directory and puts it into Exchange.
.log("contents: ${body}")
Next, the log component is used, and the contents of the file read by the file component are output to the log.
.marshal().zipFile()
marshal is a data conversion that compresses the contents of the file into a zip file.
.to("file:data/output");
Finally, the to method represents the consumer. The file component used by the consumer outputs the previously compressed zip file to the "data / output" directory.
This process, which would take dozens of lines without Camel, could be achieved with a simple route defined in just four lines.
Next, in the case of Spring XML DSL, the previous root will be the following XML file.
<route id="main_route">
<from uri="file:data/input?noop=true&fileName=file1.txt" />
<log message="contents: ${body}" />
<marshal>
<zipFile />
</marshal>
<to uri="file:data/output" />
</route>
Now that you understand the basic concept of Camel, I would like to finally move on to message routing ... But before that, I will explain the components that perform synchronous / asynchronous processing of routes. This is also the first thing to remember when defining a route, so let's talk about it before message routing.
Synchronous messaging components include Direct and Direct-VM, and asynchronous messaging components include SEDA and VM. We'll cover two commonly used components, Direct and SEDA (I've only used them on real systems). Direct-VM and VM, which are not explained this time, are components that perform messaging across multiple CamelContexts, and can be used when the Web application is divided into multiple CamelContexts.
The producer of a Direct component calls the consumer of the same Direct component directly. This allows you to split the route into multiple parts.
In addition to the advantage of increasing readability when dividing a route, there is an advantage that the route can be made into parts. For example, by extracting common processing for multiple routes and using them as different routes, it is possible to avoid duplicating routes with the same content.
The URI of the Direct component is:
direct:endpointName
Here's an example of splitting the root to zip the file with the Direct component:
<route id="main_route">
<from uri="file:data/input?noop=true&fileName=file1.txt" />
<log message="contents: ${body}" />
<to uri="direct:second" />
</route>
<route id="sub_route">
<from uri="direct:second" />
<marshal>
<zipFile />
</marshal>
<to uri="file:data/output" />
</route>
From the producer of the first route ("
The SEDA component allows asynchronous processing between routes with message queues. SEDA components provide a simple message queue for in-memory processing. It is also easy to replace this with ActiveMQ etc.
The URI of the SEDA component is:
seda:endpointName
Here's an example of splitting the root to zip the file with the SEDA component:
from("file:data/input?noop=true&fileName=file1.txt").id("main_route")
.log("contents: ${body}")
.to("seda:second")
.log("main_route end.");
from("seda:second").id("sub_route")
.delay(1000L)
.marshal().zipFile()
.to("file:data/output")
.log("sub_route end.");
For XML DSL, define as follows.
<route id="main_route">
<from uri="file:data/input?noop=true&fileName=file1.txt" />
<log message="contents: ${body}" />
<to uri="seda:second" />
<log message="main_route end." />
</route>
<route id="sub_route">
<from uri="seda:second" />
<delay asyncDelayed="false">
<constant>1000</constant>
</delay>
<marshal>
<zipFile />
</marshal>
<to uri="file:data/output" />
<log message="sub_route end." />
</route>
When you execute it, you can see that the thread name is divided into "thread # 1" and "thread # 2" by the root.
[2019-02-07 21:47:07.015], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, contents: file1 example
[2019-02-07 21:47:07.023], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, main_route end.
[2019-02-07 21:47:08.047], [INFO ], sub_route, Camel (camel-1) thread #2 - seda://second, sub_route, sub_route end.
See also the following articles about SEDA.
-Basic usage of Apache Camel standard queue (SEDA)
In Apache Camel, message routing supports Enterprise Integration Pattern, and various methods are implemented.
This section introduces commonly used message routing.
routing | Description |
---|---|
Content Based Router (conditional branching) | The Content Based Router routes messages based on the data content of the Message Exchange. |
Splitter | Splitter allows you to split message Exchange data and process it in parallel. For example, List type data is stored in Exchange and can be processed one by one in parallel. |
Aggregator | Aggregator is the opposite of Splitter, which allows you to aggregate and process Exchanges in the root. For example, it is possible to change one data flowing to the root to one CSV record, aggregate multiple records, and output to a CSV file. |
Throttler (throttle / flow control) | Throttler allows you to control the amount of data that flows to the next endpoint. For example, you can limit the number of processed data per unit time so that the destination does not become overloaded. |
Message Filter | Message Filter filters messages based on the contents of the Message Exchange data. |
Dynamic Router | Dynamic Router allows you to implement logic that dynamically routes messages, subject to the data content of the Message Exchange and the endpoint from which it is routed. |
Load Balancer | Load Balancer allows distributed routing when there are endpoints for multiple destinations. |
Multicast | Multicast allows you to route the same message to multiple endpoints and process them differently. |
The Content Based Router routes message destinations based on the content of the message.
Content Based Router is described using the choice-when-otherwise clause. choice becomes the parent of routing, and write when clauses under it as many as the number of conditions (that is, if). In the when clause, first describe the condition in the simple clause.
Here is a simple example.
from("direct:second").id("sub_route")
.choice()
.when().simple("${header.CamelFileName} ends with '.xml'")
.to("seda:xml")
.log("choice xml.")
.endChoice();
For XML DSL, define as follows.
<choice>
<when>
<simple>${header.CamelFileName} ends with '.xml'</simple>
<to uri="seda:xml" />
<log message="choice xml." />
</when>
</choice>
The when clause results in routing with only one condition. In the simple clause, "$ {header.CamelFileName} ends with'.xml'" is described, and the condition is set that the value of the variable CamelFileName in the header ends with ".xml".
If the conditions are met, the subsequent "
If there are multiple conditions, describe as many when clauses as there are conditions as shown below. The when clause is applied in order from the beginning, and only the first one that meets the conditions is executed.
from("direct:second").id("sub_route")
.choice()
.when().simple("${header.CamelFileName} ends with '.xml'")
.to("seda:xml")
.log("choice xml.")
.when().simple("${header.CamelFileName} regex '^.*(csv|txt)$'")
.to("seda:csvortxt")
.log("choice csv or txt.")
.endChoice();
For XML DSL, define as follows.
<choice>
<when>
<simple>${header.CamelFileName} ends with '.xml'</simple>
<to uri="seda:xml" />
<log message="choice xml." />
</when>
<when>
<simple>${header.CamelFileName} regex '^.*(csv|txt)$'</simple>
<log message="choice csv or txt." />
<to uri="seda:csvortxt" />
</when>
</choice>
Finally, let's take a look at the otherwise clause, which handles when none of the conditions are met. Unlike the when clause, the otherwise clause does not have a simple clause because there are no conditions to execute it. The otherwise clause is executed only if it was not processed by all other when clauses.
from("direct:second").id("sub_route")
.choice()
.when().simple("${header.CamelFileName} ends with '.xml'")
.to("seda:xml")
.log("choice xml.")
.when().simple("${header.CamelFileName} regex '^.*(csv|txt)$'")
.to("seda:csvortxt")
.log("choice csv or txt.")
.otherwise()
.log("choice otherwise.")
.to("seda:end")
.endChoice();
For XML DSL, define as follows.
<choice>
<when>
<simple>${header.CamelFileName} ends with '.xml'</simple>
<to uri="seda:xml" />
<log message="choice xml." />
</when>
<when>
<simple>${header.CamelFileName} regex '^.*(csv|txt)$'</simple>
<log message="choice csv or txt." />
<to uri="seda:csvortxt" />
</when>
<otherwise>
<log message="choice otherwise." />
<to uri="seda:end" />
<stop />
</otherwise>
</choice>
When executed, the log will be output as shown below.
[2019-02-08 07:40:18.447], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, file name: file1.xml
[2019-02-08 07:40:18.470], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, choice xml.
[2019-02-08 07:40:18.470], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, main_route end.
[2019-02-08 07:40:18.473], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, file name: file2.csv
[2019-02-08 07:40:18.476], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, choice csv or txt.
[2019-02-08 07:40:18.477], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, main_route end.
[2019-02-08 07:40:18.478], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, file name: file3.jpg
[2019-02-08 07:40:18.478], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, choice otherwise.
Splitter allows you to split message Exchange data and process it in parallel. For example, List type data is stored in Exchange and can be processed one by one in parallel.
Please refer to the following article for details.
-Split and process messages with Apache Camel Splitter pattern
Aggregator can aggregate and process the Exchange in the root as opposed to Splitter. For example, it is possible to change one data flowing to the root to one CSV record, aggregate multiple records, and output them to a CSV file.
Please refer to the following article for details.
-Try using Aggregator pattern to aggregate Exchange with Apache Camel
The Throttler pattern allows you to control the flow of data to the destination system (endpoint). For example, send only up to 500 data per second. This allows you to control the other system so that it does not overload.
Please refer to the following article for details.
-Try using Apache Camel flow control (Throttler) pattern
Use the Message Filter pattern to pass incoming messages only if certain conditions are met. Messages that do not meet the conditions will be discarded.
from("file:data/input?noop=true&fileName=file4.txt").id("main_route")
.log("contents: ${body}")
.filter()
.simple("${body} regex '^C.*'")
.to("seda:second")
.end()
.log("main_route end.");
For XML DSL, define as follows.
<route id="main_route">
<from uri="file:data/input?noop=true&fileName=file4.txt" />
<log message="contents: ${body}" />
<filter>
<simple>${body} regex '^C.*'</simple>
<to uri="seda:second" />
</filter>
<log message="main_route end." />
</route>
Dynamic Router allows you to implement logic that dynamically routes messages, subject to the data content of the Message Exchange and the endpoint from which it is routed.
Please refer to the following official website for details. (Skip because I have never used it. I may write a commentary article soon)
-Dynamic Router (official site)
Load Balancer allows distributed routing when there are endpoints for multiple destinations.
Please refer to the following official website for details. (I may write a commentary article soon)
-Load Balancer (Official Site)
Multicast calls each endpoint in turn by default. If you execute them in order, it is almost the same as arranging multiple tos, so it is often processed in parallel.
For parallel processing, set the parallelProcessing attribute to "true".
.multicast()
.parallelProcessing(true)
.to("direct:a")
.to("direct:b")
.to("direct:c")
For XML DSL, define as follows.
<multicast parallelProcessing="true">
<to uri="direct:a" />
<to uri="direct:b" />
<to uri="direct:c" />
</multicast>
The message to be multicast is a shallow copy, and the last processed endpoint message is the processing result. In the above example, the message processed by direct: c becomes the final message, and the other direct: a and direct: b messages are discarded. Use the AggregationStrategy to aggregate all messages. Also, if you want to make a deep copy of the message, you need to customize it with the onPrepareRef option.
Multicast continues processing if one of the endpoints throws an exception. For example, if you have three endpoints, one throws an exception and the other two are processed.
If an exception occurs, you can terminate the process and let the Camel error handler handle it. Use the stopOnException option as shown below.
from("timer:trigger?repeatCount=1")
.setBody()
.simple("start: ")
.multicast()
.parallelProcessing(false)
.stopOnException()
.to("direct:a")
.to("direct:b")
.to("direct:c")
.end()
.log("body: ${body}");
For XML DSL, define as follows.
<route>
<from uri="timer:trigges?repeatCount=1" />
<setBody>
<simple>start: </simple>
</setBody>
<multicast parallelProcessing="false" stopOnException="true">
<to uri="direct:a" />
<to uri="direct:b" />
<to uri="direct:c" />
</multicast>
<log message="body: ${body}" />
</route>
However, in the case of parallel processing, it should be noted that if an exception is thrown, the processing will be terminated.
Delayer
You can use Delayer to make the message wait (WAIT). In the following Java DSL, specify "1000" for delay to wait for 1 second (1000 milliseconds).
from("timer:trigger?repeatCount=1")
.log("start.")
.delay()
.constant("1000")
.log("end.");
For XML DSL, define as follows.
<route>
<from uri="timer:trigger?repeatCount=1" />
<log message="start." />
<delay>
<constant>1000</constant>
</delay>
<log message="end." />
</route>
In the above example, the waiting time was specified as a fixed value in the constant clause, but it can also be specified in the simple language. In the following example, DelayIntervalTest = 3000 is specified in the message header to specify the waiting time. If you specify the header dynamically in the processor, you can make each message wait for any time (I don't know if there is such a use case ...).
from("timer:trigger?repeatCount=1")
.setHeader("DelayIntervalTest")
.simple("3000")
.log("start.")
.delay()
.simple("${header.DelayIntervalTest}")
.asyncDelayed()
.log("delay asyncDelayed")
.end()
.log("end.");
For XML DSL, define as follows.
<route>
<from uri="timer:trigger?repeatCount=1" />
<setHeader headerName="DelayIntervalTest">
<simple>3000</simple>
</setHeader>
<delay asyncDelayed="true">
<simple>${header.DelayIntervalTest}</simple>
<log message="delay asyncDelayed" />
</delay>
<log message="end." />
</route>
It also specifies asyncDelayed = "true", which uses the thread pool to process messages asynchronously.
Wire tap
Use the Wire Tap pattern if you don't need a response and want to process the current message asynchronously (in the background) separately from the current route. For example, you can use it if you want to send a message to the backend system separately from the main route.
Please refer to the following article for details.
-Processing asynchronously with the main route using Apache Camel's Wire Tap pattern
Loop
Loop can be iterative.
In the following example, the number of iterations is specified to 5 by the simple clause. Also, the CamelLoopIndex variable in the header contains the current number of iterations, and that value is output to the log.
from("timer:trigger?repeatCount=1")
.loop(5)
.log("CamelLoopIndex: ${header.CamelLoopIndex}");
The execution result is as follows. You can see that it is displayed 5 times from 0 to 4 in the CamelLoopIndex.
[2019-02-08 22:21:16.178], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 0
[2019-02-08 22:21:16.180], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 1
[2019-02-08 22:21:16.180], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 2
[2019-02-08 22:21:16.180], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 3
[2019-02-08 22:21:16.181], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 4
In the above example, the number of repetitions was fixed, but you can also specify a conditional expression like a while statement.
For XML DSL, define as follows.
<route>
<from uri="timer:trigger?repeatCount=1" />
<loop>
<simple>5</simple>
<log message="CamelLoopIndex: ${header.CamelLoopIndex}" />
</loop>
</route>
Sampling throttle allows you to extract samples from traffic passing through the route. If you set a sampling interval (time), only one Exchange in between will be processed. By default, the sampling interval is specified as 1 second.
An example of the sampling throttle route is as follows. The samplePeriod option specified in the sample clause is the sampling interval, and units is the unit of the interval.
In the example below, the sampling interval is 5 seconds, which means that it will be sampled once every 5 seconds.
from("timer:trigger?repeatCount=100")
.setBody()
.simple("${date:now:yyyy-MM-dd HH:mm:ss}")
.sample()
.samplePeriod(5)
.log("Period Body: ${body}");
For XML DSL, define as follows.
<route>
<from uri="timer:trigges?repeatCount=100" />
<setBody>
<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
</setBody>
<sample samplePeriod="5" units="SECOND">
<log message="Period Body: ${body}" />
</sample>
</route>
You can also specify sampling by the number of messages instead of intervals with the messageFrequency option. In the example below, the messageFrequency option in the sample clause is set to "10", and one message is sampled for every 10 messages.
SimpleDataSet dataSet = new SimpleDataSet();
dataSet.setSize(100);
SimpleRegistry registry = new SimpleRegistry();
registry.put("testDataSet", dataSet);
CamelContext context = new DefaultCamelContext(registry);
try {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("dataset:testDataSet?initialDelay=3000&produceDelay=0")
.sample()
.sampleMessageFrequency(10)
.log("CamelDataSetIndex ${header.CamelDataSetIndex}");
}
});
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
The execution example is as follows, and the CamelDataSetIndex variable in the header is output to the log. You can see that the CamelDataSetIndex variable contains the sequence number of the data and is output to the log every 10 messages such as 9, 19, 29 ....
[2019-02-08 23:07:54.866], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 9
[2019-02-08 23:07:54.867], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 19
[2019-02-08 23:07:54.870], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 29
[2019-02-08 23:07:54.871], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 39
[2019-02-08 23:07:54.872], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 49
For XML DSL, define as follows.
<bean id="testDataSet"
class="org.apache.camel.component.dataset.SimpleDataSet">
<property name="size" value="100" />
<property name="reportCount" value="100" />
</bean>
<camelContext
xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="dataset:testDataSet?initialDelay=3000&produceDelay=0" />
<sample messageFrequency="10">
<log message="CamelDataSetIndex ${header.CamelDataSetIndex}" />
</sample>
</route>
</camelContext>
In this entry, I explained the basics of Apache Camel's message routing and the basic concepts of Camel such as routes. Various methods such as conditional branching, division, and aggregation were prepared from the beginning for message routing, and it was possible to use that function by defining a simple route. The route routing component makes message (data) processing easy and efficient.
-Direct Component (official site) -SEDA Component (official site) -Content Based Router (official site) -Throttler (official site) -Message Filter (official site) -Multicast (official site) -Delayer (official site) -Loop (official site) -Sampling Throttler (official site)