
I am the founder and lead developer of Hibernate Envers, a Hibernate core module, which provides entity versioning/auditing capabilities. I am also one of the co-founders of SoftwareMill, a company specializing in delivering customized software solutions (http://softwaremill.com, "Extraordinary software as a standard"), based on Java and JBoss technologies. After work, apart from being involved in development of Envers, I work on several small open source projects, like ElasticMQ (simple message queue written in Scala with an SQS interface), projects around static analysis (using JSR 308 - Typestate Annotations/ Checkers Framework and FindBugs), and some CDI/Weld (not always portable) extensions, like autofactories or stackable security interceptors. I am also interested in new JVM-based languages, especially with functional elements (like Scala, JRuby) and frameworks built using them (like Lift), as well as improving the ways we use Dependency Injection. Adam is a DZone MVB and is not an employee of DZone and has posted 52 posts at DZone.

Message Replication in ElasticMQ with JGroups

06.13.2012

ElasticMQ is a messaging server with Scala, Java, and Amazon SQS-compatible interfaces. It supports guaranteed messaging by replicating messages across a cluster of servers, and message persistence through journalling.

Message replication is one of the core features of ElasticMQ. However, if you look at the code, it’s only a handful of classes, the longest having 76 lines (remember that this is Scala, though ;) ). That is because ElasticMQ uses JGroups as the underlying communication library. JGroups is quite old, especially for a Java library: the first release was in 1999 (!). But it’s far from being outdated or obsolete: it has a nice API, works without problems, and has a good community; and, like any Java library, it cooperates well with Scala.

JGroups has a lot of useful features:

  • reliable multicast
  • cluster management
  • failure detection
  • node discovery
  • many years of performance improvements

which are extensively used for implementing replication in ElasticMQ. Below is a summary of how it’s done.

How does an ElasticMQ cluster work?

In a single ElasticMQ cluster, one node is always the master. You can only execute operations against this node. The results of each operation are then replicated to the other members. There are a couple of options related to blocking: the replication can be fully asynchronous, or it can wait until at least one node, or all nodes, ack the operation. To make sure that in case of cluster partitions the same messages aren’t received from different partitions, only the partition which has at least half+1 of the nodes is active.
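
The half+1 rule can be sketched as a simple majority check. This is only an illustration, not ElasticMQ's actual code: the class and method names are hypothetical, and it assumes the expected total number of nodes is known up front.

```java
// Hypothetical helper, not part of ElasticMQ: illustrates the half+1 rule.
final class PartitionQuorum {
    private PartitionQuorum() {}

    /**
     * Returns true if a partition that sees visibleNodes members may stay
     * active in a cluster expected to have expectedNodes members in total.
     */
    static boolean isActive(int visibleNodes, int expectedNodes) {
        if (expectedNodes <= 0 || visibleNodes < 0 || visibleNodes > expectedNodes) {
            throw new IllegalArgumentException("invalid node counts");
        }
        // Integer division: half+1 means 2 of 3, 3 of 4, 3 of 5.
        int required = expectedNodes / 2 + 1;
        return visibleNodes >= required;
    }
}
```

With five nodes split 3/2, only the three-node side stays active. With four nodes split 2/2, neither side does, which trades availability for the guarantee that two partitions never hand out the same message.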

The central concept in ElasticMQ is the message storage. A storage executes commands (such as a send message command, a delete message command, etc.). The replication layer is just a wrapper around any other storage. Note, though, that we can only replicate the resulting storage mutations (that is, what changed after the command was executed), not the original command itself. For example, if the command is “receive a message”, the results of executing it may be different on each machine. Hence, if receiving a message succeeds, we replicate only the change of the message visibility (in ElasticMQ, similarly to Amazon SQS, a received message is blocked from subsequent receives for a specified period of time). You can see this basic logic in JGroupsReplicatedStorage.
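
To make the difference between replicating a command and replicating its result concrete, here is a rough sketch in plain Java. All class and method names are hypothetical and the replicas are called directly instead of over the network; it is not how JGroupsReplicatedStorage is actually written.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// All names here are hypothetical; they only mirror the idea from the text.

/** The replicated mutation: "message X is hidden until time T". */
final class VisibilityChange {
    final String messageId;
    final long visibleAtMillis;

    VisibilityChange(String messageId, long visibleAtMillis) {
        this.messageId = messageId;
        this.visibleAtMillis = visibleAtMillis;
    }
}

/** Minimal in-memory storage: message id -> time at which it is visible. */
final class InMemoryStorage {
    private final Map<String, Long> visibility = new LinkedHashMap<>();

    void send(String messageId, long nowMillis) {
        visibility.put(messageId, nowMillis);
    }

    /** Picks a visible message, if any, and hides it for timeoutMillis. */
    Optional<VisibilityChange> receive(long nowMillis, long timeoutMillis) {
        for (Map.Entry<String, Long> entry : visibility.entrySet()) {
            if (entry.getValue() <= nowMillis) {
                VisibilityChange change =
                    new VisibilityChange(entry.getKey(), nowMillis + timeoutMillis);
                apply(change);
                return Optional.of(change);
            }
        }
        return Optional.empty();
    }

    /** Applies a mutation produced on this node or on another one. */
    void apply(VisibilityChange change) {
        visibility.put(change.messageId, change.visibleAtMillis);
    }

    long visibleAtMillis(String messageId) {
        return visibility.get(messageId);
    }
}

/** Wrapper that runs commands locally and forwards only the resulting mutations. */
final class ReplicatingStorage {
    private final InMemoryStorage local;
    private final List<InMemoryStorage> replicas;

    ReplicatingStorage(InMemoryStorage local, List<InMemoryStorage> replicas) {
        this.local = local;
        this.replicas = replicas;
    }

    void send(String messageId, long nowMillis) {
        local.send(messageId, nowMillis);
        replicas.forEach(r -> r.send(messageId, nowMillis));
    }

    Optional<String> receive(long nowMillis, long timeoutMillis) {
        Optional<VisibilityChange> change = local.receive(nowMillis, timeoutMillis);
        // Replicate the outcome, not the "receive" command itself:
        // replicas never choose a message on their own.
        change.ifPresent(c -> replicas.forEach(r -> r.apply(c)));
        return change.map(c -> c.messageId);
    }
}
```

The replicas only ever see "message m1 is hidden until T", so they end up in the same state as the master regardless of which message they would have picked themselves.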

Initializing the cluster

Before we get to the replication itself, though, the first thing to do is to initialize the cluster; that is done in ReplicatedStorageConfigurator. As a parameter there, we need a JGroups configuration file, which describes a stack of protocols. You don’t really need to know what each of these protocols does and what all those configuration parameters mean. The two most useful configurations are udp.xml and tcp.xml. The first one should be used if you have multicast in your network, the second if all communication should go through TCP (e.g. on EC2). In the latter case you will also need to provide the list of initial IPs. The list doesn’t have to be exhaustive; a list of seed nodes is enough.

Having a protocol stack, ElasticMQ creates a JChannel and connects it, which simply means connecting to the cluster. And that’s in fact all you need to do to create a cluster with JGroups – pretty simple, right? As you can see at the end of ReplicatedStorageConfigurator, the first thing after connecting is a call to channel.getState(null, 0). This will go to the current master node (more on master election later), fetch the state (current queues and messages) and apply it on the current node (see the very simple JGroupsStateTransferMessageListener, which handles both sending and receiving). There are two important things to note here. The first is that this transfer doesn’t block the whole cluster from normal operation. The second is that if an operation is executed during the state transfer, it will also be replicated. So it may happen that one command is executed twice on the new node. That doesn’t matter though, as each replicated command is idempotent, so it may be applied many times with the same result. In other scenarios some application-side mechanism would have to be implemented to prevent such situations.
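
Why a double application is harmless can be shown with a small sketch. It assumes, as an illustration only, that a replicated mutation carries the absolute resulting value; the class and method names are hypothetical and not taken from ElasticMQ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a node's state: message id -> next delivery timestamp.
final class NodeState {
    private final Map<String, Long> nextDelivery = new HashMap<>();

    /** Idempotent: carries the absolute result, so a replay changes nothing. */
    void setNextDelivery(String messageId, long timestampMillis) {
        nextDelivery.put(messageId, timestampMillis);
    }

    /** Not idempotent: every replay pushes the timestamp further. */
    void delayNextDelivery(String messageId, long deltaMillis) {
        nextDelivery.merge(messageId, deltaMillis, Long::sum);
    }

    long nextDeliveryOf(String messageId) {
        return nextDelivery.get(messageId);
    }
}
```

If a node receives "set next delivery of m1 to 500" once through the state transfer and once through regular replication, it still ends at 500. A relative mutation such as "delay m1 by 100" would drift if replayed, which is the kind of case where an application-side guard would be needed.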

Replicating data

Finally we get to the core: replicating the commands. On the sender side, this is handled by JGroupsReplicationMessageSender. Again, not a very complicated class. It uses the MessageDispatcher “building block” from JGroups which, apart from multicasting messages across the cluster, enables you to wait until a specified number of nodes have received them. On the receiving side, we have JGroupsRequestHandler. Again, pretty simple. When a message is received, it is simply sent to the storage.
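
The three blocking options (no waiting, wait for one node, wait for all nodes) can be modelled with a latch. The sketch below is plain Java with hypothetical names and does not use JGroups; in JGroups 3.x the comparable MessageDispatcher response modes are ResponseMode.GET_NONE, GET_FIRST and GET_ALL.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical names; a model of the waiting behaviour, not the JGroups API.
enum AckPolicy { NONE, AT_LEAST_ONE, ALL }

final class AckWaiter {
    private final CountDownLatch latch;

    AckWaiter(AckPolicy policy, int otherNodes) {
        this.latch = new CountDownLatch(requiredAcks(policy, otherNodes));
    }

    /** Number of acknowledgements the sender has to wait for. */
    static int requiredAcks(AckPolicy policy, int otherNodes) {
        if (otherNodes < 0) {
            throw new IllegalArgumentException("otherNodes must not be negative");
        }
        switch (policy) {
            case NONE:
                return 0;
            case AT_LEAST_ONE:
                return Math.min(1, otherNodes); // nothing to wait for when alone
            case ALL:
                return otherNodes;
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    /** Called once for every node that confirms the replicated mutation. */
    void ack() {
        latch.countDown();
    }

    /** Blocks until enough acks arrived; returns false on timeout or interrupt. */
    boolean await(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The trade-off is the usual one: NONE gives the lowest latency but a master crash can lose the latest operations, while ALL is the safest and is bounded by the slowest node.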

Cluster management

You may have also noticed the SetMaster special message. This is needed for the user to be able to read the current master node’s address. Master election (deciding which node is the master) is handled entirely with JGroups. There isn’t a specific algorithm in JGroups to elect a master; however, we can use the fact that each node has the same cluster view, which is represented by the JGroups View class. All we need to do is get the first (or last, or 3rd, etc.; anything as long as it’s the same on all nodes) element from the view’s member list, and set it as the master.
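
The election rule itself fits in a few lines. In the sketch below the member list stands in for what View.getMembers() returns in JGroups; the class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical helper: every node runs the same pure function on the same view.
final class MasterElection {
    private MasterElection() {}

    /** The master is simply the first member of the agreed view. */
    static <A> A master(List<A> viewMembers) {
        if (viewMembers.isEmpty()) {
            throw new IllegalStateException("a view always contains at least this node");
        }
        return viewMembers.get(0);
    }

    static <A> boolean isMaster(A self, List<A> viewMembers) {
        return master(viewMembers).equals(self);
    }
}
```

Because the function is deterministic and all nodes receive equal views, no extra election messages are needed: agreeing on the view is agreeing on the master.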

Cluster view is handled by the last “core” replication class, JGroupsMembershipListener. Two things happen there. First, the viewAccepted callback is called whenever a node joins or leaves the cluster; each node receives the same (well, equal :) ) instance of the View class. Second, the master broadcasts its address (which is the ElasticMQ server address, not the internal JGroups cluster-communication address) in a separate thread. It’s a very easy mistake to perform a blocking operation in one of the JGroups callback methods. You should never do that, as the whole stack can then lock up. We also need the FLUSH protocol (which is always added during the cluster setup); this protocol makes sure that no new messages are sent before the new view has been installed by all nodes, and hence that a new node always receives the master information.
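
The "never block in a callback" advice usually comes down to handing the work to another thread. Here is a minimal sketch of that pattern in plain Java; the names are hypothetical, addresses are plain strings, and the Consumer stands in for whatever blocking send the real code performs.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch; in JGroups the trigger would be the viewAccepted callback.
final class MasterAnnouncer {
    private final ExecutorService broadcaster = Executors.newSingleThreadExecutor();
    private final String clusterAddress;  // this node's address inside the cluster
    private final String serverAddress;   // address that clients should connect to
    private final Consumer<String> blockingBroadcast;

    MasterAnnouncer(String clusterAddress, String serverAddress,
                    Consumer<String> blockingBroadcast) {
        this.clusterAddress = clusterAddress;
        this.serverAddress = serverAddress;
        this.blockingBroadcast = blockingBroadcast;
    }

    /** Meant to be called on the callback thread; returns without blocking. */
    void onViewChange(List<String> members) {
        boolean iAmMaster = !members.isEmpty() && members.get(0).equals(clusterAddress);
        if (iAmMaster) {
            // Hand the potentially blocking send over to another thread.
            broadcaster.execute(() -> blockingBroadcast.accept(serverAddress));
        }
    }

    /** Stops accepting work and waits for pending broadcasts; false on timeout. */
    boolean shutdownAndWait(long timeoutMillis) {
        broadcaster.shutdown();
        try {
            return broadcaster.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A single-threaded executor keeps the announcements in the order in which the views arrived, while the callback thread is free to return immediately.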

The membership listener also handles cluster merges. Again, JGroups gives us a view of the partitions that were merged and the new combined view. In ElasticMQ, all partitions except the primary partition (which is the largest one) request a state transfer, just as after connecting to the cluster. That way the data is kept in a consistent state.
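
The merge rule can be sketched as follows. The nested list stands in for the subgroups that JGroups reports in a MergeView; the names are hypothetical, and the tie-break (the first of several equally large partitions wins) is an assumption of this sketch that only works if every node sees the subgroups in the same order.

```java
import java.util.List;

// Hypothetical helper illustrating "everyone outside the largest partition resyncs".
final class MergeHandling {
    private MergeHandling() {}

    /** Returns the largest subgroup; on a tie the earliest one in the list wins. */
    static <A> List<A> primaryPartition(List<List<A>> subgroups) {
        if (subgroups.isEmpty()) {
            throw new IllegalArgumentException("a merge needs at least one subgroup");
        }
        List<A> primary = subgroups.get(0);
        for (List<A> group : subgroups) {
            if (group.size() > primary.size()) {
                primary = group;
            }
        }
        return primary;
    }

    /** True if this node was outside the primary partition and must refetch state. */
    static <A> boolean needsStateTransfer(A self, List<List<A>> subgroups) {
        return !primaryPartition(subgroups).contains(self);
    }
}
```

Nodes from the primary partition keep their data, and everyone else discards local state and fetches it again, exactly as a freshly connected node would.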

Summing up

It’s also worth noting that ElasticMQ’s replication is fully tested using ScalaTest. Each test creates a cluster of in-memory storages, creating new nodes or simulating node crashes. See the JGroupsReplicatedStorageTest class.

With the mechanisms from JGroups, it is pretty straightforward to implement cluster communication. As always, though, you need to keep in mind some traps when it comes to concurrency (e.g. that there may be cluster activity when a new node joins; that partitions and merges can happen anytime; that there’s no ordering between normal messages and cluster view changes; that messages may be sent during state transfer; etc.). However, both the JGroups tutorial and manual are pretty comprehensive, and given the additional help from the forums (thanks Bela!), you should be good to go.

You can see how the replication works in practice by downloading a standalone distribution of ElasticMQ or running it embedded.

Adam

Published at DZone with permission of Adam Warski, author and DZone MVB. (source)
