Antoine works as a Technical Manager at MongoDB Inc., helping companies architect systems with MongoDB. Previously he worked as a kernel engineer on the MongoDB core and the Java driver. He also spent many years in the CDN industry, at Panther CDN and then CDNetworks, designing and developing one of the largest and fastest content delivery and application acceleration networks. Prior work includes the development of a new Linux kernel network protocol to multiplex several interfaces, used in wireless devices. Antoine received an MS in Computer Science from Stevens Institute of Technology (Hoboken) and a BS in Computer Science from EPITA (Paris, France).

How to Speed Up MongoDB MapReduce by 20x

10.29.2013

Analytics is becoming an increasingly important topic for MongoDB, since it is used in more and more large, critical projects. People are tired of moving data to separate analytics software (Hadoop, for one, is pretty involved), and those transfers of massive data sets can be costly.

MongoDB offers two ways to analyze data in place: MapReduce (MR) and the Aggregation Framework. MR is extremely flexible and easy to pick up. It works well with sharding and allows for very large outputs. MR was heavily improved in MongoDB v2.4 by the JavaScript engine swap from SpiderMonkey to V8. The chief complaint about it is that it is quite slow, especially compared to the Aggregation Framework (which uses C++). Let’s see if we can squeeze some juice out of it.
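
If you are not sure which JavaScript engine your server was built with, the shell can tell you; to my knowledge the javascriptEngine field is reported in the build info from v2.4 on:

> db.serverBuildInfo().javascriptEngine   // "V8" on a stock 2.4 build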

The Exercise

Let’s insert 10 million documents, each containing a single integer value between 0 and 1 million. With 10 million documents spread over 1 million possible values, on average 10 documents share the same value.

> for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
> db.uniques.findOne()
{ "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
> db.uniques.ensureIndex({dim0: 1})
> db.uniques.stats()
{
        "ns" : "test.uniques",
        "count" : 10000000,
        "size" : 360000052,
        "avgObjSize" : 36.0000052,
        "storageSize" : 582864896,
        "numExtents" : 18,
        "nindexes" : 2,
        "lastExtentSize" : 153874432,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 576040080,
        "indexSizes" : {
                "_id_" : 324456384,
                "dim0_1" : 251583696
        },
        "ok" : 1
}

From here we want to get the count of unique values. This can be done easily with the following MR job:

> db.runCommand(
{ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout" })
{
        "result" : "mrout",
        "timeMillis" : 1161960,
        "counts" : {
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1059138,
                "output" : 999961
        },
        "ok" : 1
}

As you can see in the output, the job takes about 1,200 seconds (tested on an EC2 M3 instance): 10 million map calls, roughly 1 million reduce calls, and 999,961 documents in the output. The result looks like this:

> db.mrout.find()
{ "_id" : 1, "value" : 10 }
{ "_id" : 2, "value" : 5 }
{ "_id" : 3, "value" : 6 }
{ "_id" : 4, "value" : 10 }
{ "_id" : 5, "value" : 9 }
{ "_id" : 6, "value" : 12 }
{ "_id" : 7, "value" : 5 }
{ "_id" : 8, "value" : 16 }
{ "_id" : 9, "value" : 10 }
{ "_id" : 10, "value" : 13 }
...
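
One detail worth noting in the counts above: reduce ran 1,059,138 times for only 999,961 output keys. When a key’s values arrive in separate batches, MR reduces each batch and later re-reduces the partial results, which is why a reduce function must be able to consume its own output. A quick illustrative check in the shell (the key and values here are made up):

> var r = function (key, values) { return Array.sum(values); }
> r("k", [ r("k", [1, 1]), 1 ])  // re-reducing a partial result still yields 3
3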

Using Sorting

I’ve outlined in a previous post how beneficial using a sort can be for MR. It is a very poorly understood feature. In this case, processing the input unsorted means that the MR engine gets the values in random order and has no opportunity to reduce in RAM at all. Instead, it has to write all the documents back to disk in a temporary collection, only to read them back later in sorted order and reduce them. Let’s see if using a sort helps:

> db.runCommand(
{ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout", 
sort: {dim0: 1} })
{
        "result" : "mrout",
        "timeMillis" : 192589,
        "counts" : {
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1000372,
                "output" : 999961
        },
        "ok" : 1
}

That’s a big help indeed! We’re down to 192s, which is already a 6x improvement. The number of reduces is about the same, but now they are done in RAM before the results are written to disk.
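
For comparison with the Aggregation Framework mentioned at the start, the same unique-value count is a single $group stage. A minimal sketch (note that on v2.4 the whole result must fit in a single 16MB document, so an output of this size would realistically need the $out stage introduced in v2.6):

> db.uniques.aggregate([ { $group: { _id: "$dim0", value: { $sum: 1 } } } ])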

Published at DZone with permission of Antoine Girbal, author and DZone MVB. (source)
