

Scala-Powered Streaming Migrations in MongoDB on Millions of Records

11.04.2012

In my project at work we have some production systems using MongoDB as their primary database. Quite a number of the collections there could be called “huge”, and we sometimes have to migrate data from one schema to another (let’s call it a “schema” for simplicity, even though MongoDB is schemaless), or compute statistics over an entire collection.

So the first thing you’d try is a typical foreach on a Rogue query, like this:

Person where(_.age > 18) foreach { p => /*...*/ }

… and do whatever stats etc. you need for each of the elements. It turns out there are quite a few reasons this approach fails badly, and it wouldn’t even get close to processing the entire collection. Among the problems:

  • the cursor WILL time out if used like this
  • we’re mapping every document to a full Person object, which takes both time and memory (the sketch below shows roughly what happens under the hood)
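
To make those two points concrete, here is roughly what the innocent-looking foreach boils down to against the raw Java driver (just a sketch – coll and queryObj stand in for whatever Rogue derives from the query):

import com.mongodb.{DBCollection, DBObject}

// A sketch of the work hidden behind the one-liner above:
def naiveScan(coll: DBCollection, queryObj: DBObject) {
  val cursor = coll.find(queryObj) // a default cursor: the server may kill it
                                   // after ~10 minutes of inactivity
  while (cursor.hasNext) {
    val dbo = cursor.next()
    val person = Person.fromDBObject(dbo) // full object hydration, per document
    // ... the per-record work
  }
}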

(PS: we could do worse than the example above – don’t try to load a huge collection into memory… ;-)). Moving on to the solution, there are a few things we have to do here. First, stop the cursor from timing out, which can be achieved by setting the appropriate option on the cursor:

cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

As you’ve already noticed, we’re down to the low-level API here. For now let’s say I’m OK with that (we’ll bring Rogue back into play a bit later). Putting this in context, you’d have to write:

def withCursor[T](cursor: => DBCursor)(f: DBCursor => T) = {
  val c = cursor // evaluate the by-name cursor exactly once
  try { f(c) } finally { c.close() }
}

// somewhere... (Bytes comes from com.mongodb)
MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
  withCursor(coll.find()) { cursor =>
    cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
    // ... iterate over the cursor here
  }
}

OK, here you can use the cursor and iterate over the whole collection. That’s quite a bit of code, but we’ve got to the point where the cursor is usable and won’t time out. The next step is to package this up in a nice function and allow passing in a Rogue query object. I’ll paste in the end result of those steps so you can analyse it yourself:

  def stream[T <: MongoRecord[T]]
            (meta: MongoMetaRecord[T], query: Option[DBObject] = None)
            (callback: T => Unit) {
    MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
      withCursor(query.map(q => coll.find(q)).getOrElse(coll.find())) { cursor =>
        cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

        while(cursor.hasNext) {
          val next = cursor.next()
          val obj = meta.fromDBObject(next)

          callback(obj)
        }
      }
    }
  }

// can be used like this:
val meta = Person
stream(meta) { migrateIt(_) }

// or with the query supplied:
import scalaz.Scalaz._
import com.foursquare.rogue.Rogue._
val query = meta where(_.age > 18)
stream(meta, query.asDBObject.some) { migrateIt }

There is a minor annoyance with writing query.asDBObject.some (which is Scalaz’s equivalent of Some(query.asDBObject))… Let’s fix this with a simple delegating function:

def stream[T <: MongoRecord[T]]
          (meta: MongoMetaRecord[T],
           query: BaseQuery[_, _, _, _, _, _, _])
          (callback: T => Unit) {
  stream(meta, query.asDBObject.some)(callback) // .some comes from scalaz.Scalaz._
}

// so we can call it like:
import com.foursquare.rogue.Rogue._
stream(meta, meta.where(_.age > 18)) { migrateIt }

But I’m still not happy with this. As already mentioned, we’re dealing with huge collections, so the processing may well take up to a day sometimes. Let’s see what we can cut out here… Oh yeah – let’s not fetch fields we don’t use. Let’s revise our code into a streamSelect version of the above, which will only select the fields we’re interested in from Mongo:

  /** So we don't have to manually extract field names */
  def select[T](on: T, fields: (T => BaseField)*): Seq[String] =
    for (f <- fields) yield f(on).name

  def streamSelect[T <: MongoRecord[T]]
                  (meta: MongoMetaRecord[T],
                   select: Seq[String] = Nil,
                   query: DBObject)(callback: List[Any] => Unit) {
    MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
      // the projection: { "field1": 1, "field2": 1, ... }
      val selectFields = new BasicDBObject()
      select foreach { field => selectFields.put(field, 1) }

      withCursor(coll.find(query, selectFields)) { cursor =>
        cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

        while (cursor.hasNext) {
          val next = cursor.next()

          // extract just the selected fields, in the order they were requested
          val fieldValues = select.toList map { field =>
            next.get(field) match {
              case l: BasicDBList => l.toArray.toList // unwrap mongo lists
              case other => other
            }
          }

          callback(fieldValues)
        }
      }
    }
  }


// which can be used as:
val m = Person
val above18 = (m where (_.age > 18)).asDBObject // streamSelect wants a DBObject
streamSelect(m, select[Person](m, _.name, _.age, _.height), query = above18) {
  case (name: String) :: (age: Int) :: (h: Double) :: Nil =>
    // ...
}

// instead of using the select helper you can pass the field names manually – these are equivalent:
select[Person](m, _.name, _.age, _.height) == List(m.name.name, m.age.name, m.height.name)

Here we only select the fields we really need – which has proven to be a big performance boost. It’s quite readable, though I wasn’t able to get rid of the [Person] type parameter in the select helper. We’re using such streams wherever we know there’s “a lot of stuff to process”, or in so-called “preloads”, where we compute a set of values from an entire collection for later re-use.
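
For example, a simple preload built on top of stream could look something like this (a sketch – the map being built and the .value field access are just illustrative):

import scala.collection.mutable

// Sketch of a "preload": one streaming pass over the collection to build
// an in-memory lookup table for later migration steps.
val heightByName = mutable.Map.empty[String, Double]
stream(Person) { p =>
  heightByName(p.name.value) = p.height.value // .value reads a record field
}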

You may have noticed that all of this has not been very type-safe (the callback isn’t). So… you might ask, did we implement a type-safe version of our streams? In fact we did, though it’s tuple-based, so we had to implement the same thing multiple times – once per tuple arity. I’ll paste just the usage of the type-safe version here (if you’re interested, I can do a follow-up blog post about the implementation):

val m = Person
streamTypesafe(m)(m.age, m.name, m.height) { (age, name, height) =>
  // yes, age: Int, name: String, and height: Double! :-)
}
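
The actual implementation isn’t shown in this post, but to give you the general idea, a three-field variant could be sketched roughly like this (an illustration only, assuming lift-record’s typed Field and the streamSelect from above – the real code repeats this pattern once per tuple arity):

import net.liftweb.record.Field
import com.mongodb.BasicDBObject

// Sketch only: a possible three-field type-safe stream, delegating to
// streamSelect and casting each value back to its field's value type.
def streamTypesafe[T <: MongoRecord[T], A, B, C]
                  (meta: MongoMetaRecord[T])
                  (fa: Field[A, T], fb: Field[B, T], fc: Field[C, T])
                  (callback: (A, B, C) => Unit) {
  streamSelect(meta, List(fa.name, fb.name, fc.name), new BasicDBObject()) {
    case a :: b :: c :: Nil =>
      callback(a.asInstanceOf[A], b.asInstanceOf[B], c.asInstanceOf[C])
  }
}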

What we’ve gained with these streams is the ability to easily write all the migrations we need, while remaining flexible enough to, for example, remove fields from a collection (with some processing, so a plain Mongo query alone wouldn’t cut it). In streamSelect we’re also free to use multiple case statements, so even if the collection is not homogeneous we can match null for some of the fields and still proceed over the entire collection, as sketched below.
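
For instance, a single pass over such a mixed collection could look like this (a sketch – the empty BasicDBObject simply matches every document):

// One stream over a non-homogeneous collection, with a dedicated case
// for documents that don't have an "age" field at all (null):
streamSelect(m, select[Person](m, _.name, _.age), query = new BasicDBObject()) {
  case (name: String) :: (age: Int) :: Nil =>
    // a document with both fields
  case (name: String) :: null :: Nil =>
    // a document without an "age" field – still processed
  case _ =>
    // anything unexpected – just skip it
}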

Anyway, I hope you found this bunch of code snippets interesting or useful – we certainly find them useful in our day-to-day coding :-)




Published at DZone with permission of Konrad Malawski, author and DZone MVB. (source)
