Bela has posted 1 posts at DZone. View Full User Profile

A Simple Clustered Task Distribution System

  • submit to reddit

The Entry class (inner class of Server) is shown below:
private static class Entry {
private final Task task;
private final Address submitter;
private final Promise<Object> promise=new Promise<Object>();

It is a wrapper for the task, the address of the submitter and the promise (similar to a Future) used to
block on until the result has been received. The address of the submitter of the task is needed to send
the result back. This is necessary when a node other than the originally assigned one takes over and
processes a task.

Another inner class of Server is Request, which is used to send requests and responses between
submitters (masters) and slaves:

public static class Request implements Streamable {
static enum Type {EXECUTE, RESULT, REMOVE};
private Type type;
private Task task;
private ClusterID id;
private Object result;

A Request also implements Streamable (implementation not shown) which allows for more efficient
marshalling. We're sending 3 types of requests around:

1. EXECUTE: multicast by the submitter to all nodes of the cluster. It contains the task and a
ClusterID, generated by the submitter and used by the slave to determine whether or not to
accept the task. Note that only one slave in the entire cluster will handle a given task.

2. RESULT: contains the ClusterID and an Object (can be null if nothing is returned, e.g. calling a
void method). This is unicast from the slave to the master which submitted the task.

3. REMOVE: only contains the ClusterID and is multicast by the submitter of a task T after the
result for T has been received. Everyone removes the task from their cache upon reception of
this message.

Now that we have all the bits and pieces in place, it is actually time to look at the submit() method:

public Object submit(Task task, long timeout) throws Exception {
ClusterID id=ClusterID.create(ch.getLocalAddress());
try {
Request req=new Request(Request.Type.EXECUTE, task, id, null);
byte[] buf=Util.streamableToByteBuffer(req);
Entry entry=new Entry(task, ch.getLocalAddress());
tasks.put(id, entry);
ch.send(new Message(null, null, buf));
return entry.promise.getResultWithTimeout(timeout);
catch(Exception ex) {
tasks.remove(id); // remove it again
throw ex;

This is an implementation of the Master interface. It generates a ClusterId and constructs a Request
instance of type EXECUTE, containing the task and the ClusterID.
Util.streamableToByteBuffer() is a helper method provided by JGroups, which takes a Streamable
object and marshals it into a byte[] buffer. Remember that JGroups can only ship byte[] buffers over the

Then we add the task to our local cache, create a message with destination of null (= multicast) and the
marshalled request.

Finally we block on the promise of Entry until a result has been received or we get an exception (e.g. a

The Slave part for handling of received tasks is simple:

public Object handle(Task task) {
return task.execute();

We simply take the task and call execute() on it.

In the Server.start() method, we created a JChannel and set the Receiver to be the Server itself. We
implement 2 methods: receive(Message) and viewAccepted(View). The receive() method is shown

public void receive(Message msg) {
Request req=(Request)Util.streamableFromByteBuffer(Request.class, msg.getBuffer());
switch(req.type) {
handleExecute(, msg.getSrc(), req.task);
case RESULT:
Entry entry=tasks.get(;
case REMOVE:

Published at DZone with permission of its author, Bela Ban.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)