Bela has posted 1 posts at DZone. View Full User Profile

A Simple Clustered Task Distribution System

10.06.2008
| 54717 views |
  • submit to reddit

The receive() method handles all requests and responses. Upon reception of a message, we need to grab
its byte[] buffer, unmarshal it into a Request and then handle the request. We use the JGroups helper
method Util.streamableFromByteBuffer() to do this.

On reception of EXECUTE, we call handleExecute(), passing the ClusterID, the submitter and the task
to it.

On reception of RESULT (sent by a slave), we set the result in the promise, releasing the blocked
submitter of the task. Then, we multicast a REMOVE request.

On reception of REMOVE, we simply remove the task from our cache.
The handleExecute() method checks if a node should accept the task and, if yes, passes it to a thread
pool to execute:

private void handleExecute(ClusterID id, Address sender, Task task) {
tasks.putIfAbsent(id, new Entry(task, sender));
int index=id.getId() % cluster_size;
if(index != rank)
return;
thread_pool.execute(new Handler(id, sender, task);
}

First, we add the task to our tasks cache, keyed by the ClusterID5. Then we take the ClusterID's ID, modulo the number of nodes in the cluster. This is the rank of the node which should execute the task.
If it matches our own rank, we create a Handler and pass it to the thread pool to be executed on a separate thread, otherwise we return from handleExecute(). The Handler class is shown below:

private class Handler implements Runnable {
final ClusterID id;
final Address sender;
final Task task;
public void run() {
Object result=null;
try {
result=handle(task);
}
catch(Throwable t) {
result=t;
}
Request response=new Request(Request.Type.RESULT, null, id, result);
byte[] buf=Util.streamableToByteBuffer(response);
ch.send(new Message(sender, null, buf));
}
}

It executes the task against the Slave interface (handle() method) and stores the result. If there is an
exception, then the exception (which is serializable by default) is stored as result instead.

Then a Response object is created from the result. Util.streamableToByteBuffer() is called to generate a byte[] buffer from the response, which is then placed into a Message and sent via the channel to the
original submitter of the task.

Published at DZone with permission of its author, Bela Ban.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)