My name is Gurkan Erdogdu and I am the CTO of the MechSoft Mechanical and Software Solutions. I have been active in the Java and Java EE platform more than 10 years. Strong supporter for the Free and Open Source Software, and actively participating within the Open Source based foundations like, Apache Software Foundation, JBoss, and recently Open Web Foundation. I am the member of the Apache Software Foundation and Open Web Foundation. Writing blog at gurkanerdogdu@blogspot.com. Gurkan has posted 24 posts at DZone. View Full User Profile

Defne Platform New Feature : Asynchronous Messaging Framework

08.12.2010
| 4372 views |
  • submit to reddit

Defne is a lightweight development platform for Java EE web applications. It provides usable libraries to develop Java EE web applications easily and powerful based on Java EE technologies. Every component library of the Defne framework enables developers use them via Dependency Injection, CDI. We will provide built-in IoC container in future 1.0.3 release that will be based on Apache Tomcat and Apache OpenWebBeans.

In this article we are going to delve into the new Defne component, "Defne Asynchronous Messaging", i.e "Defne Channels".

You can download and deploy this exmple into SIwpas, (http://code.google.com/p/siwpas/) application server that provides all nessary libraries for this example. 

Defne Asynchronous Messaging Framework

If you research web on Java asynchronous messaging , you will probably hit millions of documents about Java Messaging Service. It is a standard for developing asynchronous messaging applications in Java. To use JMS, you have to be an expert on some of their internal APIs, like "Sessions","Connections","JNDI Naming","Topics","Queues" etc. There are also some asynhrounous messaging frameworks, like Asynchronous Web Services, CORBA asynchronous communication model, Data Distribution Service etc. Each of them uses its own specific API and programming models. Therefore developers have to learn those techonlogies and their APIs before using them succesfully in any enterprise project.

Our aim with Defne is to provide a simple and common messaging API layer to remove complexity and configuration of those techonlogies. Let's look at the high level architecture of the Defne Asynchronous Framework, i.e Defne Channels

Defne channels high level architecture

Here is the some terminology about the Defne Channel Library

  • Channel :  Channel is an abstract term that sends/receives abstract messages from anywhere. Channels implementations know how to send/recieve message from specific messaging techolnogies.
  • Channel Managers : Channel managers manage the lifecycle of the channels that are registered with them
  • Channel Manager Factory : Factory for getting Channel Managers
  • Channel Listeners : Event/Listener model for channel related events, (initializing, starting, stopping of channels)
  • Channel Manager Listeners : Event/Listener model for channel manager related events, (initializing, starting, stopping of channel managers)
  • Defne Runtime Platform : Provides CDI beans that are used by the developers to inject channel related objects
  • Channel Clients :Users of the Defne Channel Framework

Channel Library Modules
There are two maven modules in Defne Channel Library,

  • Channel Java API  Module : Provides generic APIs for common  messaging operations. One of the important interface is IChannel interface. Parts of this interfaces is provided below.  You can also look at the other interfaces from svn, http://defne.googlecode.com/svn/trunk/defne-channel-api/.
    The other important interfaces are, IChannelManager, IChannelManagerFactory.
    /**
* Initialize the channel. Generally initialization
* will be done by channel managers.
* @param properties properties for proper initialization
* @throws ChannelException for any exception
*/
public void init(Properties properties) throws ChannelException;

/**
* Starts the channel.
* @throws ChannelException for exception occurs
*/
public void start() throws ChannelException;

/**
* Stops the channel.
* <p>
* Any exception is logged and swallowed.
* </p>
*/
public void stop();

/**
* Gets channel state.
* @return the state of the channel.
*/
public ChannelState getState();

JSF Example Uses Defne Channel Library
In this article, we will look at JSF application that uses Defne Channel Components. This example uses JMS as an implementation of Channel APIs. There are two pages of this applications

  • Sender  Page and Implementation : Create a channel and send message to channel(Based on JMS topic) 
  • Receiver  Page and Implementation : Subscribe to channel to receive message from this channel

You can look at full project source code from SVN, http://defne.googlecode.com/svn/trunk/defne-samples/channel-sample/

Here is the Sender.java class,

@SessionScoped
@Named
public class Sender implements Serializable
{
private static final long serialVersionUID = 1L;

/**Message that will be sent*/
private String message;

/**Channel manager factory*/
private transient @Inject @JmsFactory IChannelManagerFactory channelManagerFactory;

/**Channel manager*/
private IChannelManager channelManager;

/**Our channel that we will send message*/
private IChannel channel;

/**
* Initialize our channel manager and start.
*/
@PostConstruct
public void init()
{
Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put(Context.PROVIDER_URL, "tcp://localhost:61616?connectionTimeout=0");

properties.put(IJmsChannelManager.CONNECTION_FACTORY_JNDI_NAME, "ConnectionFactory");

try
{
//Our JMS Channel Manager
this.channelManager = channelManagerFactory.getChannelManager(properties);

JmsChannelConfig jmsChannelConfig = new JmsChannelConfig(JmsChannelType.TOPIC);
this.channel = this.channelManager.createChannel(jmsChannelConfig);

//Setting channel JNDI Name, using ActiveMQ dynamic topic
properties.put(IJmsChannel.JMS_CHANNEL_DESTINATION_JNDI_NAME, "dynamicTopics/A");

//Register our channel with manager, pass null that
//Manager creates unique id for our channel
this.channelManager.registerChannel(null, channel);

//Initialize and start our channels in manager
this.channelManager.init(properties);
this.channelManager.start();
}
catch (ChannelException e)
{
throw new RuntimeException(e);
}
}

/**
* Stop our channel manager.
*/
@PreDestroy
public void preDestroy()
{
this.channelManager.stop();
}

/**
* Send a message.
* @return null for same page
*/
public String sendMessage()
{
try
{
this.channel.sendMessage(message);
}
catch (ChannelException e)
{
e.printStackTrace();
}

return null;
}

public String getMessage()
{
return message;
}

public void setMessage(String message)
{
this.message = message;
}

}

 important parts of this class,

  • It injects "IChannelManagerFactory" : It uses "@JmsFactory" qualifier. Therefore, Defne finds factory class that is capable of producing the JMS Channel Managers. 
  • Setting up channel, channel manager and start channel manager : In @PostConstruct method, we create and register our channel, initialize and start channel manager. Each channel has an unique id. If id is not given at creation time, channel manager will produce an unique id for the channel. Here, we use "this.channelManager.createChannel(jmsChannelConfig)" to create JMS topic based channel. You can give more properties to configure your jms channel, look at "IJmsChannel" interface for other properties. For example, it uses "IJmsChannel.JMS_CHANNEL_DESTINATION_JNDI_NAME, "dynamicTopics/A" implies that this channel JNDI name is "dynamicTopics/A".
  • Sending Message : Message is sent by simply call the "this.channel.sendMessage(message)"
  • Stopping the channel manager : "this.channelManager.stop()" stops all registered channels and does cleanup operations.

Here is the GUI for sender,

When user clicks send button, it will send input message to channel.

Here is the Receiver.java class,

@SessionScoped
@Named
public class Receiver implements Serializable, IChannelMsgListener
{
private static final long serialVersionUID = 1L;

/**Will be uSed for getting JMS Channel Manager*/
private transient @Inject @JmsFactory IChannelManagerFactory channelManagerFactory;

/**Our channel manager*/
private IChannelManager channelManager;

/**Incoming message*/
private String message;

/**Notification for user*/
private String result = "";

/**
* Initialize our manager and start.
*/
protected void init()
{
Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put(Context.PROVIDER_URL, "tcp://localhost:61616?connectionTimeout=0");

properties.put(IJmsChannelManager.CONNECTION_FACTORY_JNDI_NAME, "ConnectionFactory");

try
{
//Our JMS Channel Manager
this.channelManager = channelManagerFactory.getChannelManager(properties);

JmsChannelConfig jmsChannelConfig = new JmsChannelConfig(JmsChannelType.TOPIC);
IChannel topicChannel = this.channelManager.createChannel(jmsChannelConfig, this);

//Setting channel JNDI Name, using ActiveMQ dynamic topic
properties.put(IJmsChannel.JMS_CHANNEL_DESTINATION_JNDI_NAME, "dynamicTopics/A");

//Register our channel with manager, pass null that
//Manager creates unique id for our channel
this.channelManager.registerChannel("MyChannel", topicChannel);

//Initialize and start our channels in manager
this.channelManager.init(properties);
}
catch (ChannelException e)
{
throw new RuntimeException(e);
}
}

/**
* Start to receive.
* @return null
*/
public String start()
{
try
{
init();
this.channelManager.start();
this.result = "Channel has opened";
}
catch (ChannelException e)
{
e.printStackTrace();
}

return null;
}

/**
* Destroy and stops our manager.
*/
public String stop()
{
this.channelManager.stop();
this.result = "Channel has closed";
return null;
}

public String getMessage()
{
return this.message;
}

/**
* New message arrives.
*/
@Override
public void onChannelMessage(ChannelMsgEvent message)
{
ChannelJmsMsgEvent jmsEvent = (ChannelJmsMsgEvent)message;

try
{
this.message = jmsEvent.readTextMessagePayload();
}
catch (ChannelException e)
{
e.printStackTrace();
}
}

public String getResult()
{
return result;
}

public void setResult(String result)
{
this.result = result;
}

}

Receiver configures its channel like Sender. But it also subscribes to channel with implementing "IChannelMsgListener". Whenever a new message comes to "channel", it receives the message. " public void onChannelMessage(ChannelMsgEvent message)" is called by Defne framrwork at runtime when a new message is ready.

Here is the GUI for receiver.

  • When user clicks "Open Channel" button, channel is started and subscription will be done
  • When user clicks "Close Channel" button, channel manager is stopped
  • When user clicks, "Receive Last Message", user gets latest message that has been sent by the sender.

All of those buttons use JSF 2 Ajax communication :) 

