A Simple Clustered Task Distribution System
Let's now discuss the remaining methods of JChannel.
The send() method takes a Message and sends it to all cluster nodes if the message's destination is null,
or to a single node if the destination is non-null. The application needs to marshal its data to a byte
buffer and place it into the message via Message.setBuffer().
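As a minimal sketch of the marshalling step, the helper below turns any Serializable object into a byte buffer and back using plain Java serialization. The class name Marshalling and its two methods are hypothetical and not part of JGroups; Java serialization is only one possible choice of wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper (not a JGroups class): converts application data to and from byte[].
final class Marshalling {
    private Marshalling() {}

    // Serialize an object into a byte buffer that can be placed into a message.
    static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Rebuild the object from the buffer on the receiving side.
    static Object fromBytes(byte[] buf) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buf))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of marshalled object not found", e);
        }
    }
}
```

The sender would pass the array returned by toBytes() to Message.setBuffer(), and the receiver would call fromBytes() on the buffer it gets from the message.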
To retrieve the current view, we can use Channel.getView() and to fetch the local address of a node, we
call Channel.getLocalAddress(). The disconnect() method leaves a cluster and close() destroys the channel. A closed channel cannot be opened again. Invoking close() also disconnects a channel if not already disconnected.
Calling disconnect() will install a new view in all cluster nodes, and viewAccepted() will be invoked on every remaining node.
As we will see in building our task distribution system, the use of JGroups allows us to focus on the
system at hand, and not have to worry about clustering issues, as JGroups does the heavy lifting. The
main features used in our system are membership management (we need to know who the cluster nodes
are, and when a node joins or leaves) and reliable messaging (for task sending). Plus, by modifying the
protocol configuration, we can adapt the transport to our needs.
Task distribution system
The idea is very simple: we have a cluster of nodes, and each node can submit tasks to be executed by
some other node in the cluster. So every node is a peer, in the sense that it can both submit and handle
tasks. In a real-life application, clients would connect to any of the nodes, e.g. via TCP or RMI, and
submit tasks to that node, which would then distribute them to some other node (or handle them itself).
When submitting a task, we choose a random integer which is then mapped to the rank of a node in the
cluster (int mod N where N is the cluster size). The rank is the position of a node in the view, and since
the view is the same in all nodes, the rank identifies the node uniquely.
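The mapping from a random integer to a rank can be sketched as follows. The class Ranks and its method names are hypothetical, and the view is modelled as a plain ordered list rather than a JGroups View. Math.floorMod is used so that a negative random integer still yields a rank between 0 and N-1.

```java
import java.util.List;

// Hypothetical helper: maps task numbers and nodes to ranks in the view.
final class Ranks {
    private Ranks() {}

    // Map an arbitrary integer to a rank in the range [0, clusterSize).
    static int rankFor(int taskNumber, int clusterSize) {
        if (clusterSize <= 0) {
            throw new IllegalArgumentException("cluster must have at least one node");
        }
        return Math.floorMod(taskNumber, clusterSize);
    }

    // The rank of a node is its position in the view; -1 if it is not a member.
    static <A> int rankOf(List<A> view, A node) {
        return view.indexOf(node);
    }
}
```

Because every node sees the same ordered view, each one can evaluate rankFor() and rankOf() locally and reach the same conclusion about which node owns a task.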
The task is then multicast (EXECUTE) across the cluster. Every node adds the task to a hash map
consisting of tasks and their submitters' (JGroups) addresses.
Every node now compares the rank shipped with the task to its own rank. If it doesn't match, nothing is
done. If it matches, the node needs to process the task. It does so and returns the result to the submitter.
When the submitter receives the response (RESULT), it multicasts a REMOVE message across the
cluster. Upon reception of REMOVE(T), every node removes T from its hash map.
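The per-node bookkeeping for EXECUTE and REMOVE might look like the sketch below. TaskCache, Entry, onExecute() and onRemove() are hypothetical names chosen for illustration; the actual sending and receiving of messages over the channel is left out, and the address type is a generic parameter instead of a JGroups Address.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-node cache of tasks that have been multicast but not yet removed.
final class TaskCache<A> {
    // One cached entry: the submitter's address and the task payload.
    static final class Entry<A> {
        final A submitter;
        final Object task;

        Entry(A submitter, Object task) {
            this.submitter = submitter;
            this.task = task;
        }
    }

    private final Map<Integer, Entry<A>> tasks = new ConcurrentHashMap<>();
    private final int localRank;

    TaskCache(int localRank) {
        this.localRank = localRank;
    }

    // EXECUTE(id, task): every node caches the task.
    // Returns true only on the node whose rank matches, which must then process it.
    boolean onExecute(int id, A submitter, Object task, int clusterSize) {
        tasks.put(id, new Entry<>(submitter, task));
        return Math.floorMod(id, clusterSize) == localRank;
    }

    // REMOVE(id): the submitter has its result, so the task can be forgotten.
    boolean onRemove(int id) {
        return tasks.remove(id) != null;
    }

    boolean contains(int id) {
        return tasks.containsKey(id);
    }

    int size() {
        return tasks.size();
    }
}
```

A node for which onExecute() returns true would run the task and send a RESULT back to the submitter; all other nodes only keep the entry until the matching REMOVE arrives.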
If a node X crashes (or leaves gracefully), we know which tasks were assigned to it by looking up the
tasks in the hash map, keyed by X. All tasks which are still present in the hash map have not yet been
processed and need to be re-executed, this time by a different node. This is done by comparing the rank
shipped with the task to the node's rank and executing it if a node's own rank matches it.
If a master M crashes after having submitted a few tasks but not yet having received the results, the
slaves remove all tasks submitted by M, because M won't need the results anymore.
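One way to sketch the handling of a departed node is shown below. It is an illustration under stated assumptions, not the article's implementation: the class Failover and the method onMemberLeft() are hypothetical, the pending tasks are modelled as a map from task number to submitter address, and a task counts as "assigned to X" if its number mapped to X's rank in the old view. Tasks submitted by the departed node are dropped; tasks that were assigned to it are re-mapped against the new view, and only the node whose rank now matches re-executes them.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper: decides what a node does with cached tasks after a member leaves.
final class Failover {
    private Failover() {}

    // pending maps task number -> submitter address and is modified in place.
    // Assumes 'left' is a member of oldView and 'self' is a member of newView.
    // Returns the task numbers that 'self' should re-execute.
    static <A> List<Integer> onMemberLeft(Map<Integer, A> pending, List<A> oldView,
                                          List<A> newView, A left, A self) {
        int leftRank = oldView.indexOf(left);
        int myRank = newView.indexOf(self);
        List<Integer> toExecute = new ArrayList<>();
        Iterator<Map.Entry<Integer, A>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, A> e = it.next();
            if (e.getValue().equals(left)) {
                // The submitter is gone, so nobody is waiting for this result.
                it.remove();
                continue;
            }
            int id = e.getKey();
            boolean wasAssignedToLeft = Math.floorMod(id, oldView.size()) == leftRank;
            boolean nowMine = Math.floorMod(id, newView.size()) == myRank;
            if (wasAssignedToLeft && nowMine) {
                toExecute.add(id);
            }
        }
        return toExecute;
    }
}
```

Each surviving node would call this from its view-change callback with the old and new views; since all nodes see the same views, exactly one of them picks up each orphaned task.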
Fig. 2 shows what task distribution in a cluster looks like.
The cluster consists of nodes A, B, C and D. Clients can access any one of them. Suppose a client submits
a task to B, and B assigns the number 23 to the task. B then multicasts an EXECUTE(23, TASK)
message to all nodes in the cluster, and every node adds task #23 to its cache.
However, the only node processing task #23 is A (to which 23 happens to map), which then sends the
result as a RESULT(23, OBJ) to B. B returns the result OBJ to the client and multicasts a
REMOVE(23) message to the cluster, which causes all nodes to remove task #23 from their caches.
Had A crashed during processing of task #23, some other node would have taken over, processed the
task and sent the result back to B.