Bela has posted 1 posts at DZone. View Full User Profile

A Simple Clustered Task Distribution System

  • submit to reddit

Our code is now almost complete. The only thing missing is the handling of membership changes.
Remember that we need to resubmit tasks from failed nodes, or from nodes who left gracefully, to other
nodes. This is done in viewAccepted(View):
public void viewAccepted(View view) {
List<Address> left_members=Util.leftMembers(this.view, view);
Address local_addr=ch.getLocalAddress();
Vector<Address> mbrs=view.getMembers();
for(int i=0; i < mbrs.size(); i++) {
Address tmp=mbrs.get(i);
if(tmp.equals(local_addr)) {
if(left_members != null && !left_members.isEmpty()) {
for(Address mbr: left_members)


First, we determine which members left between the new and previous views. This is done with Util.leftMembers(), which returns a list of addresses of nodes which left between 2 views. Then we set the local address (Channel.getLocalAddress()), cluster_size and view.

Next, the rank is computed by iterating through the new membership and comparing each element against the local address. On a match, our rank is the iteration counter. For example, if we have a membership of {A,B,C,D,E}, and we're C, then our rank will be 2.

Finally, we need to determine whether any nodes left since the previous view, and whether there are any tasks to take over from them. This is done by iterating through all left members (if there are any) and calling handleLeftMember(), which is shown below:

private void handleLeftMember(Address mbr) {
for(Map.Entry<ClusterID,Entry> entry: tasks.entrySet()) {
ClusterID id=entry.getKey();
int index=id.getId() % cluster_size;
if(index != rank)
Entry val=entry.getValue();
if(mbr.equals(val.submitter)) {
execute(id, val.submitter, val.task);

This method iterates through all cache entries and compares the ID (modulo cluster size) to our own
rank. If it matches, we execute the task (unless the submitter itself left, in which case we drop the

Note that both rank and cluster_size might change on receiving a new view. Both assignments need to
happen before handleLeftMember() is called, as this method uses the 2 variables. For example, if we
have cluster {A,B,C,D,E}, then C has rank=2 and D has rank=3. If C crashes, D's and E's ranks change:
D's rank is now 2. This means that D will process all of the tasks that C was processing and which
hadn't completed by the time C crashed (otherwise C would have removed them).

The code above has a deficiency though: assuming we have a cluster of {A,B,C,D} and A crashes, then
the ranks of B, C and D change: B from 1 to 0, C from 2 to 1 and D from 3 to 2.

This means that B, C and D will now execute tasks which were already being worked on by other
nodes. For example, C will re-execute D's tasks and B will re-execute C's tasks. This is not incorrect, as
the submitter of a task will remove the task when completed. So, when receiving a result R from a
slave for a task which was already completed and therefore removed, the submitter just drops R.
This is not wrong, but leads to spurious and unneeded processing. A better way to define the rank
would be to use consistent hashing ([2]), which minimizes changes to the rank and therefore reexecution
of tasks already being worked on by other nodes.

Our code is now complete. The last thing to do is to write the driver code, which we also add to Server:

public static void main(String[] args) throws Exception {
Server server=new Server(props);
private static void loop(Server server) {
boolean looping=true;
while(looping) {
int key=Util.keyPress("[1] Submit [2] Submit long running task [q] Quit");
switch(key) {
case '1':
Task task=new Task() {
public Object execute() {
return new Date();
log("<== result = " + server.submit(task, 30000));
case '2':
task=new Task() {
public Object execute() {
return new Date();
log("<== result = " + server.submit(task, 30000));


The main() method creates a Server and starts it. The loop() method waits for a key press and then
submits a short running (on '1') or long running (on '2') task. The task simply returns a new Date with
the current time. The long running task sleep for 15 seconds before returning the date. When 'q' is
pressed, we stop the server gracefully and return.

Published at DZone with permission of its author, Bela Ban.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)