Spring Integration and Apache Camel
Solution Using Spring Integration:
The Gateway component is easily configured using the following entry in the Spring Configuration. Internally Spring Integration uses AOP to hook up a component which routes the requests from an internal input channel and waits for the response in the response channel.
<si:gateway default-reply-channel="exit" default-request-channel="enter" id="reportGenerator" service-interface="org.bk.report.ReportGenerator">The component to Split the Input Report Request to Section Request is fairly straightforward:
</si:gateway>
public class SectionRequestSplitter {
public List split(ReportRequest reportRequest){
return reportRequest.getSectionRequests();
}
}
and to hook this splitter with Spring Integration:
<bean class="org.bk.report.common.SectionRequestSplitter" id="sectionRequestSplitterBean">Next, to transform the Section Request to an XML format - The component is the following:
<si:splitter id="sectionRequestSplitter" input-channel="enter" method="split" output-channel="sectionRequestToXMLChannel" ref="sectionRequestSplitterBean">
</si:splitter></bean>
public class SectionRequestToXMLTransformer {
public String transform(SectionRequest sectionRequest){
//this needs to be optimized...purely for demonstration of the concept
String sectionRequestAsString = "<section><meta><entityId>" + sectionRequest.getEntityId()
+ "</entityId><sectionName>" + sectionRequest.getSectionId()
+ "</sectionName></meta></section>";
return sectionRequestAsString;
}
}
and is hooked up in the Spring Integration configuration file in the following way:
<bean id="sectionRequestToXMLBean" class="org.bk.report.common.SectionRequestToXMLTransformer"/>To send an XML over http request using the Section Request XML to a section Service:
<si:transformer input-channel="sectionRequestToXMLChannel" ref="sectionRequestToXMLBean" method="transform" output-channel="sectionRequestChannel"/>
<http:outbound-gateway id="httpHeaderGateway"To transform the Section Response XML to a Section Object - The component is the following:
request-channel="sectionRequestChannel" reply-channel="sectionResponseChannel"
default-url="${sectionBaseURL}/section" extract-request-payload="true"
charset="UTF-8" request-timeout="1200" />
public class SectionResponseXMLToSectionTransformer {
public Section transform(String sectionXML) {
SAXReader saxReader = new SAXReader();
Document document;
String sectionName = "";
String entityId = "";
try {
document = saxReader.read(new StringReader(sectionXML));
sectionName = document
.selectSingleNode("/section/meta/sectionName").getText();
entityId = document.selectSingleNode("/section/meta/entityId")
.getText();
} catch (DocumentException e) {
e.printStackTrace();
}
return new Section(entityId, sectionName, sectionXML);
}
}
and is hooked up in the Spring Integration configuration file in the following way:
<bean id="sectionResponseXMLToSectionBean" class="org.bk.report.common.SectionResponseXMLToSectionTransformer" />To aggregate the Sections together into a report, the component is the following::
<si:transformer input-channel="sectionXMLResponseChannel"
ref="sectionResponseXMLToSectionBean" method="transform" output-channel="sectionResponseChannel" />
public class SectionResponseAggregator {
public Report aggregate(List<Section> sections) {
return new Report(sections);
}
}
and is hooked up in the Spring Integration configuration file in the following way:
<bean id="sectionResponseAggregator" class="org.bk.report.common.SectionResponseAggregator"/>
<si:aggregator input-channel="sectionResponseChannel" output-channel="exit" ref="sectionResponseAggregator" method="aggregate"/>
This completes the Spring Integration implementation for this Integration Problem. The following is the complete Spring Integration configuration file:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:si="http://www.springframework.org/schema/integration"
xmlns:http="http://www.springframework.org/schema/integration/http"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http-1.0.xsd
">
<context:property-placeholder />
<si:channel id="enter" />
<si:channel id="exit" />
<si:gateway id="reportGenerator" default-request-channel="enter"
default-reply-channel="exit" service-interface="org.bk.report.ReportGenerator" />
<si:channel id="sectionRequestToXMLChannel" />
<si:splitter id="sectionRequestSplitter" input-channel="enter"
ref="sectionRequestSplitterBean" method="split" output-channel="sectionRequestToXMLChannel" />
<si:channel id="sectionRequestChannel" />
<si:transformer input-channel="sectionRequestToXMLChannel"
ref="sectionRequestToXMLBean" method="transform" output-channel="sectionRequestChannel" />
<si:channel id="sectionXMLResponseChannel" />
<http:outbound-gateway id="httpHeaderGateway"
request-channel="sectionRequestChannel" reply-channel="sectionXMLResponseChannel"
default-url="${sectionBaseURL}/section" extract-request-payload="true"
charset="UTF-8" request-timeout="1200" />
<si:channel id="sectionResponseChannel" />
<si:transformer input-channel="sectionXMLResponseChannel"
ref="sectionResponseXMLToSectionBean" method="transform" output-channel="sectionResponseChannel" />
<si:aggregator input-channel="sectionResponseChannel"
output-channel="exit" ref="sectionResponseAggregator" method="aggregate" />
<bean id="sectionRequestSplitterBean" class="org.bk.report.common.SectionRequestSplitter" />
<bean id="sectionRequestToXMLBean" class="org.bk.report.common.SectionRequestToXMLTransformer" />
<bean id="sectionResponseXMLToSectionBean" class="org.bk.report.common.SectionResponseXMLToSectionTransformer" />
<bean id="sectionResponseAggregator" class="org.bk.report.common.SectionResponseAggregator" />
</beans>
A working sample is provided with the article(Download, extract and run "mvn test")
(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)





