
A Simple Clustered Task Distribution System



Let's look at how we implement this. We'll write a Server class which has the main() loop and accepts
requests which are to be distributed across the cluster. This mimics real clients sending requests to any
cluster node. We can then start a number of Server processes, across the cluster or on the same host (for
the demo).

First, we need an ID (ClusterID) which is unique across the cluster and which is used to determine
whether a node accepts a task. An instance of ClusterID is created for every task. The class looks as
follows:

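public class ClusterID implements Streamable {
    private Address creator;
    private int id;
    private static int next_id=1;

    // Hands out a new ID for each task; synchronized so the counter is thread-safe
    public static synchronized ClusterID create(Address addr) {
        return new ClusterID(addr, next_id++);
    }
    // ... (rest of the class omitted)
}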

Implementations of Streamable let JGroups marshal and unmarshal objects more efficiently.

A ClusterID has the address of the node which created it and an ID which is incremented on each
create() call. If we used only the ID, then, because every node can submit tasks, node A and node C
might both submit a task #23, and one task entry would overwrite the other in the cache hash map.
Prefixing the ID with its creator yields A::23 and C::23, which are two different tasks.
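To make the collision concrete, here is a minimal, self-contained sketch. It is not the article's ClusterID: a plain String stands in for the JGroups Address, and the names CompositeId and CompositeIdDemo, as well as the equals() and hashCode() implementations, are assumptions added for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for ClusterID: a String replaces the JGroups Address.
final class CompositeId {
    private final String creator;
    private final int id;

    CompositeId(String creator, int id) {
        this.creator = creator;
        this.id = id;
    }

    // Both fields take part in equality, so A::23 and C::23 are distinct keys.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CompositeId)) return false;
        CompositeId other = (CompositeId) o;
        return id == other.id && creator.equals(other.creator);
    }

    @Override
    public int hashCode() {
        return Objects.hash(creator, id);
    }

    @Override
    public String toString() {
        return creator + "::" + id;
    }
}

class CompositeIdDemo {
    // Keyed by the bare counter: the second put overwrites the first entry.
    static int sizeWithBareIds() {
        Map<Integer, String> cache = new HashMap<>();
        cache.put(23, "task from A");
        cache.put(23, "task from C");
        return cache.size();
    }

    // Keyed by creator plus counter: both entries survive.
    static int sizeWithCompositeIds() {
        Map<CompositeId, String> cache = new HashMap<>();
        cache.put(new CompositeId("A", 23), "task from A");
        cache.put(new CompositeId("C", 23), "task from C");
        return cache.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeWithBareIds());
        System.out.println(sizeWithCompositeIds());
    }
}
```

Any key class used in a hash map needs consistent equals() and hashCode() for this to work; the sketch derives both from the creator and the counter.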

Then, we define the Master and Slave interfaces:

public interface Master {
    Object submit(Task task, long timeout) throws Exception;
}

public interface Slave {
    Object handle(Task task);
}

These interfaces are implemented by our Server class which does the bulk of the work.

The submit() method takes a task (see below) and a timeout. It can throw an exception or return a
result. Note that both the Task subclasses and the result have to be serializable or Streamable, because
they are potentially sent across the network to get executed.
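The contract of submit() (block until a result arrives, or fail with an exception on error or timeout) can be sketched locally with java.util.concurrent. This is only an illustration under stated assumptions: the work runs on a local thread rather than on a remote node, the timeout is taken to be in milliseconds, and the names LocalSubmitSketch, submit(Callable, long) and stop() are hypothetical rather than the article's implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical local analogue of Master.submit(): no network involved.
class LocalSubmitSketch {
    private final ExecutorService pool = Executors.newCachedThreadPool();

    // Waits at most timeoutMs for the result. Throws TimeoutException if the
    // work is not done in time, or ExecutionException if the work itself failed.
    Object submit(Callable<Object> work, long timeoutMs) throws Exception {
        Future<Object> future = pool.submit(work);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker; nobody is waiting any more
            throw e;
        }
    }

    // Interrupts running work and releases the pool's threads.
    void stop() {
        pool.shutdownNow();
    }
}
```

A caller would typically wrap submit() in a try/catch and decide whether to retry or give up when the timeout elapses.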

The handle(Task t) method is invoked at the slave, which is the worker node which decides that it will
handle the task. It typically uses the data shipped with a task (subclass) and returns an object which
needs to be serializable because in most cases, it will be sent back to the submitter via the network.

Next, we define the Task:

public interface Task extends Serializable {
    public abstract Object execute();
}

A task contains all of the necessary data that is shipped to a slave. The execute() method then uses that
data to perform the processing and returns a result which is sent back to the master who submitted that
task.
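As an illustration of a task carrying its own data, the following sketch defines a hypothetical SumTask, pushes it through a Java serialization round trip (standing in for the trip across the network) and executes the copy. The Task interface is repeated so that the sketch compiles on its own; SumTask, TaskRoundTrip and its helper methods are assumptions, not part of the article's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Same shape as the article's Task interface, repeated so the sketch is self-contained.
interface Task extends Serializable {
    Object execute();
}

// Hypothetical task: carries two operands and adds them on the worker.
class SumTask implements Task {
    private static final long serialVersionUID = 1L;
    private final int a;
    private final int b;

    SumTask(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Object execute() {
        return a + b; // boxed to Integer
    }
}

class TaskRoundTrip {
    // Serializes the task to bytes, as it would be before going on the wire.
    static byte[] toBytes(Task task) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(task);
        }
        return buffer.toByteArray();
    }

    // Rebuilds the task from bytes, as the receiving node would.
    static Task fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Task) in.readObject();
        }
    }

    // Sends a task through the byte round trip and executes the deserialized copy.
    static Object shipAndExecute(Task task) {
        try {
            return fromBytes(toBytes(task)).execute();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization failed", e);
        }
    }
}
```

Calling TaskRoundTrip.shipAndExecute(new SumTask(2, 3)) runs the addition on the deserialized copy, which is why every field of a task must itself be serializable.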

Now that we've defined all the ancillary classes and interfaces, let's start writing the Server:

public class Server extends ReceiverAdapter implements Master, Slave {
    private String props="udp.xml";
    private Channel ch;
    private final ConcurrentMap<ClusterID,Entry> tasks=new ConcurrentHashMap<ClusterID,Entry>();
    private final ExecutorService thread_pool=Executors.newCachedThreadPool();
    private View view;
    private int rank=-1;
    private int cluster_size=-1;

    public void start() throws Exception {
        ch=new JChannel(props);
        // ... (set the receiver and connect to the cluster)
    }

    public void stop() {
        // ... (shut down the thread pool and close the channel)
    }
    // ... (rest of the class omitted)
}

Server implements the Master and Slave interfaces, which means that a Server can act both as a client
(Master) and as a server (Slave). So the name “Server” is actually somewhat of a misnomer, as this is
clearly more of a peer than a server!

Next, we need a few instance variables. For instance, we need a JGroups channel (ch) which needs to
be configured with properties (props) defining the configuration of the protocol stack.
We also need a thread pool (thread_pool) to execute the tasks we receive. Here, we chose a simple
pool which creates new threads when needed and removes threads that have been idle for
more than 60 seconds.
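The behaviour of such a pool can be tried out in isolation. The sketch below is a minimal, assumed example (the class CachedPoolSketch and the method runAll are hypothetical): it runs a batch of jobs on a cached thread pool, collects the results in submission order, and shuts the pool down afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CachedPoolSketch {
    // Runs every job on the pool and returns the results in submission order.
    static List<Integer> runAll(List<Callable<Integer>> jobs) throws Exception {
        // Creates threads on demand and reuses idle ones.
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Future<Integer>> futures = pool.invokeAll(jobs); // blocks until all jobs are done
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // rethrows a job's failure as ExecutionException
            }
            return results;
        } finally {
            pool.shutdown();                            // stop accepting new work
            pool.awaitTermination(5, TimeUnit.SECONDS); // give workers time to finish
        }
    }
}
```

A cached pool places no upper bound on the number of threads, so it suits short-lived tasks; a fixed-size pool may be the safer choice if many long-running tasks can arrive at once.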

The 'tasks' hash map is the cache for received tasks. It is keyed by ClusterID and the values are Entry
instances (see below).

The view, rank and cluster_size variables are needed to determine whether or not to process a received
task. More on this later.

In start(), we create a JChannel based on the properties passed to the server and connect it, which
causes it to join the cluster. We also set a Receiver, which means that we'll get receive(Message) and
viewAccepted(View) callbacks whenever a message or view change is received.

In stop(), we shut down the thread pool and close the channel, causing this node to leave the cluster
gracefully. Everybody else connected to this cluster will get a view change (viewAccepted() callback)
notifying them of the termination of this node.

Published at DZone with permission of its author, Bela Ban.
