Entrepreneur. Creator of Groovy++. Alex is a DZone Zone Leader and has posted 31 posts at DZone.

Cluster discovery with Groovy++

03.19.2010

The first networking application I ever wrote (OMG, it was 17 years ago) was a very interesting one. We were developing a marine navigation system: electronic charts plus connections to GPS, radars and other sensors. At some point it became obvious that one computer was not enough, if only for reasons of redundancy. The funny thing is that the main reason from the business perspective was not even that, but the fact that on a really large vessel (a tanker, a cargo ship or whatever) the bridge is so big that you want at least three computers (collaborating, of course): one in the center of the bridge and one on each wing.

One of the main requirements was zero network configuration. As you can imagine, typical crew members cannot be expected to deal with IP addresses or ports. So the computers had to find each other silently and decide how to cooperate.

The solution is well known: each computer (node) periodically broadcasts over the network its own ID and the network address where it listens for incoming connections. Every node also listens for such broadcast messages. When another node receives such a message, it decides who should connect to whom. It might be a deterministic algorithm (like comparing IDs), or whoever is first to establish a connection, or whatever. As soon as the nodes have discovered each other, collaboration is possible.
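
To make the "comparing IDs" option concrete, here is a minimal sketch in plain Groovy. The helper name shouldInitiate is hypothetical and is not part of the code developed later in this article; it only assumes that node IDs are java.util.UUID values, as they are in the discovery object further down.

```groovy
// Hypothetical helper, not from the article's code base: decides which of
// two nodes opens the connection by comparing their IDs.
boolean shouldInitiate(UUID mine, UUID theirs) {
    // UUID implements Comparable, so both nodes reach the same verdict
    // independently: only the node with the smaller ID connects.
    mine.compareTo(theirs) < 0
}

def nodeA = UUID.fromString('00000000-0000-0000-0000-000000000001')
def nodeB = UUID.fromString('00000000-0000-0000-0000-000000000002')

boolean aConnects = shouldInitiate(nodeA, nodeB)   // nodeA has the smaller ID
boolean bConnects = shouldInitiate(nodeB, nodeA)   // nodeB waits to be contacted
```

Because the comparison is strict, a node that hears its own broadcast never tries to connect to itself.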

Frankly speaking, at that time it was a hard task for me and took quite some time. Today I want to show how easy it is.

Our plan is the following: we will develop two universal objects, one for periodically broadcasting messages and another for receiving them.

Every object will have a dedicated thread and a dedicated socket. In many situations two sockets are not necessary, because usually you have both the sender and the receiver together, but in some cases you need only one of them, so we will implement the more general solution.

Our sender and receiver will be very generic and know nothing about the nature of the data. We want to separate concerns.

Let us start with the common functionality for the sender and the receiver.

    abstract class BroadcastThread extends SupervisedChannel {
        InetAddress group
        int port

        private MulticastSocket socket
        private volatile boolean stopped

        // one iteration of the send or receive loop
        abstract void loopAction ()

        protected void doStartup() {
            executor.execute {
                try {
                    socket = new MulticastSocket(port)
                    socket.joinGroup(group)
                    while (!stopped)
                        loopAction ()
                }
                catch(Throwable t) {
                    stopped = true
                    crash(t)
                }
                // socket is still null if its creation failed
                socket?.close()
            }
        }

        protected void doShutdown() {
            stopped = true
        }
    }

SupervisedChannel is a very interesting animal. It is worth a separate article by itself, but here is the brief idea.

First of all, it is a message channel (or actor) in the sense described in the article "Lock free message passing algorithms with Groovy++". It means that it reacts to incoming messages, and we guarantee that no more than one message is processed at any given moment. In particular, the startup()/shutdown() methods are asynchronous (they just send the respective message). The methods doStartup()/doShutdown() are called when that message is received and define what our channel should do on startup and shutdown.
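
The "no more than one message at a time" guarantee can be illustrated with a small sketch in plain Groovy. This is not the Groovy++ implementation: SketchChannel and its members are hypothetical names, and the counter-based scheduling shown here is just one common way to get the guarantee without locks.

```groovy
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a message channel; not the Groovy++ API.
class SketchChannel {
    final Executor executor
    final Closure handler
    private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>()
    // number of messages posted but not yet fully processed
    private final AtomicInteger pending = new AtomicInteger()

    SketchChannel(Executor executor, Closure handler) {
        this.executor = executor
        this.handler = handler
    }

    // Asynchronous: enqueue the message and return immediately.
    void post(Object message) {
        queue.add(message)
        // Only the poster that moves the counter from 0 to 1 schedules a
        // drain, so at most one drain task exists at any moment.
        if (pending.getAndIncrement() == 0)
            executor.execute { drain() }
    }

    private void drain() {
        while (true) {
            // An exception thrown by the handler would stall this sketch;
            // a real channel would report the crash to its supervisor.
            handler.call(queue.poll())
            if (pending.decrementAndGet() == 0) break
        }
    }
}

def pool = Executors.newFixedThreadPool(4)
def seen = Collections.synchronizedList([])
def channel = new SketchChannel(pool, { msg -> seen << msg })
channel.post('startup')     // returns at once, handled on a pool thread
channel.post('shutdown')
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)
```

Even though the pool has four threads, the handler never runs concurrently with itself, which is why code inside a channel can touch its own state without synchronization.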

Secondly, and even more importantly, supervised channels are built into a tree. This great idea came from Erlang OTP. Roughly speaking, every supervised channel is responsible for creating, starting and stopping its own children. Especially important is that it is also responsible for deciding what to do if a child has crashed. Many strategies are possible: restart the crashed child, stop and restart all children, or crash itself and let its own supervisor decide what to do (our default policy).

We don't have any children for the sender/receiver, but very soon we will combine them into a more interesting object. What we do have is that if some error happens and we crash, we let our supervisor decide what to do.
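
The escalation policy described above can be sketched in a few lines of plain Groovy. The classes SketchNode and RestartingNode are hypothetical and only model the decision flow; the real SupervisedChannel is asynchronous and its API is not reproduced here.

```groovy
// Hypothetical model of a supervision tree; not the Groovy++ API.
class SketchNode {
    String name
    SketchNode owner
    List<SketchNode> children = []
    List<String> log = []

    // Creates the owner/child relation (a real channel would also start the child).
    void startupChild(SketchNode child) {
        child.owner = this
        children << child
    }

    // Called by a node on itself when something went wrong.
    void crash(Throwable cause) {
        if (owner) owner.onChildCrashed(this, cause)
        else log << "unhandled crash in ${name}: ${cause.message}".toString()
    }

    // Default policy: drop the child, then crash ourselves so that our own
    // supervisor decides what to do.
    void onChildCrashed(SketchNode child, Throwable cause) {
        children.remove(child)
        crash(cause)
    }
}

// An alternative policy: handle the crash locally instead of escalating.
class RestartingNode extends SketchNode {
    @Override
    void onChildCrashed(SketchNode child, Throwable cause) {
        log << "restarting ${child.name}".toString()
    }
}

def root = new RestartingNode(name: 'root')
def discovery = new SketchNode(name: 'discovery')
def receiver = new SketchNode(name: 'receiver')
root.startupChild(discovery)
discovery.startupChild(receiver)

// The receiver fails; discovery escalates; root decides to restart discovery.
receiver.crash(new IOException('socket closed'))
```

The point of the default policy is that leaf objects like the sender and receiver need no error handling of their own: they report the crash and leave the decision to whoever owns them.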

The last thing to notice is that every supervised channel has a java.util.concurrent.Executor (usually inherited from the owner/supervisor). We use it in the code above to start a new thread.

Now we are ready to create the sender.

    static class Sender extends BroadcastThread {
        long sleepPeriod = 1000L
        byte [] dataToTransmit

        void loopAction () {
            socket.send ([dataToTransmit, dataToTransmit.length, group, port])
            Thread.sleep(sleepPeriod)
        }
    }

Isn't it simple? We just need to define the transmission period and the data to be broadcast.

The receiver is only a little more complicated. We want to be able to define a transformation of the received bytes into something meaningful, which is then sent either to the owner or, if we have no owner, to ourselves for processing.

    static class Receiver extends BroadcastThread {
        Function1<byte[],?> messageTransform

        void loopAction () {
            def buffer = new byte [512]
            def packet = new DatagramPacket(buffer, buffer.length)
            socket.receive(packet)

            // pass on only the bytes actually received, not the whole buffer
            def msg = Arrays.copyOf(buffer, packet.length)
            if (messageTransform)
                msg = messageTransform(msg)
            if (msg)
                (owner ? owner : this).post(msg)
        }
    }
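
One detail of receiving datagrams is easy to miss: the buffer is larger than the message, and only the first packet.length bytes are valid after receive() returns. The following standalone sketch shows this with plain java.net classes. Note that it swaps in ordinary unicast UDP on the loopback interface instead of a multicast group, purely so that it runs without any network setup; the payload 'node-1' is made up.

```groovy
// Plain java.net sketch; unicast on the loopback interface stands in for
// the multicast group used in the article.
def receiverSocket = new DatagramSocket(0, InetAddress.getLoopbackAddress())
receiverSocket.soTimeout = 2000          // do not block forever if nothing arrives
def senderSocket = new DatagramSocket()

byte[] payload = 'node-1'.getBytes('UTF-8')
senderSocket.send(new DatagramPacket(payload, payload.length,
        InetAddress.getLoopbackAddress(), receiverSocket.localPort))

byte[] buffer = new byte[512]
def packet = new DatagramPacket(buffer, buffer.length)
receiverSocket.receive(packet)

// receive() filled only the first packet.length bytes of the buffer
byte[] received = Arrays.copyOf(packet.data, packet.length)
String text = new String(received, 'UTF-8')

senderSocket.close()
receiverSocket.close()
```

A transformation that parses the message should therefore work on the trimmed copy, or at least be told how many bytes are meaningful.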

Now we are ready to build our main discovery object. It will be a supervisor combining the sender and the receiver.

When started, it creates and starts both the sender and the receiver. Both will be stopped automatically when it is stopped.

Received bytes are transformed into meaningful messages and sent to any interested listener (including ourselves, which opens the possibility to subclass and gives us a bridge between the message-passing and OOP worlds).

    class BroadcastDiscovery extends SupervisedChannel {
        // our ID
        UUID uid
        // our IP address
        InetSocketAddress address
        // where to send notifications on discovery
        Multiplexor<Discovery> listeners = []

        // where to broadcast
        InetAddress group
        // broadcast port
        int port

        static class Discovery {
            UUID uuid
            SocketAddress address
        }

        void doStartup() {
            listeners.subscribe(this)
            BroadcastThread.Sender sender = [
                group:group,
                port:port,
                dataToTransmit: createDataToTransmit()
            ]
            startupChild (sender)

            BroadcastThread.Receiver receiver = [
                group:group,
                port:port,
                messageTransform: { byte [] buf -> listeners.post(transformReceivedData(buf)) }
            ]
            startupChild (receiver)
        }

        protected void doOnMessage(Object message) {
            switch(message) {
                case Discovery:
                    Discovery discovery = message
                    onDiscovery(discovery.uuid, discovery.address)
                    break

                default:
                    super.doOnMessage(message)
            }
        }

        protected void onDiscovery(final UUID uuid, final SocketAddress address) {
        }

        private byte [] createDataToTransmit() {
            // ...
        }

        private Discovery transformReceivedData (byte [] buf ) {
            // ...
        }
    }

Two details to note:

Our message transform always returns null, which means that the receiver itself will not post a message. Instead, we send the transformed data to all subscribed listeners from inside the transformation.

The method startupChild used above is the standard way to simultaneously create the owner/child relation and start up the child.
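
The bodies of createDataToTransmit and transformReceivedData are left out of the listing above. Purely as an illustration, here is one possible wire format in plain Groovy. It is an assumption of this sketch, not the article's actual encoding: the function names encode and decode are hypothetical, and the layout (16 bytes of UUID, 4 bytes of port, then the raw IP address) is made up. It also assumes that the decoder is handed exactly the bytes of one datagram, with no trailing padding.

```groovy
import java.nio.ByteBuffer

// Hypothetical wire format: 16 bytes UUID, 4 bytes port, then raw IP bytes.
byte[] encode(UUID id, InetSocketAddress address) {
    // InetAddress.getAddress() yields 4 bytes for IPv4 and 16 for IPv6
    byte[] ip = address.getAddress().getAddress()
    ByteBuffer buf = ByteBuffer.allocate(16 + 4 + ip.length)
    buf.putLong(id.getMostSignificantBits())
    buf.putLong(id.getLeastSignificantBits())
    buf.putInt(address.getPort())
    buf.put(ip)
    buf.array()
}

// Returns [UUID, InetSocketAddress]; expects exactly one encoded message.
List decode(byte[] data) {
    ByteBuffer buf = ByteBuffer.wrap(data)
    long most = buf.getLong()
    long least = buf.getLong()
    int port = buf.getInt()
    byte[] ip = new byte[buf.remaining()]
    buf.get(ip)
    // getByAddress does no DNS lookup; it only accepts 4 or 16 bytes
    [new UUID(most, least), new InetSocketAddress(InetAddress.getByAddress(ip), port)]
}

def myId = UUID.randomUUID()
def myAddress = new InetSocketAddress(InetAddress.getByAddress([192, 168, 0, 7] as byte[]), 9000)
def (peerId, peerAddress) = decode(encode(myId, myAddress))
```

With IPv4 the message is 24 bytes, comfortably below the 512-byte receive buffer used by the receiver.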

We are almost done. Just for completeness, here is the code to start our discovery channel.

    private void startBroadcast() {
        broadcast = new BroadcastDiscovery()
        broadcast.group = GROUP
        broadcast.port = PORT
        broadcast.uid = clusterNode.id
        broadcast.address = (InetSocketAddress)serverChannel.getLocalAddress()
        broadcast.startup ()
    }

One last but important note:

Usually such a discovery channel will not be the root supervisor. For example, it is natural to imagine that a "sibling" of the discovery channel is a channel responsible for accepting incoming TCP/IP connections, and so forth.

Thank you for your time. I hope it was interesting.

Till next time.

Published at DZone with permission of its author, Alex Tkachman.