
Jonas spends most of his time developing large-scale financial systems as well as lecturing and speaking at developer conferences worldwide. He has worked at Terracotta and on the JRockit JVM at BEA, and is an active contributor to the Open Source community; most notably, he created the AspectWerkz aspect-oriented programming framework, is a committer to the Terracotta JVM clustering technology and has been part of the Eclipse AspectJ team.

Introducing Akka – Simpler Scalability, Fault-Tolerance, Concurrency & Remoting Through Actors

01.05.2010

Writing correct concurrent, fault-tolerant and scalable applications is too hard. Most of the time it’s because we are using the wrong tools and the wrong level of abstraction.

Akka is an attempt to change that.

Akka uses the Actor Model together with Software Transactional Memory to raise the abstraction level and provide a better platform to build correct concurrent and scalable applications.

For fault-tolerance, Akka adopts the “Let it crash” model, also called “Embrace failure”, which has been used with great success in the telecom industry to build applications that self-heal: systems that never stop.

Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.

Akka is Open Source and available under the Apache 2 License.

In this article we will introduce you to Akka and see how we can utilize it to build a highly concurrent, scalable and fault-tolerant network server.

But first let’s take a step back and discuss what Actors really are and what they are useful for.

Actors

The Actor Model provides a higher level of abstraction for writing concurrent and distributed systems. It frees the developer from having to deal with explicit locking and thread management, which makes it easier to write correct concurrent and parallel systems. Actors are nothing new: they were defined in the 1973 paper by Carl Hewitt and were popularized by the Erlang language, which emerged in the mid-80s. Ericsson, for example, has used Erlang with great success to build highly concurrent and extremely reliable telecom systems (99.9999999% availability, or about 31 ms of downtime per year).

Actors encapsulate state and behavior in a lightweight process/thread. In a sense they are like OO objects, but with a major semantic difference: they do not share state with any other Actor. Each Actor has its own view of the world and can only affect other Actors by sending messages to them. Messages are sent asynchronously and without blocking, in a so-called “fire-and-forget” manner: the Actor sends a message to another Actor and does not wait for a reply, but goes off doing other things or is suspended by the runtime. Each Actor has a mailbox (an ordered message queue) from which incoming messages are processed one by one. Since all processing is done asynchronously, and Actors neither block nor consume any resources while waiting for messages, Actors tend to have very good concurrency and scalability characteristics and are excellent for building event-based systems.
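To make the mailbox idea concrete, here is a deliberately tiny, stdlib-only sketch of an actor-like object in Scala. It is not how Akka is implemented, and ‘ToyActor’, ‘Counter’, ‘Incr’ and ‘Get’ are hypothetical names. Messages are queued in a mailbox and at most one task drains it on a shared thread pool, so the handler never runs concurrently with itself and no thread is tied up while the mailbox is empty.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical toy actor (not Akka): private state, a mailbox and a message handler.
abstract class ToyActor(executor: ExecutorService) {
  private val mailbox = new ConcurrentLinkedQueue[Any]()
  private val scheduled = new AtomicBoolean(false)

  protected def receive: PartialFunction[Any, Unit]

  // Fire-and-forget send: enqueue the message and return immediately.
  def !(message: Any): Unit = {
    mailbox.offer(message)
    schedule()
  }

  // Submit at most one drain task at a time; an empty mailbox holds no thread.
  private def schedule(): Unit =
    if (scheduled.compareAndSet(false, true)) executor.execute(() => drain())

  // Handle queued messages one by one; unmatched messages are silently dropped.
  private def drain(): Unit = {
    val handler = receive
    var message = mailbox.poll()
    while (message != null) {
      handler.applyOrElse(message, (_: Any) => ())
      message = mailbox.poll()
    }
    scheduled.set(false)
    // A message may have arrived between the last poll and clearing the flag.
    if (!mailbox.isEmpty) schedule()
  }
}

case object Incr
final case class Get(replyTo: Promise[Int])

// State is only touched inside the handler, so it needs no lock.
class Counter(executor: ExecutorService) extends ToyActor(executor) {
  private var count = 0
  protected def receive: PartialFunction[Any, Unit] = {
    case Incr       => count += 1
    case Get(reply) => reply.success(count)
  }
}

object ToyActorDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4)
    val counter = new Counter(pool)
    (1 to 1000).foreach(_ => counter ! Incr)
    val reply = Promise[Int]()
    counter ! Get(reply)
    // The mailbox is FIFO and there is a single sender, so Get sees every Incr.
    println(Await.result(reply.future, 5.seconds))
    pool.shutdown()
  }
}
```

The ‘Get’ message carries a ‘Promise’ so the caller can receive a reply without the actor itself ever blocking.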

Creating Actors

Akka has both a Scala API and a Java API. In this article we will only look at the Scala API, since it is the more expressive one. The article assumes some basic Scala knowledge, but even if you don’t know Scala I don’t think it will be too hard to follow along.

Akka has adopted the same style of writing Actors as Erlang, in which each Actor has an explicit message handler that pattern matches on the incoming messages.

Actors can be created either by:

  • Extending the ‘Actor’ class and implementing the ‘receive’ method.
  • Creating an anonymous Actor using one of the ‘actor’ methods.

Here is a little example before we dive into a more interesting one.

class MyActor extends Actor {
  def receive = {
    case "test" => println("received test")
    case _ => println("received unknown message")
  }
}

val myActor = new MyActor
myActor.start

Here is the same Actor with the anonymous syntax. Anonymous Actors are implicitly started:

val myActor = actor {
  case "test" => println("received test")
  case _ => println("received unknown message")
}

Akka Actors are extremely lightweight. Each Actor consumes ~600 bytes, which means that you can create about 6.5 million of them in 4 GB of RAM.

Messages are sent using the ‘!’ operator:

myActor ! "test"

Sample application

We will try to write a simple chat/IM system. It is client-server based and uses remote Actors to implement remote clients. Even if you are unlikely ever to write a chat system, I think it is a useful exercise, since it uses patterns and idioms found in many other use cases and domains.

We will use many of the features of Akka along the way, in particular: Actors, fault-tolerance using Actor supervision, remote Actors, Software Transactional Memory (STM) and persistence.

But let’s start by defining the messages that will flow in our system.

Creating messages

It is very important that all messages that will be sent around in the system are immutable. The Actor model relies on the simple fact that no state is shared between Actors and the only way to guarantee that is to make sure we don’t pass mutable state around as part of the messages.

In Scala we have something called case classes. These make excellent messages, since their fields are immutable by default and they are great to pattern match on.

Here are the messages that will flow in our system:

/**
 * ChatServer's internal events.
 */
sealed trait Event

case class Login(username: String) extends Event
case class Logout(username: String) extends Event

case class ChatMessage(fromUser: String, message: String) extends Event

case class GetChatLog(fromUser: String) extends Event
case class ChatLog(messages: List[String]) extends Event

With these messages we can log in and out, send a chat message, and ask for (and receive as a reply) all the messages in the chat log so far.
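Because ‘Event’ is sealed, all of its subclasses must be defined in the same source file, which lets the Scala compiler warn when a pattern match misses one of them. The stdlib-only sketch below repeats the message definitions (marked ‘final’ here) and adds a hypothetical ‘describe’ handler; it also shows that ‘copy’ builds a new message rather than mutating the original, which is what makes messages safe to share between Actors.

```scala
sealed trait Event
final case class Login(username: String) extends Event
final case class Logout(username: String) extends Event
final case class ChatMessage(fromUser: String, message: String) extends Event
final case class GetChatLog(fromUser: String) extends Event
final case class ChatLog(messages: List[String]) extends Event

object EventDemo {
  // Hypothetical handler; leaving out a case would trigger a non-exhaustive match warning.
  def describe(event: Event): String = event match {
    case Login(user)             => s"$user logged in"
    case Logout(user)            => s"$user logged out"
    case ChatMessage(user, text) => s"$user says: $text"
    case GetChatLog(user)        => s"$user asked for the chat log"
    case ChatLog(messages)       => s"chat log with ${messages.size} messages"
  }

  def main(args: Array[String]): Unit = {
    val original = ChatMessage("alice", "hi")
    // Fields are vals, so "changing" a message means building a new one.
    val edited = original.copy(message = "hello")
    println(describe(original))
    println(describe(edited))
  }
}
```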

Client: Sending messages

Our client wraps each message send in a function, making it a bit easier to use. Here we assume that we have a reference to the chat service, so we can communicate with it by sending messages. Messages are sent with the ‘!’ operator (pronounced “bang”), which sends a message asynchronously and does not wait for a reply.

Sometimes, however, there is a need for sequential logic: sending a message and waiting for the reply before doing anything else. In Akka we can achieve that using the ‘!!’ (“bangbang”) operator. When sending a message with ‘!!’ we do not return immediately but wait for a reply using a Future. A ‘Future’ is a promise that we will get a result later. Unlike a regular method call, the OS thread we are running on is put to sleep while waiting, and we can set a time-out for how long to wait before bailing out, retrying or doing something else.

The ‘!!’ function returns a ‘scala.Option’, which implements the Null Object pattern. It has two subclasses: ‘None’, which means no result, and ‘Some(value)’, which means that we got a reply. The ‘Option’ class has a lot of useful methods for handling the case where no result is defined. For example, below we use the ‘getOrElse’ method, which returns the result if there is one and otherwise evaluates the expression passed to it (here, throwing an exception).

/**
 * Chat client.
 */
class ChatClient(val name: String) {
  import Actor.Sender.Self
  def login = ChatService ! Login(name)
  def logout = ChatService ! Logout(name)
  def post(message: String) = ChatService ! ChatMessage(name, name + ": " + message)
  def chatLog: ChatLog = {
    val option = ChatService !! (GetChatLog(name), 1000) // timeout 1000 ms
    option.getOrElse(throw new Exception("Couldn't get the chat log"))
  }
}
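The request/reply-with-time-out shape used in ‘chatLog’ can be mimicked with the standard library's ‘scala.concurrent’ package. This is an analogy rather than Akka's implementation: ‘RequestReply’ and ‘awaitReply’ are hypothetical names, and a ‘Future’ that is never completed stands in for an Actor that never replies.

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._

object RequestReply {
  // Hypothetical helper (not Akka's API): block for at most `timeout` waiting for
  // a reply and report a time-out as None instead of throwing.
  def awaitReply[T](reply: Future[T], timeout: FiniteDuration): Option[T] =
    try Some(Await.result(reply, timeout))
    catch { case _: TimeoutException => None }

  def main(args: Array[String]): Unit = {
    val answered = Future.successful(List("alice: hi"))
    val neverAnswered = Promise[List[String]]().future

    // getOrElse evaluates its argument only when the Option is None.
    println(awaitReply(answered, 1.second).getOrElse(Nil))
    println(awaitReply(neverAnswered, 100.millis).getOrElse(Nil))

    // Same shape as ChatClient.chatLog: turn "no reply" into an exception.
    val log =
      try awaitReply(neverAnswered, 100.millis).getOrElse(throw new Exception("Couldn't get the chat log"))
      catch { case e: Exception => List(s"error: ${e.getMessage}") }
    println(log)
  }
}
```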

Session: Receiving messages

Now that we are done with the client side, let’s dig into the server code. We start by creating a user session. The session is an Actor and is defined by extending the ‘Actor’ trait. This trait has one abstract method that we have to define, ‘receive’, which implements the message handler for the Actor.

In our example the session has state in the form of a ‘List’ with all the messages sent by the user during the session. It takes two parameters in its constructor: the user name and a reference to an Actor implementing the persistent message storage. Both of the messages it responds to, ‘ChatMessage’ and ‘GetChatLog’, are passed on to the storage Actor.

If you look closely (in the code below) you will see that when passing on the ‘GetChatLog’ message we are not using ‘!’ but ‘forward’. This is similar to ‘!’ but with the important difference that it passes the original sender reference, in this case to the storage Actor. This means that the storage can use this reference to reply to the original sender (our client) directly.

/**
 * Internal chat client session.
 */
class Session(user: String, storage: Actor) extends Actor {
  private val loginTime = System.currentTimeMillis
  private var userLog: List[String] = Nil

  log.info("New session for user [%s] has been created at [%s]", user, loginTime)

  def receive = {
    case event: ChatMessage =>
      userLog ::= event.message
      storage ! event

    case event: GetChatLog =>
      storage forward event
  }
}
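The difference between ‘!’ and ‘forward’ comes down to which sender travels with the message. The single-threaded sketch below makes that explicit with an envelope that carries the sender; ‘Envelope’, ‘Node’, ‘tell’ and ‘forward’ here are hypothetical stand-ins, not Akka's classes.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, single-threaded model (not Akka): each message travels in an
// envelope that records who sent it, so the receiver knows whom to reply to.
final case class Envelope(message: Any, sender: Option[Node])

class Node(val name: String) {
  val received = ListBuffer.empty[Envelope]
  var onMessage: (Node, Envelope) => Unit = (_, _) => ()
  def deliver(envelope: Envelope): Unit = {
    received += envelope
    onMessage(this, envelope)
  }
}

object Messaging {
  // Like `!`: the current actor becomes the sender.
  def tell(target: Node, message: Any, from: Node): Unit =
    target.deliver(Envelope(message, Some(from)))

  // Like `forward`: pass the message on with its original sender unchanged.
  def forward(target: Node, current: Envelope): Unit =
    target.deliver(current)
}

object ForwardDemo {
  def main(args: Array[String]): Unit = {
    val client = new Node("client")
    val storage = new Node("storage")
    val forwardingSession = new Node("forwarding-session")
    val tellingSession = new Node("telling-session")

    forwardingSession.onMessage = (_, env) => Messaging.forward(storage, env)
    tellingSession.onMessage = (self, env) => Messaging.tell(storage, env.message, self)

    Messaging.tell(forwardingSession, "GetChatLog", client)
    Messaging.tell(tellingSession, "GetChatLog", client)

    // Prints "client", then "telling-session": only forward keeps the client as sender.
    storage.received.foreach(env => println(env.sender.map(_.name).getOrElse("?")))
  }
}
```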

Let it crash: Implementing fault-tolerance

Akka’s approach to fault-tolerance, the “let it crash” model, is implemented by linking Actors. It is very different from what Java and most non-concurrency-oriented languages/frameworks have adopted. It’s a way of dealing with failure that is designed for concurrent and distributed systems.

Let’s look at concurrency first, and assume we are using non-linked Actors. Throwing an exception in concurrent code will simply blow up the thread that currently executes the Actor. There is no way to find out that things went wrong (apart from seeing the stack trace in the log), and there is nothing you can do about it. Linked Actors, in contrast, provide a clean way of being notified both of the error, so you know what happened, and of the Actor that crashed, so you can do something about it.

Linking Actors allows you to create sets of Actors where you can be sure that either:

  • All are dead
  • All are alive

This is very useful when you have hundreds of thousands of concurrent Actors. Some Actors might have implicit dependencies and together implement a service, a computation, a user session, etc.; for these, being able to group them is very useful.
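The “all dead or all alive” property can be pictured with a small single-threaded sketch in which links are bidirectional and a crash propagates to every linked member, loosely modelled on Erlang-style links without exit trapping. ‘Proc’, ‘link’ and ‘crash’ are hypothetical names; this is not how Akka implements linking.

```scala
import scala.collection.mutable

// Hypothetical model of linked processes (not Akka): a crash propagates along links.
class Proc(val name: String) {
  private val links = mutable.Set.empty[Proc]
  var alive: Boolean = true
  var exitReason: Option[Throwable] = None

  // Links are bidirectional, as in Erlang.
  def link(other: Proc): Unit = {
    links += other
    other.links += this
  }

  // Mark this process dead and propagate the same exit reason to every linked process.
  // The `alive` check stops the propagation from looping around cycles of links.
  def crash(reason: Throwable): Unit =
    if (alive) {
      alive = false
      exitReason = Some(reason)
      links.foreach(_.crash(reason))
    }
}

object LinkDemo {
  def main(args: Array[String]): Unit = {
    val session = new Proc("session")
    val storage = new Proc("storage")
    val unrelated = new Proc("unrelated")
    session.link(storage)

    storage.crash(new RuntimeException("disk full"))
    for (p <- Seq(session, storage, unrelated))
      println(s"${p.name}: alive=${p.alive}, reason=${p.exitReason.map(_.getMessage)}")
  }
}
```

A supervisor that “traps” exits would receive the failure as a notification instead of dying with the group, which is what the restart strategies below build on.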

Akka encourages non-defensive programming. Don’t try to prevent things from going wrong, because they will, whether you want them to or not. Instead, expect failure as a natural state in the life-cycle of your app, crash early and let someone else (who sees the whole picture) deal with it.

Now let’s look at distributed Actors. As you probably know, you can’t build a fault-tolerant system with just a single node; you need at least two. Also, you (usually) need to know if one node is down and/or if the service you are talking to on the other node is down. Here Actor supervision/linking is a critical tool, not only for monitoring the health of remote services, but for actually managing the service and doing something about the problem if the Actor or node is down, such as restarting the Actor on the same node or on another node.

To sum things up, it is a very different way of thinking, but one that is very useful (if not critical) for building fault-tolerant, highly concurrent and distributed applications.

Supervisor hierarchies

A supervisor is a regular Actor that is responsible for starting, stopping and monitoring its child Actors. The basic idea of a supervisor is that it should keep its child Actors alive by restarting them when necessary. This makes for a completely different view on how to write fault-tolerant servers. Instead of trying everything possible to prevent errors from happening, this approach treats them as something natural that will happen, and embraces them. Just “let it crash” and reset the service to a stable state through a restart.

Akka has two different restart strategies: One-For-One and All-For-One.

  • OneForOne: Restart only the component that has crashed.
  • AllForOne: Restart all the components that the supervisor is managing, including the one that has crashed.

The latter strategy should be used when you have a set of components that are coupled in such a way that if one crashes, they all need to be reset to a stable state before continuing.
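A minimal sketch of the difference between the two strategies, assuming a “restart” simply resets a child to an empty, known state. ‘Supervisor’, ‘Child’ and ‘childCrashed’ are hypothetical names and the model is single-threaded; it is not Akka's supervisor API.

```scala
// Hypothetical supervisor model (not Akka): children are restarted by resetting state.
sealed trait RestartStrategy
case object OneForOne extends RestartStrategy
case object AllForOne extends RestartStrategy

final class Child(val name: String) {
  var restarts = 0
  var state: List[String] = Nil
  // Back to a known, stable state.
  def restart(): Unit = { state = Nil; restarts += 1 }
}

final class Supervisor(strategy: RestartStrategy, val children: Seq[Child]) {
  // Called when `crashed` fails; the strategy decides which children to restart.
  def childCrashed(crashed: Child): Unit = strategy match {
    case OneForOne => crashed.restart()
    case AllForOne => children.foreach(_.restart())
  }
}

object StrategyDemo {
  def main(args: Array[String]): Unit =
    for (strategy <- Seq(OneForOne, AllForOne)) {
      val a = new Child("a"); val b = new Child("b")
      new Supervisor(strategy, Seq(a, b)).childCrashed(a)
      println(s"$strategy: a.restarts=${a.restarts}, b.restarts=${b.restarts}")
    }
}
```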

Chat server: Supervision, Traits and more

There are two ways you can define an Actor to be a supervisor; declaratively and dynamically. In this example we use the dynamic approach. There are two things we have to do:

  • Define the fault handler by setting the ‘faultHandler’ member field to the strategy we want.
  • Define the exceptions we want to “trap”, i.e. which exceptions should be handled according to the fault-handling strategy we have defined. This is done by setting the ‘trapExit’ member field to a ‘List’ with all the exceptions we want to trap.

The last thing we have to do to supervise Actors (in our example the storage Actor) is to ‘link’ the Actor. Invoking ‘link(actor)’ creates a link between the Actor passed as the argument and ourselves. This means that we will now get a notification if the linked Actor crashes, and if the cause of the crash, the exception, matches one of the exceptions in our ‘trapExit’ list, the crashed Actor is restarted according to the fault-handling strategy defined in our ‘faultHandler’. We also have the ‘unlink(actor)’ function, which disconnects the linked Actor from the supervisor.

In our example we are using a method called ‘startLink(actor)’, which starts the Actor and links it in an atomic operation. The linking and unlinking are done in the ‘init’ and ‘shutdown’ callback methods, which are invoked by the runtime when the Actor is started and shut down (shutting down is done by invoking ‘actor.stop’). In these methods we initialize our Actor by starting and linking the storage Actor, and clean up after ourselves by shutting down all the user session Actors and the storage Actor.
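The decision a supervisor makes when a linked Actor crashes can also be sketched in plain Scala. This sketch models ‘trapExit’ as a list of exception classes and assumes that the two arguments to ‘OneForOneStrategy(5, 5000)’ in the ‘ChatServer’ code mean “at most 5 restarts within 5000 ms”, escalating once that limit is exceeded. ‘RestartPolicy’, ‘onCrash’, ‘Restart’ and ‘Escalate’ are hypothetical names, not part of Akka.

```scala
import scala.collection.mutable

// Hypothetical sketch (not Akka's API) of what a supervisor decides when a linked child crashes.
sealed trait Decision
case object Restart extends Decision
case object Escalate extends Decision // give up: not trapped, or restarting too often

final class RestartPolicy(
    trapExit: List[Class[_ <: Throwable]],
    maxRestarts: Int,
    withinMillis: Long) {

  // Timestamps of recent restarts, oldest first.
  private val restartTimes = mutable.Queue.empty[Long]

  def onCrash(cause: Throwable, nowMillis: Long): Decision = {
    val trapped = trapExit.exists(_.isInstance(cause))
    // Forget restarts that fall outside the time window.
    while (restartTimes.nonEmpty && nowMillis - restartTimes.head >= withinMillis)
      restartTimes.dequeue()
    if (trapped && restartTimes.size < maxRestarts) {
      restartTimes.enqueue(nowMillis)
      Restart
    } else Escalate
  }
}

object PolicyDemo {
  def main(args: Array[String]): Unit = {
    val policy = new RestartPolicy(List(classOf[Exception]), maxRestarts = 5, withinMillis = 5000)
    // Six crashes 100 ms apart: the first five are restarted, the sixth escalates.
    println((0 until 6).map(i => policy.onCrash(new RuntimeException("boom"), i * 100L)))
    // An Error is not a subclass of Exception, so it is not trapped.
    println(policy.onCrash(new StackOverflowError, 10000L))
  }
}
```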

That is it. Now we have implemented the supervising part of the fault-tolerance for the storage Actor. But before we dive into the ‘ChatServer’ code there are some more things worth mentioning about its implementation.

It defines an abstract member field holding the ‘ChatStorage’ implementation the server wants to use. We do not define that in the ‘ChatServer’ directly since we want to decouple it from the actual storage implementation.

The ‘ChatServer’ is a ‘trait’, which is Scala’s version of mixins. A mixin can be seen as an interface with an implementation; it is a very powerful tool in Object-Oriented design that makes it possible to design the system as small, reusable, highly cohesive, loosely coupled parts that can be composed into larger object and component structures.

I’ll try to show you how we can make use of Scala’s mixins to decouple the Actor implementation from the business logic of managing the user sessions, routing the chat messages and storing them in the persistent storage. Each of these separate parts of the server logic will be represented by its own trait, giving us four different isolated mixins: ‘Actor’, ‘SessionManagement’, ‘ChatManagement’ and ‘ChatStorageFactory’. This gives us a loosely coupled system with high cohesion and reusability. At the end of the article I’ll show you how to compose these mixins into the complete runtime component we want.
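The shape of this design can be sketched without Akka: one trait declares an abstract ‘storage’ member and uses it, and a separate factory trait supplies the concrete value when the two are mixed together. ‘MessageStore’, ‘ServerCore’ and ‘InMemoryStoreFactory’ are hypothetical names, and the storage here is a plain object rather than an Actor.

```scala
// Hypothetical, Akka-free sketch of composing a server from separate mixins.
trait MessageStore {
  def store(message: String): Unit
  def log: List[String]
}

// The core only declares the storage it needs; it does not choose an implementation.
trait ServerCore {
  val storage: MessageStore
  def post(message: String): Unit = storage.store(message)
}

// A separate mixin supplies the concrete storage.
trait InMemoryStoreFactory {
  val storage: MessageStore = new MessageStore {
    private var messages: List[String] = Nil
    def store(message: String): Unit = messages ::= message
    def log: List[String] = messages.reverse // oldest first
  }
}

object MixinDemo {
  def main(args: Array[String]): Unit = {
    // The pieces are composed when the object is created.
    val server = new ServerCore with InMemoryStoreFactory
    server.post("alice: hi")
    server.post("bob: hello")
    println(server.storage.log)
  }
}
```

One design caveat: the abstract ‘val’ is only assigned when the factory trait is initialized, after ‘ServerCore’, so the core trait should not use ‘storage’ in its own constructor body. The ‘ChatServer’ code follows this, touching ‘storage’ only in ‘init’ and ‘shutdown’.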

/**
 * Chat server. Manages sessions and redirects all
 * other messages to the Session for the client.
 */
trait ChatServer extends Actor {
  faultHandler = Some(OneForOneStrategy(5, 5000))
  trapExit = List(classOf[Exception])

  val storage: ChatStorage

  log.info("Chat service is starting up...")

  // actor message handler
  def receive = sessionManagement orElse chatManagement

  // abstract methods to be defined somewhere else
  protected def chatManagement: PartialFunction[Any, Unit]
  protected def sessionManagement: PartialFunction[Any, Unit]
  protected def shutdownSessions: Unit

  override def init = startLink(storage)

  override def shutdown = {
    log.info("Chat server is shutting down...")
    shutdownSessions
    unlink(storage)
    storage.stop
  }
}

If you look at the ‘receive’ message handler function, you can see that instead of putting our logic there we delegate to two different functions, ‘sessionManagement’ and ‘chatManagement’, chaining them with ‘orElse’. These two functions are defined as abstract in our ‘ChatServer’, which means that they have to be provided by another mixin or class when we instantiate our ‘ChatServer’. Naturally we will put the ‘sessionManagement’ implementation in the ‘SessionManagement’ trait and the ‘chatManagement’ implementation in the ‘ChatManagement’ trait. First let’s create the ‘SessionManagement’ trait.

Chaining partial functions like this is a great way of composing functionality in Actors. You can, for example, define a default message handler for generic messages in the base Actor and then let derived Actors extend that functionality by defining additional message handlers. There is a section on how that is done here.
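A minimal, Akka-free sketch of that idea, assuming handlers that return a ‘String’ (rather than ‘Unit’) so the result is easy to inspect. ‘BaseHandler’, ‘ChatHandler’, ‘generic’ and ‘specific’ are hypothetical names; ‘orElse’ is the standard ‘PartialFunction’ method.

```scala
// Hypothetical sketch: composing message handlers as PartialFunctions (no Akka needed).
trait BaseHandler {
  // Generic messages that every derived handler understands.
  protected def generic: PartialFunction[Any, String] = {
    case "ping" => "pong"
  }
  // Supplied by the derived handler.
  protected def specific: PartialFunction[Any, String]
  // Try the specific cases first, then fall back to the generic ones.
  def receive: PartialFunction[Any, String] = specific orElse generic
}

class ChatHandler extends BaseHandler {
  protected def specific: PartialFunction[Any, String] = {
    case ("login", user: String)  => s"$user logged in"
    case ("logout", user: String) => s"$user logged out"
  }
}

object HandlerDemo {
  def main(args: Array[String]): Unit = {
    val handler = new ChatHandler().receive
    println(handler(("login", "alice")))
    println(handler("ping"))
    // Neither partial function matches, so isDefinedAt is false.
    println(handler.isDefinedAt(42))
  }
}
```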

Published at DZone with permission of its author, Jonas Boner.


Comments

Banq Ee replied on Fri, 2010/01/08 - 2:35am

Are Transactors simpler than JTA in Java? Their usage is like transactions in Java. In Java we can implement the Actor model with asynchronous domain events + JMS. Don't believe it? Refer to the slides for an open-source Java DDD framework: http://www.slideshare.net/banq/jdonframework-2734177

Unknown Notknown replied on Wed, 2010/01/20 - 2:25am

Good one.
