John D'Emic is a technologist, developer and author. He is currently a Solutions Architect at MuleSoft and a co-author of both editions of Mule in Action. John is a DZone MVB and is not an employee of DZone and has posted 10 posts at DZone. You can read more from them at their website.

NoSQL with Apache Cassandra and Mule

06.03.2013

Apache Cassandra is a column-based, distributed database. Until recently, the only way to interact with Cassandra from Mule was to reuse one of the existing Java clients, like Hector or Astyanax, in a component. Mule’s Cassandra DB Module now provides message processors to insert, update, query and delete data in Cassandra.

To show off some of the Cassandra module’s features, I’ll implement a simple account management API. This API will let clients perform CRUD operations on accounts, much like an LDAP directory.

Inserting Columns

The Cassandra module uses Java maps to define how data is inserted into and retrieved from a Cassandra keyspace. For this example we’ll use Mule’s JSON transformers to move data back and forth over HTTP. Let’s take a look at the account data.

{
    "Accounts":{
        "engineering":{
            "joe@acmesoft.com":{
                "Name":"Joe Developer",
                "Password":"286755fad04869ca523320acce0dc6a4",
                "passwordAge": 731400
            },
            "jane@acmesoft.com":{
                "Name":"Jane Developer",
                "Password":"10b222970537b97919db36ec757370d2",
                "passwordAge": 10082400
            },
            "john@acmesoft.com":{
                "Name":"Jane Developer",
                "Password":"10b222970537b97919db36ec757370d2",
                "passwordAge": 1080000
            }
        },
        "operations":{
            "bill@acmesoft.com":{
                "Name":"Bill SysAdmin",
                "Password":"f1f16683f3e0208131b46d37a79c8921",
                "passwordAge": 4343100
            },
            "jill@acmesoft.com":{
                "Name":"Jill NetworkAdmin",
                "Password":"32a3571fa12b39266a58d42234836839",
                "passwordAge": 41923143
            }
        }
    }
}

When we persist this JSON to Cassandra, “Accounts” becomes the column family and each organizational unit becomes a row key (i.e., “engineering” and “operations”). Each account is a super column named by the user’s email address, and it contains the account’s columns: the name, the password and the time since the last password change (passwordAge).
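The JSON’s nesting lines up with those Cassandra concepts one level at a time. To make the mapping concrete, here is a plain-Java sketch (not the module’s code; `AccountModel`, `sampleAccounts` and `columnPaths` are hypothetical names) that builds Bill’s account as nested maps, roughly what a JSON-to-Map conversion yields, and flattens it into column family / row key / super column / column paths:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AccountModel {

    // Builds one account from the sample JSON as nested maps, roughly the shape a
    // JSON-to-Map conversion produces (assumption: insertion order is preserved).
    static Map<String, Object> sampleAccounts() {
        Map<String, Object> bill = new LinkedHashMap<>();
        bill.put("Name", "Bill SysAdmin");
        bill.put("Password", "f1f16683f3e0208131b46d37a79c8921");
        bill.put("passwordAge", 4343100);

        Map<String, Object> operations = new LinkedHashMap<>();
        operations.put("bill@acmesoft.com", bill);   // super column name -> columns

        Map<String, Object> accounts = new LinkedHashMap<>();
        accounts.put("operations", operations);      // row key -> super columns

        Map<String, Object> root = new LinkedHashMap<>();
        root.put("Accounts", accounts);              // column family -> rows
        return root;
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> asMap(Object value) {
        return (Map<String, Object>) value;
    }

    // Flattens the four nesting levels into "columnFamily/rowKey/superColumn/column" paths.
    static List<String> columnPaths(Map<String, Object> root) {
        List<String> paths = new ArrayList<>();
        for (Map.Entry<String, Object> cf : root.entrySet()) {
            for (Map.Entry<String, Object> row : asMap(cf.getValue()).entrySet()) {
                for (Map.Entry<String, Object> sc : asMap(row.getValue()).entrySet()) {
                    for (String column : asMap(sc.getValue()).keySet()) {
                        paths.add(cf.getKey() + "/" + row.getKey() + "/" + sc.getKey() + "/" + column);
                    }
                }
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        columnPaths(sampleAccounts()).forEach(System.out::println);
    }
}
```

Running it prints one path per column, for example `Accounts/operations/bill@acmesoft.com/passwordAge`.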

Let’s configure the Mule flow to persist this data via HTTP.

<flow name="AccountCreate" doc:name="AccountsCreate">
        <http:inbound-endpoint 
            exchange-pattern="request-response" 
            host="localhost" 
            port="8081"
            path="account/create" 
            mimeType="application/json" />
        <json:json-to-object-transformer 
               returnClass="java.util.Map"/>
        <cassandradb:insert config-ref="CassandraDB" />
        <json:object-to-json-transformer />        
</flow>

This flow accepts the JSON account data over HTTP, transforms it into a Map, persists it with the Cassandra module’s “insert” message processor, and then transforms the payload back to JSON to return to the client.
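A client can call this flow with any HTTP library. The sketch below uses Java 11’s `java.net.http` client and assumes the endpoint accepts a POST whose body becomes the message payload; `CreateAccountClient` and `buildCreateRequest` are hypothetical names. It only sends the request when run with a `--send` argument, so it does not fail when Mule isn’t running.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateAccountClient {
    // Assumes the AccountCreate flow is listening on localhost:8081 as configured above.
    static final URI CREATE_URI = URI.create("http://localhost:8081/account/create");

    // Builds a POST carrying the account JSON as the request body.
    static HttpRequest buildCreateRequest(String json) {
        return HttpRequest.newBuilder(CREATE_URI)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"Accounts\":{\"operations\":{\"bill@acmesoft.com\":"
                + "{\"Name\":\"Bill SysAdmin\",\"Password\":\"f1f16683f3e0208131b46d37a79c8921\","
                + "\"passwordAge\":4343100}}}}";
        HttpRequest request = buildCreateRequest(json);
        System.out.println(request.method() + " " + request.uri());

        // Only contact the server when asked to; otherwise a ConnectException would be thrown.
        if (args.length > 0 && args[0].equals("--send")) {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
```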

Column Serialization

One of Cassandra’s benefits, and also one of its challenges, is that all data is stored as byte arrays. This makes data storage extremely flexible, but it also means that type information is lost. The Cassandra module uses Hector’s serializers to let you specify how a column’s bytes are converted back into a typed value when you read it.
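To see concretely why type information disappears, the sketch below encodes an `int` as 4 big-endian bytes and a four-character string as UTF-8. The 4-byte encoding is an assumption about how the integer is stored, though it matches the garbled `passwordAge` value shown later in this article. `UntypedBytes`, `intToBytes` and `stringToBytes` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class UntypedBytes {
    // Encodes an int as 4 big-endian bytes, as an integer serializer would (assumption).
    static byte[] intToBytes(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Encodes a String as UTF-8, as a string serializer would.
    static byte[] stringToBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] fromInt = intToBytes(4343100);          // Bill's passwordAge
        byte[] fromString = stringToBytes("\0BE<");    // NUL followed by 'B', 'E', '<'
        System.out.println(Arrays.toString(fromInt));  // [0, 66, 69, 60]
        System.out.println(Arrays.equals(fromInt, fromString)); // true
    }
}
```

The two byte arrays are identical, so nothing in the stored bytes says whether the value was a number or text; only the serializer chosen by the reader decides.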

Let’s see how this works by defining two query operations for the API. The first lets us look up a user by organizational unit and email address; recall that the organizational unit is the row key and the email address names the account’s super column.

 <flow name="AccountGet" doc:name="AccountGet">
        <http:inbound-endpoint 
              exchange-pattern="request-response"
              host="localhost"
              port="8081" 
              path="account/get"   
              doc:name="HTTP"/>            
        <cassandradb:get config-ref="CassandraDB" 
               columnPath=
"Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]"
                rowKey=
"#[message.inboundProperties['http.relative.path'].split('/')[0]]"/>         
        <json:object-to-json-transformer doc:name="Object to JSON"/>
</flow>

We’re using the Mule Expression Language (MEL) to parse the request path, which is how we derive the rowKey and columnPath. For the URL below, the rowKey will be “operations” and the columnPath will be “Accounts:bill@acmesoft.com”. We can now query for Bill’s account as follows:

http://localhost:8081/account/get/operations/bill@acmesoft.com
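The `split('/')` in these expressions calls Java’s `String.split`, so the parsing can be checked outside Mule. The sketch below assumes `http.relative.path` holds the part of the request path after `account/get` with no leading slash (for example `operations/bill@acmesoft.com`); with a leading slash, index 0 would be an empty string. `RelativePathParsing`, `rowKey` and `columnPath` are hypothetical names.

```java
public class RelativePathParsing {
    // Mirrors "#[...split('/')[0]]": the first path segment is the row key.
    static String rowKey(String relativePath) {
        return relativePath.split("/")[0];
    }

    // Mirrors "Accounts:#[...split('/')[1]]": the second segment names the super column.
    static String columnPath(String relativePath) {
        return "Accounts:" + relativePath.split("/")[1];
    }

    public static void main(String[] args) {
        String relativePath = "operations/bill@acmesoft.com"; // assumed http.relative.path value
        System.out.println(rowKey(relativePath));     // operations
        System.out.println(columnPath(relativePath)); // Accounts:bill@acmesoft.com
    }
}
```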

There’s one problem though. When the response comes back it looks like this:

{
   "bill@acmesoft.com":{
      "passwordAge":"\u0000BE<",
      "Name":"Bill SysAdmin",
      "Password":"f1f16683f3e0208131b46d37a79c8921"
   }
}

The password age comes back as a garbled string instead of an integer. This is because the Cassandra module defaults to string serialization for any column without an explicit column-serializer. Let’s add one to fix the flow.

<flow name="AccountGet" doc:name="AccountGet">
        <http:inbound-endpoint
              exchange-pattern="request-response"
              host="localhost"
              port="8081"
              path="account/get"/>
        <cassandradb:get config-ref="CassandraDB"
              columnPath="Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]"
              rowKey="#[message.inboundProperties['http.relative.path'].split('/')[0]]"
              doc:name="Cassandradb">
            <!-- Read passwordAge as an Integer; other columns default to String -->
            <cassandradb:column-serializers>
                <cassandradb:column-serializer
                      key="passwordAge"
                      type="java.lang.Integer"/>
            </cassandradb:column-serializers>
        </cassandradb:get>
        <json:object-to-json-transformer/>
</flow>
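Conceptually, the column-serializers element is a lookup from column name to type, with String as the fallback. The sketch below mimics that behavior in plain Java; it is not the module’s or Hector’s implementation, `ColumnDecoding`, `SERIALIZERS` and `decode` are hypothetical names, and it assumes integers are stored as 4 big-endian bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class ColumnDecoding {
    // Hypothetical stand-in for <cassandradb:column-serializers>: column name -> target type.
    static final Map<String, Class<?>> SERIALIZERS = Map.of("passwordAge", Integer.class);

    // Decodes a raw column value; columns without a registered type fall back to UTF-8 strings.
    static Object decode(String column, byte[] raw) {
        Class<?> type = SERIALIZERS.getOrDefault(column, String.class);
        if (type == Integer.class) {
            return ByteBuffer.wrap(raw).getInt(); // reads 4 big-endian bytes
        }
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] passwordAge = ByteBuffer.allocate(4).putInt(4343100).array();
        System.out.println(decode("passwordAge", passwordAge)); // 4343100
        // The same bytes read through the String fallback give NUL, 'B', 'E', '<'.
        System.out.println(decode("someUntypedColumn", passwordAge).equals("\0BE<")); // true
    }
}
```

Reading the four bytes through the String fallback yields NUL, `B`, `E`, `<`, which is exactly the `\u0000BE<` value in the earlier response.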

Now, when we refresh the URL, the response should look something like this:

{
   "bill@acmesoft.com":{
      "passwordAge":4343100,
      "Name":"Bill SysAdmin",
      "Password":"f1f16683f3e0208131b46d37a79c8921"
   }
}

Column serialization is available for all data types supported by Hector.

Column Slices

The Cassandra Module additionally allows you to query by column slice.  The following flow will return all accounts for a given organizational unit (row key):

<flow name="AccountList" doc:name="AccountsList">
        <http:inbound-endpoint
              exchange-pattern="request-response"
              host="localhost"
              port="8081"
              path="account/list"/>
        <cassandradb:get-slice
              config-ref="CassandraDB"
              rowKey="#[message.inboundProperties['http.relative.path'].split('/')[0]]"
              columnParent="Accounts"
              count="100">
            <!-- Same wrapper element as in the AccountGet flow -->
            <cassandradb:column-serializers>
                <cassandradb:column-serializer
                      key="passwordAge" type="java.lang.Integer"/>
            </cassandradb:column-serializers>
        </cassandradb:get-slice>
        <json:object-to-json-transformer/>
</flow>

This returns up to 100 columns from the supplied row; in this column family, each of those is an account’s super column. For instance, this URL:

http://localhost:8081/account/list/operations

returns something like the following:

[
   {
      "bill@acmesoft.com":{
         "passwordAge":4343100,
         "Name":"Bill SysAdmin",
         "Password":"f1f16683f3e0208131b46d37a79c8921"
      }
   },
   {
      "jill@acmesoft.com":{
         "passwordAge":41923143,
         "Name":"Jill NetworkAdmin",
         "Password":"32a3571fa12b39266a58d42234836839"
      }
   }
]
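A slice with a count amounts to “the first N columns of the row, in the row’s sort order”. The sketch below models a row as a `TreeMap` and takes the first `count` entries. It is a plain-Java illustration with hypothetical names (`ColumnSlice`, `firstN`), assumes super column names sort as ordinary strings, and ignores slice start and finish bounds.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class ColumnSlice {
    // Returns at most `count` entries from the start of a sorted row, like a slice
    // with an open start and finish.
    static <V> Map<String, V> firstN(SortedMap<String, V> row, int count) {
        Map<String, V> slice = new LinkedHashMap<>();
        for (Map.Entry<String, V> entry : row.entrySet()) {
            if (slice.size() >= count) {
                break;
            }
            slice.put(entry.getKey(), entry.getValue());
        }
        return slice;
    }

    public static void main(String[] args) {
        SortedMap<String, String> operations = new TreeMap<>();
        operations.put("jill@acmesoft.com", "Jill NetworkAdmin");
        operations.put("bill@acmesoft.com", "Bill SysAdmin");
        System.out.println(firstN(operations, 100).keySet()); // [bill@acmesoft.com, jill@acmesoft.com]
        System.out.println(firstN(operations, 1).keySet());   // [bill@acmesoft.com]
    }
}
```

Because the row keeps its entries sorted, `bill@acmesoft.com` comes back before `jill@acmesoft.com`, consistent with the order in the response above.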

Column Deletion

Deleting data is just as easy. The following flow removes an account’s super column from its organizational unit’s row:

<flow name="AccountDelete" doc:name="AccountDelete">
        <http:inbound-endpoint 
              exchange-pattern="request-response" 
              host="localhost" 
              port="8081" 
              path="account/delete"/>            
        <cassandradb:remove config-ref="CassandraDB" 
              columnPath="Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]"
             rowKey="#[message.inboundProperties['http.relative.path'].split('/')[0]]" doc:name="Cassandradb"/>         
        <json:object-to-json-transformer />
</flow>

So to delete Bill’s account we’d use a URL as follows:

http://localhost:8081/account/delete/operations/bill@acmesoft.com
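Assuming the module’s remove behaves like Cassandra’s own remove when the column path names a super column, this request deletes Bill’s super column, including every column inside it, from the “operations” row and leaves Jill’s account untouched. The hypothetical in-memory model below (`RemoveAccount`, `ColumnFamily`) only illustrates that effect; Cassandra itself records deletions as tombstones rather than removing entries in place.

```java
import java.util.Map;
import java.util.TreeMap;

public class RemoveAccount {
    // Hypothetical in-memory model of one column family: row key -> (super column name -> columns).
    static final class ColumnFamily {
        final Map<String, Map<String, Map<String, Object>>> rows = new TreeMap<>();

        void put(String rowKey, String superColumn, Map<String, Object> columns) {
            rows.computeIfAbsent(rowKey, k -> new TreeMap<>()).put(superColumn, columns);
        }

        // Removes a whole super column (every column in it); returns false if it was absent.
        boolean remove(String rowKey, String superColumn) {
            Map<String, Map<String, Object>> row = rows.get(rowKey);
            return row != null && row.remove(superColumn) != null;
        }
    }

    public static void main(String[] args) {
        ColumnFamily accounts = new ColumnFamily();
        accounts.put("operations", "bill@acmesoft.com", Map.of("Name", "Bill SysAdmin"));
        accounts.put("operations", "jill@acmesoft.com", Map.of("Name", "Jill NetworkAdmin"));
        System.out.println(accounts.remove("operations", "bill@acmesoft.com")); // true
        System.out.println(accounts.rows.get("operations").keySet());           // [jill@acmesoft.com]
    }
}
```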

Summary and What’s Next

Cassandra is a powerful contender in the NoSQL landscape. It’s particularly suited to large data sets that need to span multiple datacenters. Features we’re hoping to add to the module, and cover here, include a Cassandra-backed Mule object store and support for CQL as an alternative query mechanism.



Published at DZone with permission of John D'Emic, author and DZone MVB.
