
I am a software architect working in the service-hosting area, with a particular interest and specialization in SaaS, cloud computing, and parallel processing. Ricky is a DZone MVB (not a DZone employee) and has posted 87 posts at DZone. You can read more at his website.

How Hadoop Map/Reduce works

12.16.2008

In my previous post, I talked about the methodology of transforming a sequential algorithm into a parallel one. Once we have a parallel algorithm, we can implement it, and one of the popular frameworks for doing so is the Apache open-source Hadoop Map/Reduce framework.

Functional Programming

 

Multithreading is one of the popular ways of doing parallel programming, but the major complexity of multi-threaded programming is coordinating each thread's access to shared data. We need things like semaphores and locks, and we must use them with great care, otherwise deadlocks will result.

If we can eliminate shared state completely, then this coordination complexity disappears.

This is the fundamental concept of functional programming. Data is explicitly passed between functions as parameters or return values, and can only be changed by the function that is active at that moment. Imagine functions connected to each other in a directed acyclic graph. Since there is no hidden dependency (via shared state), functions in the DAG can run anywhere in parallel as long as one is not an ancestor of the other. In other words, analyzing the parallelism is much easier when there is no hidden dependency from shared state.
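To make this concrete, here is a minimal sketch in Python (purely for illustration; the function names are my own): because a pure function depends only on its inputs and touches no shared mutable state, a runtime is free to schedule its calls in any order, or concurrently, and the result stays deterministic.

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    # A pure function: its result depends only on its argument,
    # and it reads or writes no shared state.
    return x * x

# With no hidden dependency between calls, the executor may run them
# in parallel without any locks, yet the output order is preserved.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(square, range(8)))
# results == [0, 1, 4, 9, 16, 25, 36, 49]
```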

User-defined Map/Reduce functions

 

Map/Reduce is a special form of such a DAG that is applicable to a wide range of use cases. It is organized around a “map” function, which transforms a piece of data into some number of key/value pairs. Each of these elements is then sorted by its key and delivered to the same node, where a “reduce” function is used to merge the values (of the same key) into a single result.

map(input_record) {
    ...
    emit(k1, v1)
    ...
    emit(k2, v2)
    ...
}

reduce(key, values) {
    aggregate = initialize()
    while (values.has_next) {
        aggregate = merge(aggregate, values.next)
    }
    collect(key, aggregate)
}
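As a concrete instance of the pseudocode above, here is a minimal single-process word-count sketch in Python (the names `map_fn`, `reduce_fn`, and `run_mapreduce` are my own, not Hadoop's API): map emits a `(word, 1)` pair per word, the shuffle groups pairs by key, and reduce sums each group.

```python
from collections import defaultdict

def map_fn(record):
    # "map": one input record -> a list of (key, value) pairs
    return [(word, 1) for word in record.split()]

def reduce_fn(key, values):
    # "reduce": all values of one key -> a single aggregate
    return key, sum(values)

def run_mapreduce(records):
    # shuffle phase: group every emitted value under its key
    groups = defaultdict(list)
    for record in records:
        for k, v in map_fn(record):
            groups[k].append(v)
    return dict(reduce_fn(k, vs) for k, vs in sorted(groups.items()))

counts = run_mapreduce(["to be or not to be"])
# counts == {"be": 2, "not": 1, "or": 1, "to": 2}
```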

 

The Map/Reduce DAG is organized in this way.

A parallel algorithm is usually structured as multiple rounds of Map/Reduce.

HDFS

 

The distributed file system is designed to handle large files (multi-GB) with sequential read/write operations. Each file is broken into chunks and stored across multiple DataNodes as local OS files.

There is a master “NameNode” that keeps track of the overall file directory structure and the placement of chunks. The NameNode is the central control point and may redistribute replicas as needed. Each DataNode reports all of its chunks to the NameNode at boot-up. Each chunk has a version number, which is incremented on every update; the NameNode therefore knows when any of a DataNode's chunks are stale (e.g., when the DataNode has been down for some period of time). Stale chunks are garbage-collected at a later time.

To read a file, the client API calculates the chunk index from the offset of the file pointer and makes a request to the NameNode. The NameNode replies with the DataNodes that hold a copy of that chunk. From this point, the client contacts a DataNode directly without going through the NameNode.
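The chunk-index calculation is just integer division of the file offset by the chunk size. A minimal sketch, assuming a 64 MB chunk (the size is configurable, so this number is an assumption for illustration):

```python
CHUNK_SIZE = 64 * 1024 * 1024  # assumed 64 MB chunk size

def chunk_index(offset):
    # The byte at `offset` lives in chunk number offset // CHUNK_SIZE;
    # this is the index the client sends to the NameNode.
    return offset // CHUNK_SIZE

chunk_index(0)                   # byte 0 -> chunk 0
chunk_index(200 * 1024 * 1024)   # a 200 MB offset -> chunk 3
```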

To write a file, the client API first contacts the NameNode, which designates one of the replicas as the primary (by granting it a lease). The NameNode's response identifies the primary and the secondary replicas. The client then pushes its changes to all DataNodes in any order, but each DataNode only stores the change in a buffer. After the changes are buffered at all DataNodes, the client sends a “commit” request to the primary, which determines an update order and pushes that order to all the secondaries. After all secondaries complete the commit, the primary responds to the client with success.

All changes of chunk distribution and metadata are written to an operation log file at the NameNode. This log maintains an ordered list of operations, which is important for the NameNode to recover its view after a crash. The NameNode also maintains its persistent state by regularly checkpointing to a file. If the NameNode crashes, a new NameNode takes over by restoring the state from the last checkpoint file and replaying the operation log.

 

MapRed

 

Job execution starts when the client program submits a job configuration to the JobTracker; this configuration specifies the map, combine, and reduce functions, as well as the input and output paths of the data.

The JobTracker first determines the number of splits (the split size is configurable, typically ~16-64 MB) from the input path, and selects some TaskTrackers based on their network proximity to the data sources. The JobTracker then sends the task requests to those selected TaskTrackers.
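The split count is essentially a ceiling division of the input size by the split size. A sketch, assuming a single input file and a 64 MB split size (both assumptions for illustration; real split computation also respects file and block boundaries):

```python
def num_splits(file_size, split_size=64 * 1024 * 1024):
    # Ceiling division: a trailing partial split still needs its own task.
    return -(-file_size // split_size)

num_splits(1024 ** 3)        # a 1 GB file -> 16 splits of 64 MB
num_splits(100 * 1024 ** 2)  # a 100 MB file -> 2 splits (64 MB + 36 MB)
```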

Each TaskTracker starts the map-phase processing by extracting the input data from its splits. For each record parsed by the “InputFormat”, it invokes the user-provided “map” function, which emits a number of key/value pairs into an in-memory buffer. A periodic wakeup process sorts the memory buffer into per-reducer partitions, invoking the “combine” function along the way. The key/value pairs are sorted into one of R local files (supposing there are R reducer nodes).
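Routing each pair into one of the R local files is done with a partition function over the key; hash partitioning is the common default, though the partitioner is pluggable. A minimal sketch (R and the function name are illustrative):

```python
R = 4  # assumed number of reducer nodes

def partition(key):
    # Every occurrence of the same key maps to the same region file,
    # and therefore ends up at the same reducer node.
    return hash(key) % R
```

The essential property is determinism within a job: the same key always yields the same partition, so all of its values meet at one reducer.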

When a map task completes (all its splits are done), the TaskTracker notifies the JobTracker. When all the TaskTrackers are done, the JobTracker notifies the selected TaskTrackers to begin the reduce phase.

Each reduce-phase TaskTracker reads the region files remotely. It sorts the key/value pairs and, for each key, invokes the “reduce” function, which collects the key/aggregatedValue into the output file (one per reducer node).
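The reducer-side sort-then-group step can be sketched as follows (toy data; in the real framework the sort is a merge over the fetched region files):

```python
from itertools import groupby
from operator import itemgetter

# Pairs fetched from all mappers' region files for this reducer.
pairs = [("b", 1), ("a", 1), ("b", 1), ("a", 1), ("a", 1)]

pairs.sort(key=itemgetter(0))  # sorting brings equal keys together...

# ...so one pass with groupby feeds each key's values to the reduce step.
output = [(key, sum(v for _, v in group))
          for key, group in groupby(pairs, key=itemgetter(0))]
# output == [("a", 3), ("b", 2)]
```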

The Map/Reduce framework is resilient to the crash of any component. The JobTracker keeps track of the progress of each phase and periodically pings the TaskTrackers for their health status. When a map-phase TaskTracker crashes, the JobTracker reassigns the map task to a different TaskTracker node, which reruns all the assigned splits. If a reduce-phase TaskTracker crashes, the JobTracker reruns the reduce on a different TaskTracker.

After both phases complete, the JobTracker unblocks the client program.

Published at DZone with permission of Ricky Ho, author and DZone MVB.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)

Comments

Ratna Dinakar T... replied on Thu, 2008/12/18 - 10:30pm

Ricky, thanks for the article, which cleared up many confusing concepts. I hope you will publish an article covering an implementation with a small example.

All the best..

Trong Tran replied on Tue, 2009/09/22 - 5:08am

I like this topic

shwitzu ssh replied on Tue, 2009/10/13 - 2:05pm

Hello!

I am new to hadoop. I have 4 databases with different files in each one of them. Say, images in one, video, documents etc. My task is to develop a web service which accepts the keyword from the client and it should come to HDFS and the output/ the file has to go back to the client.

I have the following questions:

1) How should I start with the design?
2)  Should I upload all the files and create Map, Reduce and Driver code and once I run my application will it automatically go the file system and get back the results to me?

Please let me know how I should proceed.

Thanks in anticipation.

Ricky Ho replied on Sat, 2009/11/07 - 6:26pm

Thanks for mrflip's feedback.

 

In fact, take a look at the latest post here at 

http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html

Rgds, Ricky

Charles Wardell replied on Tue, 2012/03/06 - 11:20am

Love this article, you break down Map Reduce in such a simplistic way.
