Enterprise Integration Zone is brought to you in partnership with:

Torsten works for Red Hat and is contributing to various Open Source projects. Torsten is a DZone MVB and is not an employee of DZone and has posted 4 posts at DZone. You can read more from them at their website. View Full User Profile

A JMS Connection Leak Scenario in Camel

08.26.2013
| 2397 views |
  • submit to reddit

I came across this interesting problem today and want to capture it.  Suppose this simple Camel route, consuming from ActiveMQ and logging the message:

<camelcontext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
  <route id="jms-consumer4">
    <from uri="amq:queue:Test" />
    <to uri="log:JMSConsumer?level=INFO&showBody=true" />
  </route>
</camelcontext>
 
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amq">
  <property name="connectionFactory" ref="singleCF" />
  <property name="useSingleConnection" value="true" />
  <property name="usePooledConnection" value="false" />
  <property name="preserveMessageQos" value="true" />
</bean>
 
<bean class="org.springframework.jms.connection.SingleConnectionFactory" id="singleCF">
  <property name="targetConnectionFactory" ref="AMQCF" />
  <property name="reconnectOnException" value="true" />
</bean>
 
<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AMQCF">
  <property name="userName" value="admin" />
  <property name="password" value="admin" />
  <property name="brokerURL" value="tcp://localhost:61616" />
  <property name="copyMessageOnSend" value="false" />
  <property name="useAsyncSend" value="true" />
</bean>

After deploying this to ServiceMix (JBoss Fuse 6.0 in my test) it works just fine. The problem occurs when stopping this Camel route (via an osgi:stop of the corresponding bundle). The connection into the ActiveMQ broker does not get closed! Drilling into the JMX view of the broker the connection is still registered in JMX. Even worse each restart and stop of the Camel route leaks another connection.

I did a bit of root cause analysis and found:

When the Camel route is stopped it calls into SingleConnectionFactory.destroy(). This cleans up the internally held ActiveMQConnection. At this stage the connection is properly removed from the broker's JMX view, which you will probably only notice when debugging through the code.
However the Spring JMS listener used by the Camel JMS consumer is still running and it detects that the connection is down and tries to transparently reconnect. This calls into SingleConnectionFactory.createConnection again (full stack trace below [1]). The SingleConnectionFactory does happily reopen a new connection into the broker which then remains registered in JMX and open although the route gets stopped.

So how do you resolve this?

Instead of using Springs SingleConnectionFactory I recommend to use ActiveMQs PooledConnectionFactory instead. So the above Camel route configuration becomes
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amq">
  <property name="connectionFactory" ref="pooledCF" />
  <property name="useSingleConnection" value="true" />
  <property name="usePooledConnection" value="false" />
  <property name="preserveMessageQos" value="true" />
</bean>
 
<bean class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop" id="pooledCF" init-method="start">
  <property name="connectionFactory" ref="AMQCF" />
  <property name="maxConnections" value="1" />
</bean>
 
<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AMQCF">
  <property name="userName" value="admin" />
  <property name="password" value="admin" />
  <property name="brokerURL" value="tcp://localhost:61616" />
  <property name="copyMessageOnSend" value="false" />
  <property name="useAsyncSend" value="true" />
</bean>

When using the ActiveMQs PooledConnectionFactory instead, things behave pretty much the same with one subtle but important difference.

Similar to above, stopping the Camel bundle calls into PooledConnectionFactory.stop(). This internally closes all ActiveMQConnections (only one in this example case but potentially more) which also unregisters the connection from the brokers JMX view. Now, Springs JMS listener used by the Camel JMS consumer is still running and detects the connection closure and tries to transparently reconnect. This calls into PooledConnectionFactory.createConnection(). This implementation however contains the following check:

if (stopped.get()) {
  LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
  return null;
}

AtomicBoolean stopped will be set to true and no new connection is established!

Springs SingleConnectionFactory does not have this logic. It happily reopens a new connection after it got destroyed. Please note the properties init-method="start" destroy-method="stop" on the PooledConnectionFactory bean definition are important as otherwise you may also leak connections when shutting down your bundles.

[1]
Daemon Thread [Camel (camelContext) thread #21 - JmsConsumer[EwdTest1]]
(Suspended (breakpoint at line 280 in org.springframework.jms.connection.SingleConnectionFactory))
owns: java.lang.Object (id=9254)
owns: java.lang.Object (id=9255)
owns: java.lang.Object (id=9286)
org.springframework.jms.connection.SingleConnectionFactory.initConnection() line: 280
org.springframework.jms.connection.SingleConnectionFactory.createConnection() line: 225
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.support.JmsAccessor).createConnection() line: 184
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.AbstractJmsListeningContainer).createSharedConnection() line: 404
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.AbstractJmsListeningContainer).refreshSharedConnection() line: 389
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.DefaultMessageListenerContainer).refreshConnectionUntilSuccessful() line: 869
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.DefaultMessageListenerContainer).recoverAfterListenerSetupFailure() line: 851
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run() line: 982
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(java.lang.Runnable) line: 895
java.util.concurrent.ThreadPoolExecutor$Worker.run() line: 918
java.lang.Thread.run() line: 680


Published at DZone with permission of Torsten Mielke, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)