Future of the Framework

We will add some more features to our JMS implementation, like adding interceptors, executing Defne services etc. We will also look for other messaging techonlogy implementations. We will also provide COMET implementation based on Channel Library using Servlet 3.0 Asynch features using Bayeux Protocol. Stay tune!

Standalone Usage

You can also use Defne Channel Component library in your standalone projects. If you want to use Defne Channel library in an environments where dependency injection is not available, you will instantiate the objects yourself. For example, here is the test for Defne Channel code,

        Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put(Context.PROVIDER_URL, "vm://localhost?broker.persistent=false");

JmsChannelManagerFactory factory = new JmsChannelManagerFactory();
IJmsChannelManager channelManager = factory.getChannelManager(properties);

//Create channel
IChannel channel = channelManager.createChannel(new JmsChannelConfig(JmsChannelType.TOPIC), this, this);

//Register our channel
channelManager.registerChannel(channelManager.producerChannelId(), channel);

//Adding Topic Jndi Name
properties.put(IJmsChannel.JMS_CHANNEL_DESTINATION_JNDI_NAME, "dynamicTopics/A");

//add listener to channel manager
channelManager.addChannelManagerListener(this);

//Initialize our manager
channelManager.init(properties);

//Lets start'er channel manager
channelManager.start();

JmsUtil.sendTextMessage("dynamicTopics/A", new InitialContext(properties), "Hello World");

Thread.sleep(5000);

//Stop manager
channelManager.stop();

 

Enjoy!

Defne URLs

Web Site : http://code.google.com/p/defne/

SVN Site : http://code.google.com/p/defne/source/browse/

Maven Snapshot : https://oss.sonatype.org/content/repositories/snapshots/org/defne/

Maven : http://repo1.maven.org/maven2/org/defne/

Defne Discussion : http://groups.google.com/group/defnedev

 

Gurkan Erdogdu

ASF Member,http://apache.org

PMC Chair, Apache OpenWebBeans

CTO, MechSoft Mechanical and Software Solutions, http://www.mechsoft.com.tr

 

Published at DZone with permission of its author, Gurkan Erdogdu.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)

Tags: