Dmitriy Setrakyan manages daily operations of GridGain Systems and brings over 12 years of experience to GridGain Systems which spans all areas of application software development from design and architecture to team management and quality assurance. His experience includes architecture and leadership in development of distributed middleware platforms, financial trading systems, CRM applications, and more. Dmitriy is a DZone MVB and is not an employee of DZone and has posted 57 posts at DZone.

Counting Words in Real Time on GridGain

04.07.2012
I promised a while back to publish the code from the live-coding GridGain presentation I gave at QCon London. Since the presentation was in Scala, the code I am posting here is in Scala, but I will post the Java version within a couple of days as well.

First, a brief intro. We all know Hadoop's word-counting example, which takes a file with words and produces another file with the number of occurrences next to each word. Hadoop does this example very well; the main caveat is that it is not real time.

The word-counting example I did at QCon actually counted words in real time. The program was split into two parts. The first part loaded the words into the GridGain data grid in real time, and the second part queried the grid every 3 seconds to continuously print out the top 10 words stored so far.

The example was done using 'Scalar', the GridGain DSL for Scala, but it could have been done in Java as well using the GridGain Java APIs.

Continuously Populate Words In Real Time

Let's start by continuously loading the data grid with new words. To do that, I downloaded several books in text format and read them concurrently from the populate(...) method, one thread per book. Every word read is stored in the cache, with the word itself as the key and its current number of occurrences as the value. Also note how we let the grid update the cache asynchronously, using an asynchronous run, while we read the next line from the book file (in reality you would most likely have more than one asynchronous job, or have GridGain's data loading functionality do it for you).

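// JDK and Scala library imports (GridGain and Scalar imports not shown).
import java.io.File
import java.util.concurrent.{Callable, CompletionService}
import scala.io.Source

def populate(threadPool: CompletionService[AnyRef], dir: File) {
  val bookFileNames = dir.list()
 
  // For every book, start a new thread and start populating cache
  // with words and their counts.
  for (bookFileName <- bookFileNames) {
    threadPool.submit(new Callable[AnyRef] {
      def call(): AnyRef = {
        val cache = grid$.cache[String, JInt]
 
        // Future of the previous asynchronous update, if any.
        var fut: GridFuture[_] = null
 
        Source.fromFile(new File(dir, bookFileName)).getLines().foreach(line => {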
          line.split("[^a-zA-Z0-9]").foreach(word => {
            if (!word.isEmpty) {
              if (fut != null)
                fut.get()
 
              fut = grid$.affinityRunAsync(null, word, () => {
                // Increment word counter and store it in cache.
                // We use cache transaction to make sure that
                // gets and puts are consistent and atomic.
                cache.inTx(
                  () => cache += (word -> (cache.getOrElse(word, 0) + 1))
                )
 
                ()
              })
            }
          })
        })
 
        None // Return nothing.
      }
    })
  }
 
  // Wait for all threads to finish.
  bookFileNames.foreach(_ => threadPool.take().get())
}
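
To see the per-word logic in isolation, here is a minimal sketch that runs without GridGain. It assumes a local ConcurrentHashMap as a stand-in for the data grid cache, and an atomic counter in place of the cache transaction used above. The names LocalWordCount, words, increment and countLine are hypothetical and exist only for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object LocalWordCount {
  // Hypothetical stand-in for the GridGain cache: word -> running count.
  val counts = new ConcurrentHashMap[String, AtomicInteger]()

  // Split a line on non-alphanumeric characters, as populate(...) does,
  // and drop the empty tokens produced by adjacent separators.
  def words(line: String): Seq[String] =
    line.split("[^a-zA-Z0-9]").filter(_.nonEmpty).toSeq

  // Atomically increment the counter for one word and return the new count.
  def increment(word: String): Int = {
    val fresh = new AtomicInteger(0)
    val existing = counts.putIfAbsent(word, fresh)
    (if (existing == null) fresh else existing).incrementAndGet()
  }

  // Count every word found on one line.
  def countLine(line: String): Unit = words(line).foreach(increment)
}
```

Calling LocalWordCount.countLine once per line from several threads is safe in this sketch because every update goes through putIfAbsent and incrementAndGet.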

Distributed SQL Query

Now let's implement our distributed query against the GridGain data grid, which will run every 3 seconds. Note that we are using standard SQL syntax to query remote grid nodes. Interestingly, the GridGain data grid lets you use SQL virtually without limitations: you can use any native SQL function and even SQL JOINs between different classes. Here, for example, we use the SQL length(...) function to query only words longer than 3 letters, just to get rid of frequent short words like "a" or "the" in our searches. We also use the desc keyword to sort word counts in descending order and the limit keyword to restrict the selection to 10 words.

def queryPopularWords(cnt: Int) {
  // Type alias for sequences of strings (for readability only).
  type SSeq = Seq[String]
 
  grid$.cache[String, JInt].sqlReduce(
    // PROJECTION (where to run):
    grid$.projectionForCaches(null),
    // SQL QUERY (what to run):
    "length(_key) > 3 order by _val desc limit " + cnt,
    // REMOTE REDUCER (how to reduce on remote nodes):
    (it: Iterable[(String, JInt)]) =>
      // Pre-reduce by converting
      // Seq[(String, JInt)] to Map[JInt, Seq[String]].
      (it :\ Map.empty[JInt, SSeq])((e, m) =>
        m + (e._2 -> (m.getOrElse(e._2, Seq.empty[String]) :+ e._1))),
    // LOCAL REDUCER (how to finally reduce on local node):
    (it: Iterable[Map[JInt, SSeq]]) => {
      // Print 'cnt' of most popular words collected from all remote nodes.
      (new TreeMap()(implicitly[Ordering[JInt]].reverse) ++ it.flatten)
        .take(cnt).foreach(println _)
 
      println("------------") // Formatting.
    }
  )
}
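
The two reduce steps can also be tried on plain Scala collections. The following is a rough sketch under stated assumptions, not GridGain's implementation: each Seq[(String, Int)] plays the role of one node's cache entries, Int replaces JInt, and TopWordsReduce, remoteReduce and localReduce are hypothetical names.

```scala
import scala.collection.immutable.TreeMap

object TopWordsReduce {
  type Counts = Map[Int, Seq[String]]

  // Remote step: mimic the SQL clause on one node's entries,
  // then group the surviving words by their count.
  def remoteReduce(entries: Seq[(String, Int)], cnt: Int): Counts =
    entries
      .filter(_._1.length > 3) // length(_key) > 3
      .sortBy(e => -e._2)      // order by _val desc
      .take(cnt)               // limit cnt
      .groupBy(_._2)
      .map { case (n, es) => n -> es.map(_._1) }

  // Local step: merge the per-node maps into one map sorted by
  // descending count and keep the first 'cnt' entries.
  def localReduce(parts: Seq[Counts], cnt: Int): Seq[(Int, Seq[String])] = {
    val empty = TreeMap.empty[Int, Seq[String]](Ordering[Int].reverse)
    val merged = parts.flatten.foldLeft(empty) {
      case (m, (n, ws)) => m.updated(n, m.getOrElse(n, Seq.empty[String]) ++ ws)
    }
    merged.take(cnt).toSeq
  }
}
```

Unlike the `++` in the listing above, this sketch concatenates word lists that share the same count, so words with equal counts coming from different nodes are all kept.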

Start Example

And finally, let's implement our main(...) method, which calls the populate(...) and queryPopularWords(...) methods we just defined.

def main(args: Array[String]) {
  // Initialize book directory
  val bookDir = new File(BOOK_PATH);
 
  // Start GridGain with specified configuration file.
  scalar("examples/config/spring-cache-popularwords.xml") {
    // Create as many threads as we have books, so we can use
    // a thread per book to load the data grid concurrently.
    val threadPool = Executors.newFixedThreadPool(bookDir.list.length);
 
    val popWordsQryTimer = new Timer("words-query-worker");
 
    try {
      // Schedule word queries to run every 3 seconds.
      popWordsQryTimer.schedule(new TimerTask {
        def run() {
          queryPopularWords(10) // Query top 10 words from data grid.
        }
      }, 3000, 3000)
 
      // Populate cache with word counts.
      populate(new ExecutorCompletionService(threadPool), bookDir)
 
      // Force one more run to print final counts.
      queryPopularWords(POPULAR_WORDS_CNT)
    }
    finally {
      popWordsQryTimer.cancel() // Cancel timer.
 
      threadPool.shutdownNow() // Stop any remaining loader threads.
    }
  }
}
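
The coordination pattern in main(...), a periodic reporter running next to a pool of loader threads, can be exercised with the JDK alone. This is a sketch under assumptions: Thread.sleep stands in for loading a book, a counter stands in for the grid query, and PeriodicReportSketch and loadWithReports are hypothetical names.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
import java.util.concurrent.atomic.AtomicInteger

object PeriodicReportSketch {
  // Runs 'jobs' workers while a timer calls 'report' every 'periodMs' ms.
  // Returns the number of workers that completed.
  def loadWithReports(jobs: Int, periodMs: Long)(report: Int => Unit): Int = {
    val done = new AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(jobs)
    val completion = new ExecutorCompletionService[Integer](pool)
    val timer = new Timer("report-worker", true)

    try {
      // Periodic reporter, playing the role of queryPopularWords(...).
      timer.schedule(new TimerTask {
        def run(): Unit = report(done.get())
      }, periodMs, periodMs)

      // One worker per job, playing the role of one thread per book.
      (1 to jobs).foreach { _ =>
        completion.submit(new Callable[Integer] {
          def call(): Integer = {
            Thread.sleep(periodMs) // Stand-in for loading one book.
            done.incrementAndGet()
          }
        })
      }

      // Block until every worker has finished, as populate(...) does.
      (1 to jobs).foreach(_ => completion.take().get())

      report(done.get()) // One final report after loading completes.
      done.get()
    } finally {
      timer.cancel()  // Stop the periodic reporter.
      pool.shutdown() // All workers have finished; release the threads.
    }
  }
}
```

For example, PeriodicReportSketch.loadWithReports(3, 100L)(n => println(n + " done")) prints progress while three workers run and once more at the end.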

To execute the example, start several stand-alone GridGain nodes using the examples/config/spring-cache-popularwords.xml configuration file, and then start the example we just created from your IDE. You may wish to add more printouts for better visibility into what's happening.

This example ships with GridGain 4.0 and is also available in the GridGain GitHub repository.

Published at DZone with permission of Dmitriy Setrakyan, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)