Jonas spends most of his time developing large-scale financial systems as well as lecturing and speaking at developer conferences worldwide. He has worked at Terracotta and on the JRockit JVM at BEA, and is an active contributor to the Open Source community; most notably, he created the AspectWerkz aspect-oriented programming framework, is a committer to the Terracotta JVM clustering technology and has been part of the Eclipse AspectJ team.

Introducing Akka – Simpler Scalability, Fault-Tolerance, Concurrency & Remoting Through Actors

01.05.2010

Session management

Session management is defined in the ‘SessionManagement’ trait, in which we implement the two abstract methods of the ‘ChatServer’: ‘sessionManagement’ and ‘shutdownSessions’.

The ‘SessionManagement’ trait holds a ‘HashMap’ with all the session Actors mapped by user name as well as a reference to the storage (to be able to pass it in to each newly created ‘Session’).

The ‘sessionManagement’ function performs session management by responding to the ‘Login’ and ‘Logout’ messages. For each ‘Login’ message it creates a new ‘Session’ Actor, starts it and puts it in the ‘sessions’ Map; for each ‘Logout’ message it does the opposite: it shuts down the user’s session and removes it from the ‘sessions’ Map.

The ‘shutdownSessions’ function simply shuts all the session Actors down. That completes the user session management.

/**
 * Implements user session management.
 * <p/>
 * Uses self-type annotation 'this: Actor =>'
 * to declare that it needs to be mixed in with an Actor.
 */
trait SessionManagement { this: Actor =>

  val storage: ChatStorage // needs someone to provide the ChatStorage
  val sessions = new HashMap[String, Actor]

  protected def sessionManagement: PartialFunction[Any, Unit] = {
    case Login(username) =>
      log.info("User [%s] has logged in", username)
      val session = new Session(username, storage)
      session.start
      sessions += (username -> session)

    case Logout(username) =>
      log.info("User [%s] has logged out", username)
      val session = sessions(username)
      session.stop
      sessions -= username
  }

  protected def shutdownSessions =
    sessions.foreach { case (_, session) => session.stop }
}
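
The Login/Logout handling can also be tried out without Akka. The following is only a minimal sketch under that assumption: ‘SessionSketch’, ‘FakeSession’ and ‘SessionRegistry’ are hypothetical names that are not part of Akka or of the sample, and the sessions are plain objects rather than Actors. Unlike the trait above, the sketch uses ‘sessions.remove’, so a ‘Logout’ for an unknown user is ignored instead of throwing.

```scala
import scala.collection.mutable.HashMap

object SessionSketch {
  // Messages mirroring the ones used in the article.
  case class Login(username: String)
  case class Logout(username: String)

  // Hypothetical stand-in for the Session Actor: it only tracks whether it runs.
  class FakeSession(val username: String) {
    var running = false
    def start(): Unit = running = true
    def stop(): Unit = running = false
  }

  // Hypothetical registry with the same shape as the SessionManagement trait.
  class SessionRegistry {
    val sessions = new HashMap[String, FakeSession]

    val sessionManagement: PartialFunction[Any, Unit] = {
      case Login(username) =>
        val session = new FakeSession(username)
        session.start()
        sessions += (username -> session)

      case Logout(username) =>
        // remove returns an Option, so an unknown user is a no-op
        sessions.remove(username).foreach(_.stop())
    }

    def shutdownSessions(): Unit =
      sessions.values.foreach(_.stop())
  }
}
```

Because the handler is a ‘PartialFunction’, any message other than ‘Login’ or ‘Logout’ is simply not defined for it, which is what lets it be combined with other handlers.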

Chat message management

Chat message management is implemented by the ‘ChatManagement’ trait. It has an abstract ‘sessions’ member field, a ‘HashMap’ holding all the sessions. Since it is abstract, it needs to be mixed in with someone that can provide this reference. If this dependency is not resolved when composing the final component, you will get a compilation error.

It implements the ‘chatManagement’ function, which responds to two different messages: ‘ChatMessage’ and ‘GetChatLog’. It simply gets the session for the user (the sender of the message) and routes the message to this session. For ‘GetChatLog’ we use the ‘forward’ function to make sure the original sender reference is passed along, allowing the end receiver to reply directly.

/**
 * Implements chat management, e.g. chat message dispatch.
 * <p/>
 * Uses self-type annotation 'this: Actor =>'
 * to declare that it needs to be mixed in with an Actor.
 */
trait ChatManagement { this: Actor =>
  val sessions: HashMap[String, Actor] // someone needs to provide the Session map

  protected def chatManagement: PartialFunction[Any, Unit] = {
    case msg @ ChatMessage(from, _) => sessions(from) ! msg
    case msg @ GetChatLog(from)     => sessions(from) forward msg
  }
}

Using an Actor as a message broker, as in this example, is a very common pattern with many variations; load-balancing, master/worker, map/reduce, replication, logging etc. It becomes even more useful with remote Actors when we can use it to route messages to different nodes.
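
The difference between a plain send and ‘forward’ in a broker can be illustrated without Akka. This is a rough sketch, not Akka's implementation: ‘Node’, ‘Recorder’ and ‘Broker’ are hypothetical types, and the sender is passed as an explicit argument instead of being tracked by the runtime.

```scala
object ForwardSketch {
  // Hypothetical minimal "actor": receives a message together with its sender.
  trait Node {
    def name: String
    def receive(msg: String, sender: Node): Unit
  }

  // Records who each received message appeared to come from.
  class Recorder(val name: String) extends Node {
    var seenSenders: List[String] = Nil
    def receive(msg: String, sender: Node): Unit =
      seenSenders = seenSenders :+ sender.name
  }

  // Routes every message to a single target.
  class Broker(val name: String, target: Node) extends Node {
    def receive(msg: String, sender: Node): Unit = msg match {
      // plain send: the broker itself becomes the sender
      case "send"    => target.receive(msg, this)
      // forward: the original sender is passed along unchanged
      case "forward" => target.receive(msg, sender)
      case _         => ()
    }
  }
}
```

With a forwarded message the target sees the original client as the sender and can reply to it directly; with a plain send it only sees the broker.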

STM and Transactors

Actors are excellent for solving problems where you have many independent processes that can work in isolation and only interact with other Actors through message passing. This model fits many problems. But the Actor model is unfortunately a terrible model for implementing truly shared state, e.g. when you need to have consensus and a stable view of state across many components. The classic example is the bank account where clients can deposit and withdraw, in which each operation needs to be atomic. For a detailed discussion on the topic see this presentation.

Software Transactional Memory (STM), on the other hand, is excellent for problems where you need consensus and a stable view of the state, since it provides compositional transactional shared state. Some of the really nice traits of STM are that transactions compose and that it raises the abstraction level from lock-based concurrency.

Akka has an STM implementation that is based on the same ideas as found in the Clojure language: Managed References working with immutable data.

Akka allows you to combine Actors and STM into what we call Transactors (short for Transactional Actors). IMHO, these give you the best of the Actor model (simple concurrency and asynchronous event-based programming) and STM (compositional transactional shared state) by providing transactional, compositional, asynchronous, event-based message flows. You don’t need Transactors all the time, but when you do need them, you really need them.

Akka currently provides three different transactional abstractions: ‘Map’, ‘Vector’ and ‘Ref’. They can be shared between multiple Actors and they are managed by the STM. You are not allowed to modify them outside a transaction; if you do so, an exception will be thrown.

What you get is transactional memory in which multiple Actors are allowed to read and write to the same memory concurrently and if there is a clash between two transactions then both of them are aborted and retried. Aborting a transaction means that the memory is rolled back to the state it was in when the transaction was started.

In database terms STM gives you ‘ACI’ semantics: ‘Atomicity’, ‘Consistency’ and ‘Isolation’. The ‘D’ in ‘ACID’, ‘Durability’, you can’t get with an STM since it is in memory. This, however, is addressed by the persistence module in Akka.
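
The retry-on-clash idea can be approximated with a single managed reference to immutable data. The sketch below is an analogy only and is not Akka's STM: it uses ‘java.util.concurrent.atomic.AtomicReference’ from the JDK, ‘OptimisticRefSketch’ and ‘transfer’ are hypothetical names, and the ‘interfere’ parameter exists purely to simulate a concurrent writer in a deterministic way. In this simplified version only the update that loses the race is retried.

```scala
import java.util.concurrent.atomic.AtomicReference

object OptimisticRefSketch {
  // One managed reference holding an immutable snapshot of all balances.
  val accounts =
    new AtomicReference[Map[String, Int]](Map("alice" -> 100, "bob" -> 0))

  // Moves 'amount' between two accounts and returns the number of attempts.
  // 'interfere' (hypothetical) runs once, after the first snapshot is read,
  // to simulate another writer committing in between.
  def transfer(from: String, to: String, amount: Int,
               interfere: () => Unit = () => ()): Int = {
    var attempts = 0
    var committed = false
    while (!committed) {
      attempts += 1
      val before = accounts.get()
      if (attempts == 1) interfere()
      // The new state is computed from the snapshot; nothing is mutated in place.
      val after = before
        .updated(from, before(from) - amount)
        .updated(to, before(to) + amount)
      // Fails if the reference no longer points at 'before'; then we retry.
      committed = accounts.compareAndSet(before, after)
    }
    attempts
  }
}
```

Since a failed attempt never touched the shared snapshot, there is nothing to undo; "rolling back" amounts to discarding the locally computed state and starting again from the current one.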

Persistence: Storing the chat log

Akka provides the possibility of taking the transactional data structures we discussed above and making them persistent. It is an extension to the STM which guarantees that it has the same semantics.

The persistence module has pluggable storage back-ends. At the time of writing it has three different storage back-ends:

  • Cassandra – A distributed structured storage database.
  • MongoDB – A high performance schema-free, document oriented data store with SQL like query facilities.
  • Redis – An advanced key-value store, also called a data structure server, with lists, ordered sets etc.

They all implement persistent ‘Map’, ‘Vector’ and ‘Ref’, which can be created and retrieved by id through one of the storage modules.

val map    = RedisStorage.newMap(id)
val vector = CassandraStorage.newVector(id)
val ref    = MongoStorage.newRef(id)

Chat storage: Backed by Redis

Now let’s implement the persistent storage. We start by creating a ‘ChatStorage’ trait, allowing us to have multiple different storage backends, for example one in-memory and one persistent.

/**
 * Abstraction of chat storage holding the chat log.
 */
trait ChatStorage extends Actor

Let’s use Redis to implement the persistent storage. Redis is an excellent storage backend: blazingly fast, with a rich data model.

Our ‘RedisChatStorage’ extends the ‘ChatStorage’ trait. The only state it holds is the ‘chatLog’ which is a ‘Vector’ managed by Redis. We give it an explicit id (the String “akka.chat.log”) to be able to retrieve the same vector across remote nodes and/or through server restarts.

It responds to two different messages: ‘ChatMessage’ and ‘GetChatLog’. The ‘ChatMessage’ message handler takes the ‘message’ attribute and appends it to the ‘chatLog’ vector. Here you can see that we are using the ‘atomic { … }’ block to run the vector operation in a transaction. Redis works with binary data, so we need to convert the message into a binary representation. Since we are using Strings we just have to invoke ‘message.getBytes(“UTF-8”)’, but if we had a richer message that we wanted to persist, we would have to use one of Akka’s serialization traits or serializers. You can read more about that here.

The ‘GetChatLog’ message handler retrieves all the messages in the chat log storage inside an atomic block and iterates over them using the ‘map’ combinator, transforming them from ‘Array[Byte]’ to ‘String’. Then it invokes the ‘reply(message)’ function that will send the chat log to the original sender: the ‘ChatClient’.

You might remember that the ‘ChatServer’ was supervising the ‘ChatStorage’ actor. When we discussed that, we showed you the supervising Actor’s view. Now is the time for the supervised Actor’s side of things. First, a supervised Actor needs to define a life-cycle in which it declares whether it should be seen as:
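
The byte conversion described above can be tried in isolation. The following is a small sketch with an in-memory stand-in for the persistent vector; ‘InMemoryChatLogSketch’ is a hypothetical name, there is no transaction or Redis involved, and only the UTF-8 round trip matches what the handlers do.

```scala
object InMemoryChatLogSketch {
  // Hypothetical in-memory stand-in for the persistent Vector of byte arrays.
  private var chatLog = Vector.empty[Array[Byte]]

  // Same conversion as the ChatMessage handler: String to UTF-8 bytes.
  def append(message: String): Unit =
    chatLog = chatLog :+ message.getBytes("UTF-8")

  // Same conversion as the GetChatLog handler: UTF-8 bytes back to String.
  def messages: List[String] =
    chatLog.map(bytes => new String(bytes, "UTF-8")).toList
}
```

Naming the charset explicitly on both sides keeps the round trip independent of the platform default encoding.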

  • ‘Permanent’: which means that the actor will always be restarted.
  • ‘Temporary’: which means that the actor will not be restarted, but it will be shut down through the regular shutdown process so the ‘shutdown’ callback function will be called.

We define the ‘RedisChatStorage’ as ‘Permanent’ by setting the ‘lifeCycle’ member field to ‘Some(LifeCycle(Permanent))’.

The idea with this crash-early style of designing your system is that the services should just crash and then they should be restarted and reset into a stable state and continue from there. The definition of “stable state” is domain specific and up to the application developer to define. Akka provides two callback functions, ‘preRestart’ and ‘postRestart’, that are called right before and right after the Actor is restarted. Both of these functions take a ‘Throwable’, the reason for the crash, as an argument. In our case we just need to implement the ‘postRestart’ hook and there re-initialize the ‘chatLog’ member field with a fresh persistent ‘Vector’ from Redis.

/**
 * Redis-backed chat storage implementation.
 */
class RedisChatStorage extends ChatStorage {
  lifeCycle = Some(LifeCycle(Permanent))

  private var chatLog = RedisStorage.getVector("akka.chat.log")

  log.info("Redis-based chat storage is starting up...")

  def receive = {
    case msg @ ChatMessage(from, message) =>
      log.debug("New chat message [%s]", message)
      atomic {
        chatLog + message.getBytes("UTF-8")
      }

    case GetChatLog(_) =>
      val messageList = atomic {
        chatLog.map(bytes => new String(bytes, "UTF-8")).toList
      }
      reply(ChatLog(messageList))
  }

  override def postRestart(reason: Throwable) =
    chatLog = RedisStorage.getVector("akka.chat.log")
}
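
The crash-and-restart cycle with its two hooks can be sketched in plain Scala. This is an illustration of the idea only, not of how Akka's supervisors work internally: ‘RestartSketch’, ‘Service’ and ‘supervise’ are hypothetical names, and the sketch simply calls the hooks on the same object after a crash.

```scala
object RestartSketch {
  // Hypothetical service with the two restart hooks described in the article.
  class Service {
    var state: List[String] = Nil
    var restarts = 0

    def handle(msg: String): Unit = {
      if (msg == "boom") throw new IllegalStateException("crashed on: " + msg)
      state = state :+ msg
    }

    def preRestart(reason: Throwable): Unit = ()

    def postRestart(reason: Throwable): Unit = {
      // reset into a known stable state, here simply an empty one
      state = Nil
      restarts += 1
    }
  }

  // Hypothetical supervisor: a crash triggers the hooks instead of propagating.
  def supervise(service: Service, msg: String): Unit =
    try service.handle(msg)
    catch {
      case reason: Exception =>
        service.preRestart(reason)
        service.postRestart(reason)
    }
}
```

What "stable state" means is decided inside ‘postRestart’; the Redis-backed storage above re-fetches its vector there, whereas this sketch just clears a list.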

The last thing we need to do in terms of persistence is to create a ‘RedisChatStorageFactory’ that will take care of instantiating and resolving the ‘val storage: ChatStorage’ field in the ‘ChatServer’ with a concrete implementation of our persistence Actor.

/**
 * Creates a RedisChatStorage.
 */
trait RedisChatStorageFactory {
  val storage: ChatStorage = new RedisChatStorage
}

Composing the full Chat Service

We have now created the full functionality for the chat server, all nicely decoupled into isolated and well-defined traits. Now let’s bring all these traits together and compose the complete concrete ‘ChatService’.

/**
 * Object encapsulating the full Chat Service.
 */
object ChatService extends
  ChatServer with
  SessionManagement with
  ChatManagement with
  RedisChatStorageFactory
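
The way an abstract member in one trait is satisfied by another trait at composition time can be shown with a reduced example. This is a sketch under simplifying assumptions: ‘Storage’, ‘InMemoryStorage’, ‘Server’ and ‘InMemoryStorageFactory’ are hypothetical stand-ins for the chat traits, and no Actors are involved.

```scala
object CompositionSketch {
  trait Storage { def name: String }
  class InMemoryStorage extends Storage { val name = "in-memory" }

  // Needs someone to provide the storage.
  trait Server {
    val storage: Storage
    def describe: String = "server using " + storage.name
  }

  // Provides the storage.
  trait InMemoryStorageFactory {
    val storage: Storage = new InMemoryStorage
  }

  // Compiles because InMemoryStorageFactory supplies 'storage'.
  object Service extends Server with InMemoryStorageFactory

  // Leaving the factory out is rejected by the compiler,
  // since 'storage' would remain abstract:
  // object Broken extends Server
}
```

Swapping the storage implementation then only means mixing in a different factory trait when the final object is declared.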

Making the ChatService remote

Now that we have the ‘ChatService’ object how do we make it into a remote service that we can use from different nodes?

It is very simple. We only need to do two things. First we need to start up a remote server to run the ‘ChatService’. Then for each client that wants to use the ‘ChatService’ we just need to invoke ‘ChatService.makeRemote’ to get a handle to the remote ‘ChatService’.

Let’s start with the first step. We have two options for starting up a remote server. The first is to start up the ‘RemoteNode’ in some part of the code that runs on the machine you want to run the server on (this can just be a simple class with a ‘main’ method).

We start the ‘RemoteNode’ by invoking ‘start’ and passing in the host name and port.

RemoteNode.start("darkstar", 9999)

You can also choose to use the version of ‘start’ that takes a ‘ClassLoader’ as argument if you want to be explicit about which class loader is used to load the class of the Actor that you want to run as a remote service.

The second option is to put your application in a JAR file and drop it into the ‘AKKA_HOME/deploy’ directory and then start up the Akka microkernel. This will deploy your application and start the ‘RemoteNode’ for you. Then you use the ‘AKKA_HOME/config/akka.conf’ configuration file to configure the remote server (among many other things). The microkernel is started up like this:

export AKKA_HOME=...
cd $AKKA_HOME
java -jar $AKKA_HOME/dist/akka-0.6.jar

That was the server part. The client part is just as simple. We only need to tell the runtime system that we want to use the ‘ChatService’ as a remote Actor by invoking the ‘makeRemote(hostname, port)’ function on it. This will instantiate the Actor on the remote host and turn the local Actor instance into a proxy or handle through which we can use the remote Actor transparently with the exact same semantics as if it was a regular local Actor.

That’s it. Now let’s run a sample client session.

import se.scalablesolutions.akka.sample.chat._

/**
 * Test runner emulating a chat session.
 */
object Runner {
  // create a handle to the remote ChatService
  ChatService.makeRemote("localhost", 9999)
  ChatService.start

  def run = {
    val client = new ChatClient("jonas")

    client.login

    client.post("Hi there")
    println("CHAT LOG:\n\t" + client.chatLog.log.mkString("\n\t"))

    client.post("Hi again")
    println("CHAT LOG:\n\t" + client.chatLog.log.mkString("\n\t"))

    client.logout
  }
}

Sample code

All this code is available as part of the Akka distribution. It resides in the ‘./akka-samples/akka-sample-chat’ module and has a ‘README’ file explaining how to run it as well as a Maven ‘pom.xml’ build file so it is easy to build, run, hack, rebuild, run etc. You can also just read the next section for instructions on how to run it.

Or, if you would rather, browse it online.

Run it

First we need to start up Redis.

  1. Download Redis from here.
  2. Step into the distribution.
  3. Build: ‘make install’.
  4. Run: ‘./redis-server’.

For details on how to set up Redis server have a look here.

Download and build Akka

  1. Check out Akka from http://github.com/jboner/akka.
  2. Set ‘AKKA_HOME’ environment variable to the root of the Akka distribution.
  3. Open up a shell and step into the Akka distribution root folder.
  4. Build Akka by invoking ‘mvn install -Dmaven.test.skip=true’. This will also build the sample application and deploy it to the ‘$AKKA_HOME/deploy’ directory.

Run the microkernel

export AKKA_HOME=...
cd $AKKA_HOME
java -jar ./dist/akka-0.6.jar

Run a sample chat session

  1. Now start up a new shell and go down into the ‘./akka-samples/akka-sample-chat’ directory.
  2. Invoke ‘mvn scala:console -o’. This will give you a Scala REPL (interpreter) with the chat application and all its dependency JARs on the classpath.
  3. Simply paste in the whole code block with the ‘Runner’ object above and invoke ‘Runner.run’. This runs a simulated client session that will connect to the running server in the microkernel.
  4. Invoke ‘Runner.run’ again and again…

Now you could test client reconnect by killing the running microkernel and starting it up again. See the client reconnect take place in the REPL shell.

That’s it. Have fun.

Onward

There is much much more to Akka than what we have covered in this article. For example Active Objects, Cluster Membership API, a Comet module, REST (JAX-RS) integration, a Security module, AMQP integration, Spring integration, Google Guice integration, Lift integration, a rich Transaction API, tons of configuration possibilities etc.

From http://jonasboner.com

Published at DZone with permission of its author, Jonas Boner.


Comments

Banq Ee replied on Fri, 2010/01/08 - 2:35am

Transactors is simpler than JTA in Java? its usage is like transaction in java. in java we can implements Actors model with asynchronously domain events + jms , don't believe? refer a opensource java DDD framework ppt: http://www.slideshare.net/banq/jdonframework-2734177

Unknown Notknown replied on Wed, 2010/01/20 - 2:25am

Good one.
