
Lock free message passing algorithms with Groovy++

03.03.2010

Last time we talked about the implementation of functional queues with Groovy++. Today we will use these queues to implement several algorithms for processing asynchronous messages. You can find the source code and more examples in the Groovy++ distro.

What we want to do is to implement a simplified actor: an object which sequentially processes asynchronously arriving messages. There are two types of actors:

  • thread-bound actor, which has a dedicated message-processing thread. The thread blocks when no messages are available
  • pooled actor, which executes on some thread pool. The beauty of a pooled actor is that it consumes no resources at all when there are no messages to process

We will try to use the same approach based on our functional queues to implement both.

Let us start with the interface definition for a message channel:

@Trait abstract class MessageChannel<T> {
    abstract MessageChannel<T> post (T message)

    MessageChannel<T> leftShift (T msg) {
        post msg
    }
}

The @Trait annotation is the Groovy++ way to define an interface with default implementations of some methods. Each class implementing such an interface inherits the default implementation if the method is not implemented by the class or a superclass.

We use a Groovy++ trait here not because it is necessary for our sample, but because the sample is real code from the Groovy++ runtime.
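
For illustration, here is a minimal toy channel (PrintingChannel is a hypothetical name, not part of the runtime) that only implements post and inherits the << operator from the trait:

@Typed class PrintingChannel implements MessageChannel<String> {
    MessageChannel<String> post(String message) {
        println "received: $message"
        this
    }
}

def channel = new PrintingChannel()
channel << "hello"   // equivalent to channel.post("hello")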

Note that a message channel has nothing to do with concurrency: we can implement the method post in whatever way we like. Just for fun, here is a Multiplexor class, which immediately redistributes all incoming messages to all subscribed channels.

class Multiplexor<M> implements MessageChannel<M> {
    // immutable functional list of subscribers, updated via compareAndSet on the volatile field
    private volatile FList<MessageChannel<M>> listeners = FList.emptyList

    Multiplexor<M> subscribe(MessageChannel<M> channel) {
        for (;;) {
            def l = listeners
            if (listeners.compareAndSet(l, l + channel))
                return this
        }
    }

    Multiplexor<M> subscribe(MessageChannel<M> ... channels) {
        for (c in channels) {
            subscribe(c)
        }
        this
    }

    Multiplexor<M> unsubscribe(MessageChannel<M> channel) {
        for (;;) {
            def l = listeners
            if (listeners.compareAndSet(l, l - channel))
                return this
        }
    }

    final Multiplexor<M> post(M message) {
        // forward the message to every currently subscribed channel
        listeners.each { channel ->
            channel << message
        }
        this
    }

    static Multiplexor<M> of (MessageChannel<M> ... channels) {
        new Multiplexor().subscribe(channels)
    }
}

You may notice that our functional lists made it almost trivial to let subscribers subscribe and unsubscribe asynchronously.
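
To see it in action, a quick hypothetical usage sketch (PrintingChannel is the toy channel from the earlier example):

def audit   = new PrintingChannel()
def console = new PrintingChannel()

def mux = Multiplexor.of(audit, console)
mux << "first"            // delivered to both subscribers
mux.unsubscribe(console)
mux << "second"           // now delivered to audit only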

OK, back to our main story. Let us implement a channel with an asynchronous queue, which processes no more than one message at any given moment.

Our idea is the following:

  • we use a functional queue to hold incoming messages
  • when we add a message to the queue we signal the subclass (whatever that means for the subclass's algorithm)
  • we introduce a special state of the queue to be used by subclasses, which means that the queue is already empty but the last message is not processed yet. This is probably the most non-trivial part of our algorithms

Here is the implementation:

abstract class QueuedChannel<M> implements MessageChannel<M> {

    protected volatile FQueue<M> queue = FQueue.emptyQueue

    // sentinel state: the queue is empty but the last message is still being processed
    protected static final FQueue busyEmptyQueue = FQueue.emptyQueue + null

    MessageChannel<M> post(M message) {
        for (;;) {
            def oldQueue = queue
            def newQueue = (oldQueue === busyEmptyQueue ? FQueue.emptyQueue : oldQueue) + message
            if (queue.compareAndSet(oldQueue, newQueue)) {
                signalPost(oldQueue, newQueue)
                return this
            }
        }
    }

    protected abstract void signalPost (FQueue<M> oldQueue, FQueue<M> newQueue)

    abstract void onMessage(M message)
}

Now we are ready to create our first real actor, backed by an Executor and scheduled for execution for each message. We call it "fair" because it does not try to take as many resources as possible but gives all its colleagues a chance to work.

Here is an explanation of the algorithm:

  • our channel implements Runnable. That might not be perfect from an OOP perspective but it saves us an additional object creation
  • when a message is added to an empty queue we schedule the actor for execution
  • if the queue is still non-empty after processing a message, we schedule it again
  • special care is taken for the case when we process the last message in the queue: we have to make sure that, until we are done, new messages do not schedule a new execution of the actor

abstract class FairExecutingChannel<M> extends QueuedChannel<M> implements Runnable {
    Executor executor

    void run () {
        for (;;) {
            def q = queue
            def removed = q.removeFirst()
            if (q.size() == 1) {
                // last message: mark the queue busy-empty while we process it
                if (queue.compareAndSet(q, busyEmptyQueue)) {
                    onMessage removed.first
                    // if new messages arrived meanwhile, reschedule ourselves
                    if (!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) {
                        executor.execute this
                    }
                    break
                }
            }
            else {
                if (queue.compareAndSet(q, removed.second)) {
                    onMessage removed.first
                    executor.execute this
                    break
                }
            }
        }
    }

    protected void signalPost(FQueue<M> oldQueue, FQueue<M> newQueue) {
        // schedule only on the empty -> one message transition
        if (oldQueue !== busyEmptyQueue && newQueue.size() == 1)
            executor.execute this
    }
}
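
To wire this up, a hypothetical usage sketch (EchoChannel and pool are illustrative names, not part of the runtime; only executor and onMessage come from the classes above):

import java.util.concurrent.Executors

@Typed class EchoChannel extends FairExecutingChannel<String> {
    void onMessage(String message) {
        println "processing: $message"
    }
}

def pool  = Executors.newFixedThreadPool(4)
def actor = new EchoChannel(executor: pool)
actor << "one"
actor << "two"    // processed strictly one at a time, in posting order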

The fair algorithm above has one downside: if message processing is really fast, we waste a lot of cycles by being scheduled for each and every message. That leads us to the idea of a "non-fair" algorithm, which processes all available messages in one execution of the Runnable. For large numbers of small messages it runs 2-3 times faster.

Here is the implementation, which is even simpler:

@Typed abstract class NonfairExecutingChannel<M> extends FairExecutingChannel<M> {
    void run () {
        for (;;) {
            def q = queue
            // grab the whole queue at once and mark it busy-empty
            if (queue.compareAndSet(q, busyEmptyQueue)) {
                for (m in q) {
                    if (m)           // skip the null sentinel of busyEmptyQueue
                        onMessage m
                }
                // if new messages arrived meanwhile, reschedule ourselves
                if (!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) {
                    executor.execute this
                }
                break
            }
        }
    }
}

It is interesting to notice that we can develop variations of the algorithms above. For example, we can process as many messages as we can in a given timeframe (say, 250ms), or a given number of messages per run (see the sketch below). Functional data structures give us a lot of flexibility.
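
As one hypothetical sketch of the second variation (BoundedExecutingChannel and maxPerRun are invented names, not code from the distro), we can reuse the fair CAS loop but process up to a fixed number of messages per run before yielding the thread:

@Typed abstract class BoundedExecutingChannel<M> extends FairExecutingChannel<M> {
    int maxPerRun = 64   // assumption: a tuning parameter, not from the article

    void run () {
        for (int i = 0; i != maxPerRun; ++i) {
            def q = queue
            def removed = q.removeFirst()
            if (q.size() == 1) {
                // last message: go through the busy-empty state as in the fair version
                if (queue.compareAndSet(q, busyEmptyQueue)) {
                    onMessage removed.first
                    if (!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue))
                        executor.execute this
                    return
                }
            }
            else {
                if (queue.compareAndSet(q, removed.second))
                    onMessage removed.first
            }
        }
        // budget exhausted and the queue is still non-empty: give others a chance
        executor.execute this
    }
}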

To complete the picture we should also implement a thread-backed variation of our approach. We leave it as an exercise for the reader.

Thank you for reading, and I hope it was interesting. Till next time.

Published at DZone with permission of its author, Alex Tkachman.

Comments

Osvaldo Doederlein replied on Wed, 2010/03/03 - 8:45am

Just to be picky, a lock-free algorithm is not very advantageous if it relies a lot (on all steps/operations) on methods like compareAndSet(), which requires a CAS or similar instruction in hardware (forces a potentially very expensive cache sync). Good implementations of locks (like in any modern JVM) will have basically the same overhead and scalability in the common, uncontended case; and that case will happen virtually 100% of the time if your algorithm reduces the critical sections to absolute minimal work, like updating a couple variables or adding/removing a single element from a queue (fast enough that you are basically guaranteed to finish before another thread wants to do the same, even with multicore/SMP).

Good lock-free algorithms are those that don't rely on CAS (or similar) at all, or do it only on uncommon cases, e.g. see Cliff Click's concurrent hashmap (it needs a single CAS for updates, and none at all for reads - the reads are race-tolerant, the hashmap structures are carefully designed so all possible write/read races are handled sensibly by the reader - so the scalability is awesome when there's vastly more readers than writers). If you not only need CAS in all operations, but you need multiple CAS per operation, your scalability may very easily be worse than a simpler lock-based algorithm (of course this requires careful testing to assert).
