Debasish specializes in leading delivery of enterprise scale solutions for various clients ranging from small ones to Fortune 500 companies. He is the technology evangelist of Anshin Software (http://www.anshinsoft.com) and takes pride in institutionalizing best practices in software design and programming. He loves to program in Java, Ruby, Erlang and Scala and has been trying desperately to get out of the unmanaged world of C++. Debasish is a DZone MVB and is not an employee of DZone and has posted 55 posts at DZone. You can read more from them at their website. View Full User Profile

Event Sourcing, Akka FSMs and functional domain models

01.26.2012
| 3746 views |
  • submit to reddit

I blogged on Event Sourcing and functional domain models earlier. In this post I would like to share more of my thoughts on the same subject and how with a higher level of abstraction you can make your domain aggregate boundary more resilient and decoupled from external references.

When we talk about a domain model, the Aggregate takes the centerstage. An aggregate is a core abstraction that represents the time invariant part of the domain. It's an embodiment of all states that the aggregate can be in throughout its lifecycle in the system. So, it's extremely important that we take every pain to distil the domain model and protect the aggregate from all unwanted external references. Maybe an example will make it clearer.

Keeping the Aggregate pure

Consider a Trade model as the aggregate. By Trade, I mean a security trade that takes place in the stock exchange where counterparties exchange securities and currencies for settlement. If you're a regular reader of my blog, you must be aware of this, since this is almost exclusively the domain that I talk of in my blog posts.

A trade can be in various states like newly entered, value date added, enriched with tax and fee information, net trade value computed etc. In a trading application, as a trade passes through the processing pipeline, it moves from one state to another. The final state represents the complete Trade object which is ready to be settled between the counterparties.

In the traditional model of processing we have the final snapshot of the aggregate - what we don't have is the audit log of the actual state transitions that happened in response to the events. With event sourcing we record the state transitions as a pipeline of events which can be replayed any time to rollback or roll-forward to any state of our choice. Event sourcing is coming up as one of the potent ways to model a system and there are lots of blog posts being written to discuss about the various architectural strategies to implement an event sourced application.

That's ok. But whose responsibility is it to manage these state transitions and record the timeline of changes ? It's definitely not the responsibility of the aggregate. The aggregate is supposed to be a pure abstraction. We must design it as an immutable object that can respond to events and transform itself into the new state. In fact the aggregate implementation should not be aware of whether it's serving an event sourced architecture or not.

There are various ways you can model the states of an aggregate. One option that's frequently used involves algebraic data types. Model the various states as a sum type of products. In Scala we do this as case classes ..

sealed abstract class Trade {
  def account: Account
  def instrument: Instrument
  //..
}
 
case class NewTrade(..) extends Trade {
  //..
}
 
case class EnrichedTrade(..) extends Trade {
  //..
}
Another option may be to have one data type to model the Trade and model states as immutable enumerations with changes being effected on the aggregate as functional updates. No in place mutation, but use functional data structures like zippers or type lenses to create the transformed object in the new state. Here's an example where we create an enriched trade out of a newly created one ..
// closure that enriches a trade
val enrichTrade: Trade => Trade = {trade =>
  val taxes = for {
    taxFeeIds      <- forTrade // get the tax/fee ids for a trade
    taxFeeValues   <- taxFeeCalculate // calculate tax fee values
  }
  yield(taxFeeIds ° taxFeeValues)
  val t = taxFeeLens.set(trade, taxes(trade))
  netAmountLens.set(t, t.taxFees.map(_.foldl(principal(t))((a, b) => a + b._2)))
}

But then we come back to the same question - if the aggregate is distilled to model the core domain, who handles the events ? Someone needs to model the event changes, effect the state transitions and take the aggregate from one state to the next.

Enter Finite State Machines

In one of my projects I used the domain service layer to do this. The domain logic for effecting the changes lies with the aggregate, but they are invoked from the domain service in response to events when the aggregate reaches specific states. In other words I model the domain service as a finite state machine that manages the lifecycle of the aggregate.

In our example a Trading Service can be modeled as an FSM that controls the lifecycle of a Trade. As the following ..

import TradeModel._
 
class TradeLifecycle(trade: Trade, timeout: Duration, log: Option[EventLog])
  extends Actor with FSM[TradeState, Trade] {
  import FSM._
 
  startWith(Created, trade)
 
  when(Created) {
    case Event(e@AddValueDate, data) =>
      log.map(_.appendAsync(data.refNo, Created, Some(data), e))
      val trd = addValueDate(data)
      notifyListeners(trd)
      goto(ValueDateAdded) using trd forMax(timeout)
  }
 
  when(ValueDateAdded) {
    case Event(StateTimeout, _) =>
      stay
 
    case Event(e@EnrichTrade, data) =>
      log.map(_.appendAsync(data.refNo, ValueDateAdded, None,  e))
      val trd = enrichTrade(data)
      notifyListeners(trd)
      goto(Enriched) using trd forMax(timeout)
  }
 
  when(Enriched) {
    case Event(StateTimeout, _) =>
      stay
 
    case Event(e@SendOutContractNote, data) =>
      log.map(_.appendAsync(data.refNo, Enriched, None,  e))
      sender ! data
      stop
  }
 
  initialize
}

The snippet above contains a lot of other details which I did not have time to prune. It's actually part of the implementation of an event sourced trading application that uses asynchronous messaging (actors) as the backbone for event logging and reaching out to multiple consumers based on the CQRS paradigm.

Note that the FSM model above makes it very explicit about the states that the Trade model can reach and the events that it handles while in each of these states. Also we can use this FSM technique to log events (for event sourcing), notify listeners about the events (CQRS) in a very much declarative manner as implemented above.

Let me know in the comments what are your views on this FSM approach towards handling state transitions in domain models. I think it helps keep aggregates pure and helps design domain services that focus on serving specific aggregate roots.

I will be talking about similar stuff, Akka actor based event sourcing implementations and functional domain models in PhillyETE 2012. Please drop by if this interests you.

 

From http://debasishg.blogspot.com/2012/01/event-sourcing-akka-fsms-and-functional.html

Published at DZone with permission of Debasish Ghosh, author and DZone MVB.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)

Tags:

Comments

Ant Kutschera replied on Tue, 2012/01/31 - 2:01am

Hi Debasish In enrich, you call two methods: <- and the degree symbol. What do they do? We sell tickets. The aggregate is an offer. The offer changes state to reserved, booked and cancelled. Each of those is a different domain class. State transitions are modelled outside the domain model. We use a service to change the state and create a new domain object of the correct type to reflect the state. The domain model contains zero logic, the logic is all in the service. These are just one way of doing things, I thought I would share it. I have another question: since cqrs is full of "side effects" (it sends events and no one knows who is listening), does that conflict with a functional approach to programming? Thanks, Ant

Comment viewing options

Select your preferred way to display the comments and click "Save settings" to activate your changes.