
By Biju Kunjummen, Java architect with UST Global

Spring Integration and Apache Camel

12.31.2009

Solution Using Spring Integration:

The Gateway component is configured with the following entry in the Spring configuration. Internally, Spring Integration uses AOP to create a proxy for the service interface; the proxy sends each request to the request channel and waits for the response on the reply channel.

<si:gateway default-reply-channel="exit" default-request-channel="enter" id="reportGenerator" service-interface="org.bk.report.ReportGenerator">
</si:gateway>
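
To make the gateway idea concrete, here is a minimal plain-Java sketch of the same pattern: a proxy that puts the request on one queue and blocks until a reply shows up on another. It is not Spring Integration's implementation. The names SimpleReportGenerator and GatewaySketch, the String payloads, and the 5-second timeout are all assumptions made for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for org.bk.report.ReportGenerator, simplified to Strings.
interface SimpleReportGenerator {
    String generateReport(String request);
}

final class GatewaySketch {

    // Builds a proxy that puts the method argument on the "enter" queue
    // and blocks until a reply appears on the "exit" queue.
    static SimpleReportGenerator gateway(BlockingQueue<String> enter,
                                         BlockingQueue<String> exit) {
        InvocationHandler handler = (proxy, method, args) -> {
            enter.put((String) args[0]);
            // Assumed timeout; a real gateway makes this configurable.
            String reply = exit.poll(5, TimeUnit.SECONDS);
            if (reply == null) {
                throw new IllegalStateException("no reply within timeout");
            }
            return reply;
        };
        return (SimpleReportGenerator) Proxy.newProxyInstance(
                SimpleReportGenerator.class.getClassLoader(),
                new Class<?>[] {SimpleReportGenerator.class},
                handler);
    }

    // Wires the proxy to a worker thread that plays the role of the
    // downstream flow (splitter, transformers, aggregator).
    static String demo(String request) {
        BlockingQueue<String> enter = new LinkedBlockingQueue<>();
        BlockingQueue<String> exit = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                exit.put("report for " + enter.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        return gateway(enter, exit).generateReport(request);
    }
}
```

The caller only sees the interface; the queues stand in for the "enter" and "exit" channels named in the configuration above.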

The component that splits the input Report Request into Section Requests is fairly straightforward:
import java.util.List;

public class SectionRequestSplitter {
    // Each returned element becomes a separate message downstream
    public List<SectionRequest> split(ReportRequest reportRequest) {
        return reportRequest.getSectionRequests();
    }
}
and to hook this splitter up with Spring Integration:
<bean class="org.bk.report.common.SectionRequestSplitter" id="sectionRequestSplitterBean"/>

<si:splitter id="sectionRequestSplitter" input-channel="enter" method="split" output-channel="sectionRequestToXMLChannel" ref="sectionRequestSplitterBean"/>
Next, each Section Request is transformed to XML. The component is the following:
public class SectionRequestToXMLTransformer {
    public String transform(SectionRequest sectionRequest) {
        // this needs to be optimized...purely for demonstration of the concept
        String sectionRequestAsString = "<section><meta><entityId>" + sectionRequest.getEntityId()
                + "</entityId><sectionName>" + sectionRequest.getSectionId()
                + "</sectionName></meta></section>";

        return sectionRequestAsString;
    }
}
and is hooked up in the Spring Integration configuration file in the following way:
<bean id="sectionRequestToXMLBean" class="org.bk.report.common.SectionRequestToXMLTransformer"/>
<si:transformer input-channel="sectionRequestToXMLChannel" ref="sectionRequestToXMLBean" method="transform" output-channel="sectionRequestChannel"/>
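
The string concatenation in the transformer is flagged as demonstration-only. One thing it leaves open is what happens when an entity id or section id contains characters such as & or <. A minimal sketch of escaping those values before concatenation follows; the class name XmlEscapeSketch and its method names are hypothetical, and a real implementation might prefer an XML library instead.

```java
final class XmlEscapeSketch {

    // Replaces the five characters that are special in XML text and attributes.
    static String escape(String text) {
        StringBuilder out = new StringBuilder(text.length());
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '&': out.append("&amp;"); break;
                case '<': out.append("&lt;"); break;
                case '>': out.append("&gt;"); break;
                case '"': out.append("&quot;"); break;
                case '\'': out.append("&apos;"); break;
                default: out.append(c);
            }
        }
        return out.toString();
    }

    // Same element layout as the article's transformer, with escaped values.
    static String toSectionXml(String entityId, String sectionId) {
        return "<section><meta><entityId>" + escape(entityId)
                + "</entityId><sectionName>" + escape(sectionId)
                + "</sectionName></meta></section>";
    }
}
```

Escaping the ampersand first is not required here because each character is handled exactly once in a single pass.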
To send the Section Request XML over HTTP to a section service:
<http:outbound-gateway id="httpHeaderGateway"
request-channel="sectionRequestChannel" reply-channel="sectionXMLResponseChannel"
default-url="${sectionBaseURL}/section" extract-request-payload="true"
charset="UTF-8" request-timeout="1200" />

To transform the Section Response XML to a Section object, the component is the following:
import java.io.StringReader;

import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;

public class SectionResponseXMLToSectionTransformer {
    public Section transform(String sectionXML) {
        try {
            // Parse the response and pull the metadata fields out with XPath
            Document document = new SAXReader().read(new StringReader(sectionXML));
            String sectionName = document.selectSingleNode("/section/meta/sectionName").getText();
            String entityId = document.selectSingleNode("/section/meta/entityId").getText();
            return new Section(entityId, sectionName, sectionXML);
        } catch (DocumentException e) {
            // Fail loudly instead of returning a Section with empty fields
            throw new IllegalArgumentException("Could not parse section XML", e);
        }
    }
}
and is hooked up in the Spring Integration configuration file in the following way:
<bean id="sectionResponseXMLToSectionBean" class="org.bk.report.common.SectionResponseXMLToSectionTransformer" />
<si:transformer input-channel="sectionXMLResponseChannel"
ref="sectionResponseXMLToSectionBean" method="transform" output-channel="sectionResponseChannel" />
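
For readers without dom4j on the classpath, the same extraction can be sketched with the XML and XPath APIs that ship with the JDK. This is an alternative illustration, not the article's code: the class name SectionXmlSketch, the SAMPLE constant, and the choice to return a two-element array instead of a Section object are assumptions made to keep the example self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

final class SectionXmlSketch {

    // Sample payload in the layout produced by the request transformer.
    static final String SAMPLE =
            "<section><meta><entityId>42</entityId>"
            + "<sectionName>header</sectionName></meta></section>";

    // Returns {entityId, sectionName}; a missing element yields an empty string.
    static String[] parseMeta(String sectionXml) {
        try {
            // Parse once, then run both XPath queries against the DOM tree
            Document document = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(sectionXml)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            String entityId = xpath.evaluate("/section/meta/entityId", document);
            String sectionName = xpath.evaluate("/section/meta/sectionName", document);
            return new String[] {entityId, sectionName};
        } catch (ParserConfigurationException | SAXException
                | IOException | XPathExpressionException e) {
            // Surface malformed input to the caller rather than hiding it
            throw new IllegalArgumentException("Could not parse section XML", e);
        }
    }
}
```

Calling SectionXmlSketch.parseMeta(SectionXmlSketch.SAMPLE) yields the entity id and section name in that order.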


To aggregate the Sections together into a report, the component is the following:
import java.util.List;

public class SectionResponseAggregator {

    public Report aggregate(List<Section> sections) {
        return new Report(sections);
    }

}

and is hooked up in the Spring Integration configuration file in the following way:
<bean id="sectionResponseAggregator" class="org.bk.report.common.SectionResponseAggregator"/>
<si:aggregator input-channel="sectionResponseChannel" output-channel="exit" ref="sectionResponseAggregator" method="aggregate"/>
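
The configuration does not show how the aggregator knows that every section of a given request has arrived. In Spring Integration the splitter stamps each outgoing message with correlation and sequence headers, and the aggregator groups messages by correlation id and releases a group once it is complete. The plain-Java sketch below illustrates that bookkeeping only; the AggregatorSketch and Part classes and their field names are hypothetical and do not mirror the framework's classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AggregatorSketch {

    // Hypothetical message: a payload plus the bookkeeping the aggregator needs.
    static final class Part {
        final String correlationId; // which original request this part belongs to
        final int sequenceSize;     // how many parts the splitter produced
        final String payload;

        Part(String correlationId, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    // Incomplete groups, keyed by correlation id.
    private final Map<String, List<String>> pending = new HashMap<>();

    // Returns the completed group when its last part arrives, otherwise null.
    List<String> accept(Part part) {
        List<String> group =
                pending.computeIfAbsent(part.correlationId, k -> new ArrayList<>());
        group.add(part.payload);
        if (group.size() < part.sequenceSize) {
            return null; // still waiting for more parts
        }
        pending.remove(part.correlationId);
        return group;
    }
}
```

Note that the pending groups live in memory in this sketch, which is exactly the limitation the persistence question in the comments below is about.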

This completes the Spring Integration implementation for this Integration Problem. The following is the complete Spring Integration configuration file:

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:si="http://www.springframework.org/schema/integration"
xmlns:http="http://www.springframework.org/schema/integration/http"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http-1.0.xsd
">

<context:property-placeholder />
<si:channel id="enter" />

<si:channel id="exit" />

<si:gateway id="reportGenerator" default-request-channel="enter"
default-reply-channel="exit" service-interface="org.bk.report.ReportGenerator" />

<si:channel id="sectionRequestToXMLChannel" />
<si:splitter id="sectionRequestSplitter" input-channel="enter"
ref="sectionRequestSplitterBean" method="split" output-channel="sectionRequestToXMLChannel" />

<si:channel id="sectionRequestChannel" />
<si:transformer input-channel="sectionRequestToXMLChannel"
ref="sectionRequestToXMLBean" method="transform" output-channel="sectionRequestChannel" />

<si:channel id="sectionXMLResponseChannel" />
<http:outbound-gateway id="httpHeaderGateway"
request-channel="sectionRequestChannel" reply-channel="sectionXMLResponseChannel"
default-url="${sectionBaseURL}/section" extract-request-payload="true"
charset="UTF-8" request-timeout="1200" />

<si:channel id="sectionResponseChannel" />
<si:transformer input-channel="sectionXMLResponseChannel"
ref="sectionResponseXMLToSectionBean" method="transform" output-channel="sectionResponseChannel" />

<si:aggregator input-channel="sectionResponseChannel"
output-channel="exit" ref="sectionResponseAggregator" method="aggregate" />

<bean id="sectionRequestSplitterBean" class="org.bk.report.common.SectionRequestSplitter" />
<bean id="sectionRequestToXMLBean" class="org.bk.report.common.SectionRequestToXMLTransformer" />
<bean id="sectionResponseXMLToSectionBean" class="org.bk.report.common.SectionResponseXMLToSectionTransformer" />
<bean id="sectionResponseAggregator" class="org.bk.report.common.SectionResponseAggregator" />

</beans>

 

A working sample is provided with the article (download, extract and run "mvn test").

Published at DZone with permission of its author, Biju Kunjummen.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)

Comments

Stefan Krause replied on Fri, 2010/01/01 - 1:11pm

Thanks for the nice article. Do you know if either of the aggregators supports persistent messages? Considering the much touted delivery guarantee for messaging systems this would look like a very basic requirement.

Claus Ibsen replied on Sun, 2010/01/03 - 7:46am

@Stefan Camel does not currently provide a persistent store for the Aggregator EIP. It's on the roadmap for Camel 2.3, though: CAMEL-217. We have planned an overhaul of the Aggregator EIP which should also make it easier to use out of the box. For example, we would also like to replace the need for the low-level AggregationStrategy with a POJO of some sort.

Some design ideas were sketched here: overhaul of Aggregator.

You can, however, currently create your own persistence-capable implementation of org.apache.camel.processor.aggregate.AggregationCollection and use it in the Aggregator EIP. Some end users have done that.

Claus Ibsen replied on Sun, 2010/01/03 - 8:02am

Comments for page 3:
Camel does support the Messaging Gateway EIP, which is documented here. The documentation does need to be improved with some better samples.

In your case you can for example use the @Produce annotation to inject a messaging gateway to easily send in the message to Camel.

This makes the code nice, simple and easy as shown below:

public class CamelReportGenerator implements ReportGenerator {

    @Produce(uri = "direct:start")
    private ReportGenerator generator;

    public Report generateReport(ReportRequest reportRequest) {
        return generator.generateReport(reportRequest);
    }

}


However, there is an issue in Camel (v2.1 or older) when invoking a bean later in the route, which you do when calling the bean method in the splitter EIP. To work around this problem you need to use convertBodyTo as shown below:

    @Override
    public void configure() throws Exception {
        from("direct:start")
            .convertBodyTo(ReportRequest.class)
            .split(bean("sectionRequestSplitterBean", "split"), new ReportAggregationStrategy())
                .transform().method("sectionRequestToXMLBean", "transform")
                .to(serviceURL)
                .transform().method("sectionResponseXMLToSectionBean", "transform")
            .end();
    }

This will be fixed in Camel 2.2; see ticket CAMEL-2325.

Instead of using @Produce you can use the CamelProxy which allows you to proxy a POJO and use that POJO as the messaging gateway. You can define the proxy in the Spring XML directly as shown in the documentation.

Biju Kunjummen replied on Sun, 2010/01/03 - 9:01am in response to: Claus Ibsen

Thanks for your feedback, davsclaus. CamelProxy is what I should have mentioned in the article. I will update the article shortly. One piece of feedback on the code that you have provided:
    @Override
    public void configure() throws Exception {
        from("direct:start")
            .convertBodyTo(ReportRequest.class)
            .split(bean("sectionRequestSplitterBean", "split"), new ReportAggregationStrategy())
                .transform().method("sectionRequestToXMLBean", "transform")
                .to(serviceURL)
                .transform().method("sectionResponseXMLToSectionBean", "transform")
            .end();
    }
The .end() is giving a compilation error. I am assuming that this is a bug with Apache Camel 2.1.0. It works without that, so for now I will include the code without the final .end().

Claus Ibsen replied on Sun, 2010/01/03 - 11:20am

Ah cool

The final end() is optional as the route ends anyway. It's only needed if you need to do some additional routing after all the messages have been aggregated again (i.e. once the scatter-gather is done). But in this case the message should be returned to the original caller, which is the service gateway.

As for the compilation error: yes, we have stretched regular Java as far as we can. It certainly has its limitations when trying to mimic and build a DSL.

Kai Wähner replied on Wed, 2012/01/11 - 4:10am

I just wrote an article, which explains my experiences with Apache Camel and its alternatives Spring Integration and Mule ESB:

Spoilt for Choice: Which Integration Framework to use - Spring Integration, Mule ESB or Apache Camel?


Best regards,
Kai Wähner (Twitter: @KaiWaehner)

Oleg Zhurakousky replied on Wed, 2012/03/14 - 11:20am in response to: Claus Ibsen

@Stefan Spring Integration has always provided a persistent storage option for both the Aggregator and the Resequencer via the 'message-store' option. You can read more here: http://static.springsource.org/spring-integration/reference/htmlsingle/#aggregator
