
A Simple Clustered Task Distribution System

10.06.2008
API

The main API is org.jgroups.JChannel:

public class JChannel extends Channel {
    public JChannel(String properties) throws ChannelException;
    public void setReceiver(Receiver r);
    public void connect(String cluster_name) throws ChannelException;
    public void send(Message msg) throws ChannelException;
    public View getView();
    public Address getLocalAddress();
    public void disconnect();
    public void close();
}

We join a cluster by creating a channel and calling connect():

Channel ch=new JChannel("/home/bela/udp.xml");
ch.setReceiver(new ReceiverAdapter() {
    public void receive(Message msg) {}
});
ch.connect("demo-cluster");

This creates a channel with a protocol stack defined in /home/bela/udp.xml. If an application needs
different properties, it would modify udp.xml and pass the modified XML file to the constructor.
Then we set a Receiver, which has callbacks that are invoked when messages are received. Finally, we
join cluster “demo-cluster”. All channels with the same configuration and the same cluster name
(argument of connect()) will join the same cluster.

A node can send and receive messages once it has joined a cluster. The Receiver interface has two methods we're interested in:

void receive(Message msg);
void viewAccepted(View new_view);

The receive() callback is invoked whenever a message is received. Note that receive() can be invoked concurrently when messages arrive from different senders, so it needs to be reentrant (safe to run in several threads at once). Its argument is an org.jgroups.Message:

public class Message implements Streamable {
    protected Address dest_addr=null;
    protected Address src_addr=null;
    private byte[] buf=null;
    public byte[] getBuffer();
    public void setBuffer(byte[] b);
}

A message has the address of the destination (dest_addr), the address of the sender (src_addr) and a
payload (buf). An Address is an opaque class identifying a node uniquely within a cluster. A destination
address of null means that the message is to be sent to all cluster nodes (a multicast); a non-null
destination address means that the message is sent to a single receiver (a unicast).
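
The addressing rule can be sketched in a few lines of plain Java. This is only a model: the Addressing class and its recipients() method are hypothetical, and strings stand in for org.jgroups.Address, which is opaque in the real API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the destination rule; not part of JGroups.
final class Addressing {
    // Returns the nodes a message would reach, given its destination
    // and the current cluster membership.
    static List<String> recipients(String dest, List<String> members) {
        if (dest == null) {
            // null destination: multicast to every cluster node
            return new ArrayList<>(members);
        }
        // non-null destination: unicast to that single node
        return Collections.singletonList(dest);
    }
}
```

With members {A,B,C}, recipients(null, members) yields all three nodes, while recipients("B", members) yields only B.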
When receiving a message, the application can call getBuffer() to retrieve the byte[] buffer and then
unmarshal it into data meaningful to the application.
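
As a minimal sketch of that unmarshalling step, the following plain-Java class turns a payload into a task entry and back. The TaskInbox class, its method names and the wire format (an int task id followed by a UTF string) are assumptions made for illustration; none of them come from JGroups. The state is kept in a ConcurrentHashMap because receive() may be invoked from several threads at once.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; the wire format is an assumption, not a JGroups convention.
final class TaskInbox {
    // Thread-safe map, since payloads may be handled concurrently.
    private final Map<Integer, String> tasks = new ConcurrentHashMap<>();

    // Marshal a task into a byte[] that could be passed to Message.setBuffer().
    static byte[] marshal(int taskId, String description) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(taskId);
            out.writeUTF(description);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Unmarshal a payload, e.g. the result of Message.getBuffer(), and record it.
    void onPayload(byte[] buf) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf))) {
            int taskId = in.readInt();
            String description = in.readUTF();
            tasks.put(taskId, description);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    String description(int taskId) {
        return tasks.get(taskId);
    }

    int size() {
        return tasks.size();
    }
}
```

In a real receiver, the body of receive(Message msg) would simply hand msg.getBuffer() to onPayload(); the sender would call marshal() and place the result in the message with setBuffer().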

The viewAccepted() callback is invoked when a node joins or leaves. Its only parameter is a View,
which is essentially a list of addresses. Views are received by all cluster nodes in exactly the same
order, so when we have a cluster {A,B,C}, then the nodes have the following views:
● A: {A,B,C}
● B: {A,B,C}
● C: {A,B,C}
If we had a new node D joining, then the view would become {A,B,C,D}. If B crashed, then everyone
would install {A,C,D}. As we can see, the nodes in a view are ordered according to join time.
The oldest node is always first.
The current view can also be retrieved from a channel by calling Channel.getView().
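
The ordering rule can be modelled in plain Java to make the example above concrete. This is a sketch only: the MembershipSketch class is hypothetical, node names are plain strings, and in a real cluster JGroups computes and installs the views itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of how a view evolves; not part of JGroups.
final class MembershipSketch {
    // Members in join order, oldest first.
    private final List<String> members = new ArrayList<>();

    // A joining node is appended, so it becomes the youngest member.
    void join(String node) {
        if (!members.contains(node)) {
            members.add(node);
        }
    }

    // A node that leaves or crashes is removed; the others keep their order.
    void leave(String node) {
        members.remove(node);
    }

    // Read-only snapshot of the current view.
    List<String> view() {
        return Collections.unmodifiableList(new ArrayList<>(members));
    }

    // The oldest node is always first in the view; null if the view is empty.
    String oldest() {
        return members.isEmpty() ? null : members.get(0);
    }
}
```

Starting from {A,B,C}, calling join("D") and then leave("B") leaves the view {A,C,D} with A as the oldest node, matching the sequence described above.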

Published at DZone with permission of its author, Bela Ban.
