Enterprise Integration Zone is brought to you in partnership with:

Enterprise Architect, appointed Mule ESB Champion. Tomas is a DZone MVB and is not an employee of DZone and has posted 4 posts at DZone. You can read more from them at their website. View Full User Profile

Aggregation with Mule – The “Fork and Join Pattern”

09.17.2013
| 3461 views |
  • submit to reddit

In your daily work as an integration developer, you’re working with different kinds of , even if you’re not aware of it.

Since Mule is based on EIP (Enterprise Integration Patterns), you’re most definitely using patterns when using Mule.

One of the patterns that seems to raise a lot of questions is the “fork and join pattern.” The purpose of the fork and join pattern is to send a request to different targets, in parallel, and wait for an aggregated response from all the targets.

This is very useful when you want to merge information from different systems, perform calculations based on information from different targets or compare data from a range of data sources.

Fork and join flowchart

See the flowchart describing the fork and join pattern

Since I’ve noticed questions on the MuleSoft forum and questions from colleagues saying that this is a tricky thing to do, I thought I would show you a simple example of how it’s done.

In my example, there’s an HTTP request coming in that should return the lowest price from different shops. The request is delegated to two different shops, shop1 and shop2, in parallel, (asynchronous), and then it is collected for comparison to return the lowest price. This is easily done with Mule using the request-reply router. Let’s dig in to the configuration…


<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:context="http://www.springframework.org/schema/context"
	xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:https="http://www.mulesoft.org/schema/mule/https"
	xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:script="http://www.mulesoft.org/schema/mule/scripting"
	xmlns:spring="http://www.springframework.org/schema/beans" xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
	xsi:schemaLocation="
          http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    	  http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/3.2/mule-http.xsd
          http://www.mulesoft.org/schema/mule/https http://www.mulesoft.org/schema/mule/https/3.2/mule-https.xsd
    	  http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/3.2/mule-xml.xsd
    	  http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd 
    	  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    	  http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd
       ">

	<flow name="forkAndJoinFlow">
		<http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="81" path="lowestprice" />
		<not-filter>
			<wildcard-filter pattern="*favicon*" />
		</not-filter>
		<request-reply>
			<all enableCorrelation="ALWAYS">
				<vm:outbound-endpoint path="shop1"/>
				<vm:outbound-endpoint path="shop2"/>
			</all>
			<vm:inbound-endpoint path="response">
				<message-properties-transformer>
					<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" />
				</message-properties-transformer>
				<collection-aggregator />
			</vm:inbound-endpoint>
		</request-reply>
		<expression-transformer evaluator="groovy" expression="java.util.Collections.min(payload)" />
		<object-to-string-transformer/>
		<logger level="WARN" message="#[string:Lowest price: #[payload]]" />
	</flow>
	
	<flow name="shop1Flow">
		<vm:inbound-endpoint path="shop1"/>
		<expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" />
		<logger level="WARN" message="#[string:Price from shop 1: #[payload]]" />
	</flow>
	
	<flow name="shop2Flow">
		<vm:inbound-endpoint path="shop2" />
		<expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" />
		<logger level="WARN" message="#[string:Price from shop 2: #[payload]]" />
	</flow>

</mule>

In this example, I’m using the VM transport to simulate communication with external systems, and a random price is returned from the different shops.

Notice that the VM endpoints can be replaced with whatever protocol you desire, and the number of external sources can be as many as you like, just be sure to tell Mule how many results it should wait for by using the property MULE_CORRELATION_GROUP_SIZE.

By opening a browser and entering http://localhost:81/lowestprice, a number is returned.  That's not very useful, but that’s not the point of the example.

When looking at the log the following can be seen:

10:23:10,669  WARN LoggerMessageProcessor,[forkandjoin-0.0.1-SNAPSHOT].shop1Flow.stage1.02:269 – Price from shop 1: 362
10:23:10,679  WARN LoggerMessageProcessor,[forkandjoin-0.0.1-SNAPSHOT].shop2Flow.stage1.02:269 – Price from shop 2: 109
10:23:10,729  WARN LoggerMessageProcessor,[forkandjoin-0.0.1-SNAPSHOT].connector.http.mule.default.receiver.02:269 – Lowest price: 109

That tells us that the request has been sent to both shop1 and shop2.  In this case, the lowest price was received from shop2.

It's a very simple example, but I hope you get the idea on how the fork and join pattern can be used and how to use it in more advanced scenarios.


Published at DZone with permission of Tomas Blohm, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)