Felipe Gutierrez Cruz. B.S. Computer Science Engineer M.S. Computer Science and Electronic Commerce. Working as Sr. Consultant at GoPivotal/SpringSource, Albuquerque, NM USA

JMS Clustering by Example

05.26.2010

It's amazing how the JBoss team put together an easy way to do JMS clustering out of the box!

I'll start with an easy example, creating a queue named "MyClusteredQueue".

In this example I'm using JBoss AS 5.1 and two computers connected to the same network, with these IPs:

- Computer A: 192.168.0.143
- Computer B: 192.168.0.210

So, here are the steps:

1) Install JBoss on both computers. We are going to use the "all" configuration on both.

2) Create the queue on both servers.

Go to $JBOSS_HOME/server/all/deploy/messaging/ and edit the destinations-service.xml file. Add the MyClusteredQueue definition just before the closing </server> tag. It looks like this:

<!-- Cluster JMS -->
<mbean code="org.jboss.jms.server.destination.QueueService"
       name="jboss.messaging.destination:service=Queue,name=MyClusteredQueue"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
    <attribute name="Clustered">true</attribute>
</mbean>

 

This is the usual way to add a queue to JBoss; for those already familiar with it, the only new thing is the "Clustered" attribute. This step must be done on both computers. At the end of the article you can find the files.
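Since the same edit has to be made by hand on both machines, a quick sanity check can help. The following is only a sketch, not part of the original client: the class name ClusteredQueueCheck and its method are hypothetical, and it uses nothing but the JDK's XML parser to confirm that a given mbean declares the "Clustered" attribute as true.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper (not from the article): checks a destinations-service.xml
// snippet for <attribute name="Clustered">true</attribute> on a named mbean.
final class ClusteredQueueCheck {

    static boolean isClustered(String xml, String mbeanName) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse XML", e);
        }
        NodeList mbeans = doc.getElementsByTagName("mbean");
        for (int i = 0; i < mbeans.getLength(); i++) {
            Element mbean = (Element) mbeans.item(i);
            if (!mbeanName.equals(mbean.getAttribute("name"))) {
                continue;
            }
            NodeList attrs = mbean.getElementsByTagName("attribute");
            for (int j = 0; j < attrs.getLength(); j++) {
                Element attr = (Element) attrs.item(j);
                if ("Clustered".equals(attr.getAttribute("name"))) {
                    return "true".equals(attr.getTextContent().trim());
                }
            }
        }
        return false; // mbean not found, or no Clustered attribute
    }

    public static void main(String[] args) {
        String name = "jboss.messaging.destination:service=Queue,name=MyClusteredQueue";
        String xml = "<server>"
                + "<mbean code=\"org.jboss.jms.server.destination.QueueService\" name=\"" + name + "\">"
                + "<depends>jboss.messaging:service=PostOffice</depends>"
                + "<attribute name=\"Clustered\">true</attribute>"
                + "</mbean></server>";
        System.out.println("Clustered: " + isClustered(xml, name));
    }
}
```

To check a real file, you could read destinations-service.xml into a String on each computer and pass it to isClustered with the mbean name shown above.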

3) Write the MDB that consumes the messages and deploy it on both computers. (I'm using an EJB 3 style MDB.)

 

import java.net.InetAddress;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.log4j.Logger;

/**
 * Consumes messages from queue/MyClusteredQueue and logs which host handled them.
 *
 * @author felipeg
 */
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/MyClusteredQueue")
})
public class JMSClusterClientHandler implements MessageListener {

    private static final Logger log = Logger.getLogger(JMSClusterClientHandler.class);

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                // Log the local host name so you can tell which node consumed the message
                InetAddress addr = InetAddress.getLocalHost();
                log.info("########## Processing Host: " + addr.getHostName() + " ##########");

                ObjectMessage objMessage = (ObjectMessage) message;
                Object obj = objMessage.getObject();

                log.info("Object received:" + obj);
            }
        } catch (Exception e) {
            log.error("Error processing message", e);
        }
    }

}

 

4) Start JBoss with the following options:

Computer A:
$ cd $JBOSS_HOME/bin
$ ./run.sh -c all -b 192.168.0.143 -Djboss.messaging.ServerPeerID=1

Computer B:
$ cd $JBOSS_HOME/bin
$ ./run.sh -c all -b 192.168.0.210 -Djboss.messaging.ServerPeerID=2


Each server needs its own ID, which is set with this option:
-Djboss.messaging.ServerPeerID

When you start JBoss on computer A, the log (server.log) should tell you that one node is ready and listening. Once you start JBoss on computer B, the log will show both nodes, with both IPs, ready to consume messages.
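With more than two nodes it becomes easy to reuse an ID by accident. As a rough sketch (the PeerIdPlan class and its methods are hypothetical, not part of JBoss), the following plain Java helper builds the run.sh command line used above for each node and reports any ServerPeerID that is assigned to more than one host.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical planning helper: maps each bind address to its ServerPeerID.
final class PeerIdPlan {

    // Returns the IDs that appear more than once in the plan.
    static Set<Integer> duplicateIds(Map<String, Integer> idsByHost) {
        Set<Integer> seen = new HashSet<>();
        Set<Integer> duplicates = new TreeSet<>();
        for (Integer id : idsByHost.values()) {
            if (!seen.add(id)) {
                duplicates.add(id);
            }
        }
        return duplicates;
    }

    // Builds the same command line shown in step 4 for one node.
    static String startCommand(String bindAddress, int serverPeerId) {
        return "./run.sh -c all -b " + bindAddress
                + " -Djboss.messaging.ServerPeerID=" + serverPeerId;
    }

    public static void main(String[] args) {
        Map<String, Integer> plan = new LinkedHashMap<>();
        plan.put("192.168.0.143", 1);
        plan.put("192.168.0.210", 2);

        if (!duplicateIds(plan).isEmpty()) {
            throw new IllegalStateException("Duplicate ServerPeerID: " + duplicateIds(plan));
        }
        for (Map.Entry<String, Integer> node : plan.entrySet()) {
            System.out.println(startCommand(node.getKey(), node.getValue()));
        }
    }
}
```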

5) Now it's time to send a message to the queue. To do this, the client must look up the "ClusteredConnectionFactory" connection factory (JMSDispatcher.java - see the code below).
Also, if you are using the default InitialContext, add the IPs of the two computers, separated by a comma, to the java.naming.provider.url property in the jndi.properties file. (In my case I create a Properties object and set all the necessary properties in code, JMSDispatcher.java - see the code below.)

java.naming.provider.url=192.168.0.143:1099,192.168.0.210:1099
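If you prefer to build these settings in code rather than in jndi.properties, the idea can be sketched as follows. The ClusterJndiProperties class is a hypothetical name; the property keys and values are the ones used elsewhere in this article, and the result would be passed to new InitialContext(...).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: builds JNDI properties that list every cluster node.
final class ClusterJndiProperties {

    static Properties forNodes(List<String> hostPorts) {
        Properties props = new Properties();
        props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
        props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
        // All nodes go into one comma-separated provider URL
        props.put("java.naming.provider.url", String.join(",", hostPorts));
        return props;
    }

    public static void main(String[] args) {
        Properties props = forNodes(Arrays.asList("192.168.0.143:1099", "192.168.0.210:1099"));
        System.out.println(props.getProperty("java.naming.provider.url"));
    }
}
```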

The client that I wrote is a web application that consists of:

- an index.jsp page, which contains a form that prompts you for the name of the queue, the type of messaging (Queue or Topic), the server IP and port, how many times to send the message, and the actual message to be sent
- a servlet (JMSClusteredClient.java - see code below) that receives the postback
- a helper class (JMSDispatcher.java - see code below) that sends the message to the JBoss servers

You can deploy it on any computer. In my case I deployed it on Computer A, so I can access it through this URL: http://192.168.0.143:8080/JMSWeb/ (just change the IP to that of the machine where the client WAR was deployed).

As you can see in index.jsp (code below), I've already put in some default values that reflect the name of the queue and the IPs of my two computers. Now, if you increase the number of times that the message will be sent (maybe to 10), fill out the message box and click "Send", you should see some of the messages being consumed by the MDB on each of the two servers.

Here are the files to create the client:

index.jsp
<html>
<body>
<div>
<form method="POST" action='<%= request.getContextPath() + "/JMSClusteredClient" %>'>
<fieldset>
<legend>JMS Clustered - Test Client</legend>
<table>
<tr>
<td>Server:</td><td><input type="text" name="server" value="192.168.0.143:1099,192.168.0.210:1099" /></td>
</tr>
<tr>
<td>
<select name="messageType">
<option value="QUEUE" selected="selected">Queue</option>
<option value="TOPIC" >Topic</option>
</select>
</td>
<td><input type="text" name="topicqueue" value="queue/MyClusteredQueue" /></td>
</tr>
<tr>
<td>Times:</td><td><input type="text" name="times" value="3" /></td>
</tr>
<tr>
<td>Message:</td><td><textarea rows="3" cols="20" name="message"></textarea></td>
</tr>
</table>
<input type="submit" value="Send">
</fieldset>
</form>
</div>
</body>
</html>
 Servlet:  JMSClusteredClient.java 
import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.JMSException;
import javax.naming.NamingException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class JMSClusteredClient extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#service(HttpServletRequest request, HttpServletResponse response)
     */
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        PrintWriter out = response.getWriter();

        // Form fields posted by index.jsp
        String topicqueue = request.getParameter("topicqueue");
        String message = request.getParameter("message");
        String server = request.getParameter("server");
        String messageType = request.getParameter("messageType");
        String times = request.getParameter("times");

        int intTimes = Integer.parseInt(times);

        JMSDispatcher dispatcher = new JMSDispatcher();
        dispatcher.setTopicQueueName(topicqueue);
        dispatcher.setServer(server);
        dispatcher.setMessageType(messageType);

        try {
            // Prefix each copy with its sequence number, e.g. "1 of 3 <message>"
            for (int count = 1; count <= intTimes; count++) {
                dispatcher.sendMessage(count + " of " + times + " " + message);
            }
            out.println("Message [" + message + "] sent successfully to [" + topicqueue + "] to the [" + server + "] server " + times + " times.");
        } catch (JMSException e) {
            e.printStackTrace();
            out.println("Error:" + e.getMessage());
        } catch (NamingException e) {
            e.printStackTrace();
            out.println("Error:" + e.getMessage());
        } finally {
            out.close();
        }
    }

}
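The servlet above passes the "times" field straight to Integer.parseInt, which throws a NumberFormatException if the field is empty or not a number. One possible way to harden it is sketched below; the SendRequest class, the fallback value and the upper bound are all assumptions of mine, not part of the original client.

```java
// Hypothetical helper for the servlet: defensive parsing of the "times" field
// and the "N of M message" label that the servlet builds for each copy.
final class SendRequest {

    // Returns fallback for null, blank, non-numeric or non-positive input,
    // and caps the result at max so a typo cannot flood the queue.
    static int parseTimes(String raw, int fallback, int max) {
        if (raw == null) {
            return fallback;
        }
        try {
            int n = Integer.parseInt(raw.trim());
            if (n < 1) {
                return fallback;
            }
            return Math.min(n, max);
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    // Same format as the servlet: "<count> of <times> <message>"
    static String label(int count, int times, String message) {
        return count + " of " + times + " " + message;
    }

    public static void main(String[] args) {
        int times = parseTimes("3", 1, 100);
        for (int count = 1; count <= times; count++) {
            System.out.println(label(count, times, "hello"));
        }
    }
}
```

In the servlet, this would replace the direct Integer.parseInt(times) call, for example with SendRequest.parseTimes(times, 1, 100).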
 A utility to send the messages: JMSDispatcher.java 
import java.io.Serializable;
import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.log4j.Logger;

public class JMSDispatcher {

    private static Logger log = Logger.getLogger(JMSDispatcher.class);

    private final String CONNECTION_FACTORY_CLUSTERED = "ClusteredConnectionFactory";
    private final String CONNECTION_FACTORY = "ConnectionFactory";

    private final String TOPIC = "TOPIC";
    private final String QUEUE = "QUEUE";

    private String topicQueueName;
    private String server;
    private String messageType;

    public void setTopicQueueName(String value) {
        this.topicQueueName = value;
    }

    public void setServer(String value) {
        this.server = value;
    }

    public void setMessageType(String value) {
        this.messageType = value;
    }

    public void sendMessage(Object objectMessage) throws JMSException, NamingException {
        log.debug("##### Setting up a Queue/Topic Message: #####");
        if (TOPIC.equals(messageType)) {
            sendTopicMessage(objectMessage);
        } else if (QUEUE.equals(messageType)) {
            sendQueueMessage(objectMessage);
        }
        log.debug("##### Publishing Message: Done #####");
    }

    private void sendQueueMessage(Object objectMessage) throws JMSException, NamingException {
        QueueConnection queueConn = null;
        try {
            InitialContext initialContext = getInitialContext();

            // Look up the clustered factory instead of the plain "ConnectionFactory"
            QueueConnectionFactory qcf = (QueueConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
            queueConn = qcf.createQueueConnection();
            Queue queue = (Queue) initialContext.lookup(topicQueueName);
            QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queueConn.start();

            QueueSender send = queueSession.createSender(queue);
            ObjectMessage om = queueSession.createObjectMessage((Serializable) objectMessage);
            setMessageProperties(om);
            log.debug("##### Publishing Message to a Queue: " + topicQueueName + " #####");
            send.send(om);
        } catch (MessageFormatException ex) {
            log.error("##### The MESSAGE is not Serializable ####");
            throw ex;
        } catch (MessageNotWriteableException ex) {
            log.error("##### The MESSAGE is read-only ####");
            throw ex;
        } catch (JMSException ex) {
            log.error("##### JMS provider failed to send the object due to some internal error. ####");
            throw ex;
        } finally {
            // Closing the connection also closes its sessions and producers
            if (queueConn != null) {
                queueConn.close();
            }
        }
    }

    private void sendTopicMessage(Object objectMessage) throws JMSException, NamingException {
        TopicConnection topicConn = null;
        try {
            InitialContext initialContext = getInitialContext();

            TopicConnectionFactory tcf = (TopicConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
            topicConn = tcf.createTopicConnection();
            Topic topic = (Topic) initialContext.lookup(topicQueueName);
            TopicSession topicSession = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topicConn.start();

            TopicPublisher send = topicSession.createPublisher(topic);

            ObjectMessage om = topicSession.createObjectMessage();
            om.setObject((Serializable) objectMessage);
            setMessageProperties(om);
            log.debug("##### Publishing Message to a Topic: " + topicQueueName + " #####");
            send.publish(om);
        } catch (MessageFormatException ex) {
            log.error("##### The MESSAGE is not Serializable ####");
            throw ex;
        } catch (MessageNotWriteableException ex) {
            log.error("##### The MESSAGE is read-only ####");
            throw ex;
        } catch (JMSException ex) {
            log.error("##### JMS provider failed to send the object due to some internal error. ####");
            throw ex;
        } finally {
            // Closing the connection also closes its sessions and producers
            if (topicConn != null) {
                topicConn.close();
            }
        }
    }

    // Placeholder: the original listing calls setMessageProperties(...) but does not
    // show it. This no-op stub is an assumption; set any custom JMS properties here.
    private void setMessageProperties(ObjectMessage message) throws JMSException {
    }

    private InitialContext getInitialContext() throws NamingException {
        Properties jboss = new Properties();
        jboss.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
        jboss.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
        // Comma-separated list of cluster nodes, e.g. 192.168.0.143:1099,192.168.0.210:1099
        jboss.put("java.naming.provider.url", server);
        return new InitialContext(jboss);
    }
}
 And the web.xml 
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5">
<display-name>JMSWeb</display-name>
<welcome-file-list>
<welcome-file>index.jsp</welcome-file>
</welcome-file-list>

<servlet>
<description></description>
<display-name>JMSClusteredClient</display-name>
<servlet-name>JMSClusteredClient</servlet-name>
<servlet-class>com.blogspot.felipeg48.jms.web.JMSClusteredClient</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>JMSClusteredClient</servlet-name>
<url-pattern>/JMSClusteredClient</url-pattern>
</servlet-mapping>
</web-app>
  Happy Clustering!!
Published at DZone with permission of its author, Felipe Gutierrez.

Comments

Luis Carlos Mor... replied on Wed, 2010/06/02 - 2:34pm

I am implementing in OSGI
