
Brian has 10+ years of experience as a technology leader and architect in a wide variety of settings, from early startups to Fortune 500 companies. With experience delivering SaaS solutions in business intelligence, artificial intelligence, and VoIP, his current focus is big data and analytics. Brian leads the Virgil project on Apache Extras, a services layer built on Cassandra that provides REST, Map/Reduce, Search, and Distributed Processing capabilities. Brian is a DZone MVB (not an employee of DZone) and has posted 62 articles at DZone. You can read more from him at his website.

A Big Data Trifecta: Storm, Kafka and Cassandra

08.07.2012
We're big fans of Cassandra, and we use Storm as our distributed processing engine. Our Cassandra Bolt has made for a successful marriage between the two. To date, we've been using Storm to integrate with our legacy technologies via our JMS Spout. Now we're looking to expand its role beyond legacy system integration.

In Storm's new role, the workload is orders of magnitude greater, and although JMS worked well in the previous integration scenarios, we knew it might not be the best solution to accommodate the volume of work we anticipate. We need to support millions of messages on the queues. That is not the typical application of JMS, and it is exactly the reason LinkedIn open sourced Kafka:

"We first looked at several existing queuing solutions in the market. The most popular ones are based on JMS. Although JMS offers a rich set of features, it also adds significant overhead in the message representation. Additionally, some JMS implementations are optimized for the case when all messages can be cached in memory and their performance starts to degrade significantly when the in-memory buffer is saturated. Finally, most existing solutions don’t have a clean design for scaling out."

To validate our assumptions, we needed to put Kafka through its paces. That meant plugging it into our Storm topology. For those who don't know Storm, think of it as a "Big Data ESB" optimized for processing streams of data that are broken down into discrete packets called Tuples. Spouts emit tuples. Bolts consume them. Storm plays the role of message router between the components.
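To make that model concrete, here is a minimal bolt sketch, assuming the 0.8-era backtype.storm API; the class name and the upper-casing logic are mine, purely for illustration:

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    // Consumes each tuple a spout emits and re-emits a transformed copy.
    public class UppercaseBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // a tuple is one discrete packet of data flowing through the topology
            String message = tuple.getString(0);
            collector.emit(new Values(message.toUpperCase()));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("str"));
        }
    }

Note that the bolt never talks to the spout directly; Storm routes tuples between them.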

We already had our Cassandra Bolt in place. All I needed to do was swap out our JMS Spout for a Kafka Spout. Here is what the topology looked like:
    TopologyBuilder builder = new TopologyBuilder();

    // Kafka broker host(s) to consume from
    List<String> hosts = new ArrayList<String>();
    hosts.add("localhost");

    // topic "test", ZooKeeper root "/foo", consumer id "foo"
    SpoutConfig spoutConfig = SpoutConfig.fromHostStrings(hosts, 1, "test", "/foo", "foo");
    spoutConfig.zkServers = ImmutableList.of("localhost");
    spoutConfig.zkPort = 2181;
    spoutConfig.scheme = new StringScheme(); // decode raw Kafka messages as strings
    builder.setSpout("spout", new KafkaSpout(spoutConfig));

    // the mappers translate each tuple into a column family, row key, and columns
    DefaultBatchingCassandraBolt bolt = new DefaultBatchingCassandraBolt(new MyColumnFamilyMapper(), new MyRowKeyMapper(), new MyColumnsMapper());
    bolt.setAckStrategy(AckStrategy.ACK_ON_WRITE); // ack tuples only after the write succeeds
    builder.setBolt("loader", bolt).shuffleGrouping("spout");

This topology simply connects a Kafka Spout to a Cassandra Bolt.

(WARNING: The above code leverages a change to the Cassandra bolt that is still only in my fork. It may not work for you. Watch this pull request.)
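
To try the topology end to end on a workstation, you can run it in Storm's local mode. This is a minimal sketch continuing from the snippet above, assuming the standard backtype.storm API; the topology name is mine:

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;

    // submit the topology built above to an in-process cluster
    Config config = new Config();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafka-cassandra", config, builder.createTopology());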

I then queued 10 million JSON records in Kafka (which took about five minutes running locally on a MacBook Pro) and unleashed the topology.
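
The loader itself isn't shown here, but queuing the records with the 0.7-era Kafka producer API would look something like the following sketch; the topic matches the spout config above, and the JSON shape is a placeholder, not the actual test data:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.javaapi.producer.ProducerData;
    import kafka.producer.ProducerConfig;

    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

    // queue 10 million JSON records onto the "test" topic
    for (int i = 0; i < 10000000; i++) {
        String json = "{\"id\":" + i + "}"; // placeholder record
        producer.send(new ProducerData<String, String>("test", json));
    }
    producer.close();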

Now, Kafka is *fast*. Running the Kafka Spout by itself, I easily reproduced Kafka's claim that you can consume "hundreds of thousands of messages per second". When I first fired up the topology, things went well for the first minute, but then it quickly crashed: the Kafka Spout emitted tuples too fast for the Cassandra Bolt to keep up. Even though Cassandra is fast as well, it is still orders of magnitude slower than Kafka.

Fortunately, since Storm interacts with its Spouts using a pull model, it provides a way to throttle back the messaging. I added the following parameter to the Config:
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5000);
This limits the number of un-acked tuples in the system. With the AckStrategy set to ACK_ON_WRITE within the Cassandra Bolt, this establishes a safe way for the Bolt to communicate back to the Spout that it is "ready for some more".
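
The general ack-on-write pattern inside a bolt looks like this sketch (writeToCassandra is a hypothetical helper standing in for the actual batching logic; collector is the OutputCollector Storm hands the bolt in prepare()):

    public void execute(Tuple tuple) {
        try {
            writeToCassandra(tuple); // hypothetical helper: persist the tuple
            collector.ack(tuple);    // acking frees a slot under TOPOLOGY_MAX_SPOUT_PENDING
        } catch (Exception e) {
            collector.fail(tuple);   // failed tuples can be replayed by the spout
        }
    }

Until a tuple is acked or failed it counts against the 5000-tuple cap, so the spout naturally slows to the bolt's write rate.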

With this topology, we saw consistent throughput of 5,000 writes per second to Cassandra (running locally on my MacBook Pro). That will work nicely when deployed to the cluster. =)

Kafka has some other nice characteristics that make it well suited for big data applications.  I'll go into the details of those in a future post.

* Kudos to Taylor Goetz, who has done great work on the Storm components that made this possible.
Published at DZone with permission of Brian O'Neill, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)

Comments

Sreenath Ven replied on Tue, 2014/05/06 - 12:59pm

The article is good and informative. The only pain point is the Kafka-Spout integration. http://www.techiesinfo.com/architecture/kafka-storm has a write-up on integrating the latest versions of Kafka and the Spout as of May 2014.
