Enterprise Integration Zone is brought to you in partnership with:

I am the founder and lead developer of Hibernate Envers, a Hibernate core module, which provides entity versioning/auditing capabilities. I am also one of the co-founders of SoftwareMill, a company specializing in delivering customized software solutions (http://softwaremill.com, "Extraordinary software as a standard"), based on Java and JBoss technologies. After work, apart from being involved in development of Envers, I work on several small open source projects, like ElasticMQ (simple message queue written in Scala with an SQS interface), projects around static analysis (using JSR 308 - Typestate Annotations/ Checkers Framework and FindBugs), and some CDI/Weld (not always portable) extensions, like autofactories or stackable security interceptors. I am also interested in new JVM-based languages, especially with functional elements (like Scala, JRuby) and frameworks built using them (like Lift), as well as improving the ways we use Dependency Injection. Adam is a DZone MVB and is not an employee of DZone and has posted 52 posts at DZone. You can read more from them at their website. View Full User Profile

ElasticMQ 0.7.0: Long Polling, Non-Blocking Implementation Using Akka and Spray

06.13.2013
| 3652 views |
  • submit to reddit

ElasticMQ 0.7.0, a message queueing system with an actor-based Scala and Amazon SQS-compatible interfaces, was just released.

It is a major rewrite, using Akka actors at the core and Spray for the REST layer. So far only the core and SQS modules have been rewritten; journaling, SQL backend and replication are yet to be done.

The major client-side improvements are:

  • long polling support, which was added to SQS some time ago
  • simpler stand-alone server – just a single jar to download

With long polling, when receiving a message, you can specify an additional MessageWaitTime attribute. If there are no messages in the queue, instead of completing the request with an empty response, ElasticMQ will wait up to MessageWaitTime seconds until messages arrive. This helps both to reduce the bandwidth used (no need for very frequent requests), improve overall system performance (messages are received immediately after being sent) and to reduce SQS costs.

The stand-alone server is now a single jar. To run a local, in-memory SQS implementation (e.g. for testing an application which uses SQS), all you need to do is download the jar file and run:

java -jar elasticmq-server-0.7.0.jar

This will start a server on http://localhost:9324. Of course the interface and port are configurable, see the README for details. As before, you can also run an embedded server from any JVM-based language.

Implementation notes

For the curious, here’s a short description of how ElasticMQ is implemented, including the core system, REST layer, Akka Dataflow usage and long polling implementation. All the code is available on GitHub.

As already mentioned, ElasticMQ is now implemented using Akka and Spray, and doesn’t contain any blocking calls. Everything is asynchronous.

Core

The core system is actor-based. There’s one main actor (QueueManagerActor), which knows what queues are currently created in the system, and gives the possibility to create and delete queues.

For communication with the actors, the typed ask pattern is used. For example, to lookup a queue (a queue is also an actor), a message is defined:

case class LookupQueue(queueName: String) extends Replyable[Option[ActorRef]]

Usage looks like this:

import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor ? LookupQueue("q2")

As already mentioned, each queue is an actor, and encapsulates the queue state. We can use simple mutable data structures, without any need for thread synchronisation, as the actor model takes care of that for us. There’s a number of messages which can be sent to a queue-actor, e.g.:

case class SendMessage(message: NewMessageData)   extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, 
           waitForMessages: Option[Duration])     extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]
Rest layer

The SQS query/REST layer is implemented using Spray, a lightweight REST/HTTP toolkit based on Akka.

Apart from a non-blocking, actor-based IO implementation, Spray also offers a powerful routing library, spray-routing. It contains a number of built-in directives, for matching on the request method (get/post etc.), extracting query of form parameters or matching on the request path. But it also lets you define your own directives, using simple directive composition. A typical ElasticMQ route looks like this:

val listQueuesDirective = 
  action("ListQueues") {
    rootPath {
      anyParam("QueueNamePrefix"?) { prefixOption =>
        // logic
      }
    }
  }

Where action matches on the action name specified in the "Action" URL of body parameter and accepts/rejects the request, rootPath matches on an empty path and so on. Spray has a good tutorial, so I encourage you to take a look there, if you are interested.

How to use the queue actors from the routes to complete HTTP requests?

The nice thing about Spray is that all it does is passing a RequestContext instance to your routes, expecting nothing in return. It is up to the route to discard the request completely or complete it with a value. The request may also be completed in another thread – or, for example, when some future is completed. Which is exactly what ElasticMQ does. Here map, flatMap and for-comprehensions (which are a nicer syntax for map/flatMap) are very handy, e.g. (simplified):

// Looking up the queue and deleting it are going to be called in sequence,
// but asynchronously, as ? returns a Future
for {
   queueActor <- queueManagerActor ? LookupQueue(queueName)
   _ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {
   requestContext.complete(200, "message deleted")
}

Sometimes, when the flow is more complex, ElasticMQ uses Akka Dataflow, which requires the continuations plugin to be enabled. There’s also a similar project which uses macros, Scala Async, but it’s in early development.

Using Akka Dataflow, you can write code which uses Futures as if it was normal sequential code. The CPS plugin will transform it to use callbacks where needed. An example, taken from CreateQueueDirectives:

flow {
  val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()
  queueActorOption match {
    case None => {
      val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()
      createResult match {
        case Left(e) => throw new SQSException("Queue already created: " + e.message)
        case Right(_) => newQueueData
      }
    }
    case Some(queueActor) => {
      (queueActor ? GetQueueData()).apply()
    }
  }
}

The important parts here are the flow block, which delimits the scope of the transformation, and the apply() calls on Futures which extract the content of the future. This looks like completely normal, sequential code, but when executed, since the first Future usage will be run asynchronously.

Long polling

With all of the code being asynchronous and non-blocking, implementing long polling was quite easy. Note that when receiving messages from a queue, we get a Future[List[MessageData]]. In response to completing this future, the HTTP request is also completed with the appropriate response. However this future may be completed almost immediately (as is the case normally), or after e.g. 10 seconds – there’s no changes in code needed to support that. So the only thing to do was to delay completing the future until the specified amount of time passed or new messages have arrived.

The implementation is in QueueActorWaitForMessagesOps. When a request to receive messages arrives, and there’s nothing in the queue, instead of replying (that is, sending an empty list to the sender actor) immediately, we store the reference to the original request and the sender actor in a map. Using the Akka scheduler, we also schedule sending back an empty list and removal of the entry after the specified timeout.

When new messages arrive, we simply take a waiting request from the map and try to complete it. Again, all synchronisation and concurrency problems are handled by Akka and the actor model.

Please test the new release, and let me know if you have any feedback!

Adam





Published at DZone with permission of Adam Warski, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)