Comments
Stefan Krause replied on Fri, 2010/01/01 - 1:11pm
Claus Ibsen replied on Sun, 2010/01/03 - 7:46am
Some design ideas was sketched here: overhaul of Aggregator.
You can however currently create your own persistent capable implementation of org.apache.camel.processor.aggregate.AggregationCollection and use your implementation in the Aggregator EIP. Some end users have done that.
Claus Ibsen replied on Sun, 2010/01/03 - 8:02am
Camel do support Messaging Gateway EIP which is documented here. The documentation do need to be improved with some better samples.
In your case you can for example use the @Produce annotation to inject a messaging gateway to easily send in the message to Camel.
This makes the code nice, simple and easy as shown below:
public class CamelReportGenerator implements ReportGenerator { @Produce(uri = "direct:start") private ReportGenerator generator; public Report generateReport(ReportRequest reportRequest) { return generator.generateReport(reportRequest); } }However there is an issue in Camel (v2.1 or older) when invoking a bean later in the route which you do when calling the bean method in the splitter EIP. To prevent this problem you need to add to use the convertBodyTo as shown below:
@Override public void configure() throws Exception { from("direct:start") .convertBodyTo(ReportRequest.class) .split(bean("sectionRequestSplitterBean", "split"), new ReportAggregationStrategy()) .transform().method("sectionRequestToXMLBean", "transform") .to(serviceURL) .transform().method("sectionResponseXMLToSectionBean", "transform") .end(); }This will be fixed in Camel 2.2 ticket CAMEL-2325.
Instead of using @Produce you can use the CamelProxy which allows you to proxy a POJO and use that POJO as the messaging gateway. You can define the proxy in the Spring XML directly as shown in the documentation.
Biju Kunjummen replied on Sun, 2010/01/03 - 9:01am
in response to:
Claus Ibsen
@Override public void configure() throws Exception { from("direct:start") .convertBodyTo(ReportRequest.class) .split(bean("sectionRequestSplitterBean", "split"), new ReportAggregationStrategy()) .transform().method("sectionRequestToXMLBean", "transform") .to(serviceURL) .transform().method("sectionResponseXMLToSectionBean", "transform") .end(); }The .end() is giving a compilation error. I am assuming that this is a bug with Apache Camel 2.1.0. It works without that, so for now I will include the code without the final .end().Claus Ibsen replied on Sun, 2010/01/03 - 11:20am
The final *end()* is optional as the route ends anyway. Its only needed if you need to do some additional routing after all the messages has been aggregated again (eg. scatter-gather is done). But in this case the message should be returned to the original caller = the service gateway.
The compilation error. Yeah we have stretched as far we can do with regular Java. It certainly got its limitations when trying to mimic and build a DSL.
Kai Wähner replied on Wed, 2012/01/11 - 4:10am
I just wrote an article, which explains my experiences with Apache Camel and its alternatives Spring Integration and Mule ESB:
Spoilt for Choice: Which Integration Framework to use - Spring Integration, Mule ESB or Apache Camel?
Best regards,
Kai Wähner (Twitter: @KaiWaehner)
Oleg Zhurakousky replied on Wed, 2012/03/14 - 11:20am
in response to:
Claus Ibsen