MapReduce: Simplified Data Processing on Large Clusters
========================================================
Jeffrey Dean and Sanjay Ghemawat, Google, Inc.

- When was this paper published? 2004 - it has stood the test of time. Google Scholar: 4028 citations (as of Jan 2011).
- Background: Google engineers used to implement special-purpose programs that processed large amounts of raw data.
  - The computation had to be distributed across many machines.
  - Resulting complications: how to parallelize the computation, distribute the data, handle failures.
  - These details obscured the code and made it complex.
- MapReduce is a simple abstraction that hides the messy details.
- Let's look at some example applications to familiarize ourselves with MapReduce.
- grep returns the lines of (presumed text) files matching a given search string.

Distributed grep
================

String search;  // search pattern supplied by the user

map(String key, String value)
  // key: document name
  // value: document contents
  for each line l in value:
    if l contains search:
      EmitIntermediate("-", l);

reduce(String key, Iterator values)
  // key: dummy placeholder ("-")
  // values: the matching lines
  for each v in values:
    Emit(v);

Count of URL access frequency
=============================

map(String key, String value)
  // key: access log name
  // value: access log contents
  for each URL u in value:
    EmitIntermediate(u, "1");

reduce(String key, Iterator values)
  // key: a URL
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));

Inverted index
==============

map(String key, String value)
  // key: document name
  // value: document contents
  int docid = hash(key);
  for each word w in value:
    EmitIntermediate(w, AsString(docid));

reduce(String key, Iterator values)
  // key: a word
  // values: a list of document ids
  sort(values);
  Emit(AsString(list(values)));

- Implementation
  - Bisection bandwidth: divide the network into two equal halves; this is the bandwidth available between the two halves.
  - Suppose 2^6 nodes:

        Topology        Bisection bandwidth
        -----------     -------------------
        Bus             1
        Ring            2
        Tree            1
        Fully conn.     2^10

  - Large clusters of commodity PCs connected with Mbit/Gbit switched Ethernet running the spanning tree protocol.
  - Machine failures are common.
  - Partition the input data into M *splits*; each split is processed by a map task, and the tasks run on different machines in parallel.
    - The user optionally specifies M.
  - Input and output data reside in the Google File System (GFS), a distributed filesystem.
    - GFS divides each file into 64 MB blocks and typically stores three copies of each block on different machines.
  - Partition the intermediate key space into *R* pieces (e.g. hash(key) mod R). This generates R reduce tasks that are likewise assigned to different machines in parallel.
    - The user specifies R and the partitioning function.
  - Execution overview (a single-machine sketch of this flow appears below):
    1. Split the input files into M pieces; fork many copies of the program on different machines.
    2. The *master* machine assigns either a map or a reduce task to any idle *worker* machine.
    3. Map worker: read its split, parse (k, v) pairs, pass each pair to Map, buffer the results in memory.
    4. Map worker periodically writes the buffered pairs to local disk, partitioned into R regions. It tells the master the region locations and sizes; the master remembers these.
    5. Reduce worker uses remote procedure calls to read the buffered pairs from the map workers' local disks, then sorts the pairs by intermediate key.
    6. Reduce worker, for each intermediate key, passes the key and its set of values to the Reduce function.
    7. Master wakes up the user program when all reduce tasks finish; the output is R files, one per reduce task.
  - The master remembers (idle, in-progress, completed) state for each map or reduce task.
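- To make the execution overview concrete, here is a minimal single-process sketch of the flow above, using the URL access frequency example. It is an illustration only, not the paper's C++ library; the names (run_job) and the M/R defaults are assumptions.

# Minimal single-process sketch of the MapReduce execution flow above,
# using the URL access frequency example. Illustrative only.
from collections import defaultdict

def url_count_map(name, contents):
    # One intermediate pair (url, "1") per URL occurrence.
    for url in contents.split():
        yield url, "1"

def url_count_reduce(url, counts):
    # Sum the "1"s for a single URL.
    yield url, str(sum(int(c) for c in counts))

def run_job(map_fn, reduce_fn, inputs, M=4, R=2):
    # Step 1: split the input into M pieces (here: round-robin over records).
    splits = [inputs[i::M] for i in range(M)]

    # Steps 3-4: each "map task" buffers its output, partitioned into
    # R regions by hash(key) mod R.
    regions = [defaultdict(list) for _ in range(R)]
    for split in splits:
        for name, contents in split:
            for k, v in map_fn(name, contents):
                regions[hash(k) % R][k].append(v)

    # Steps 5-6: each "reduce task" sorts its region by key and calls
    # the reduce function once per intermediate key.
    outputs = []
    for region in regions:
        out = []
        for k in sorted(region):
            out.extend(reduce_fn(k, region[k]))
        outputs.append(out)

    # Step 7: R output "files", one per reduce task.
    return outputs

logs = [("log0", "/a /b /a"), ("log1", "/b /c")]
print(run_job(url_count_map, url_count_reduce, logs))

- This compresses master, workers, and GFS into one process; the real system's value is in distributing exactly these steps while tolerating failures, which the notes below cover.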
- Fault tolerance
  - Worker failure: the master pings each worker periodically; if there is no response:
    - Reset any in-progress or completed map tasks on that worker to idle so other workers can take them. Why completed tasks too? Their output is stored on the failed worker's local disk and is now inaccessible.
    - Notify all workers executing reduce tasks of the new map-task machine assignment (so they read the data from the new place).
    - Reduce tasks on the failed worker: just reset to idle if not yet completed (their output is stored in a global file system).
  - Master failure: abort the computation; the client can try again, which starts a new master. A rare occurrence, since there is only one master.
  - Claim: the distributed implementation produces the same output as a non-faulting sequential execution, given deterministic map and reduce operators.
    - Why? Atomic commits of each map/reduce task. Atomic: either it all takes effect exactly once, or no effects are observed.
    - Map task: the worker sends a completion message to the master with the names of its R temporary files; the master records the names from only one such message per task.
    - Reduce task: atomic rename of the temporary output file to the final output file; the file system guarantees the final file reflects just one execution.
    - Non-deterministic operators: different reduce tasks' output can reflect different executions, because if a map worker fails *while a reduce task is executing*, the map task gets reassigned and re-executed on a *different* machine.
- Locality: the input data is in GFS; the master takes the location of the input data into account and tries to assign map tasks to machines that already hold a local copy of the data, or to machines on the same LAN.
- Task granularity: how to choose M and R?
  - M, R >> # machines, to improve dynamic load balancing and to spread a failed worker's completed map tasks across many machines for re-execution.
  - The master makes O(M+R) scheduling decisions (one per map or reduce task).
  - The master keeps O(M*R) state in memory (the locations of the R intermediate regions for each of the M map tasks).
  - Users constrain R because each reduce task generates a separate output file.
- "Straggler": a machine that takes an unusually long time to complete one of the last few map or reduce tasks in a computation.
  - Why does this happen? A bad disk whose errors slow down reads. High CPU load. A configuration bug that disables the CPU caches.
  - Strategy: when the computation is close to completion, schedule *backup* executions of the remaining in-progress tasks. A task completes when either the primary or the backup completes. The atomicity properties above guarantee correctness.

Refinements
===========
- Partitioning function
  - Suppose the intermediate keys are URLs (URL access frequency example above), and we want all entries for a single host in the same output file.
  - Then use the following partitioning function: hash(Hostname(URLkey)) mod R
- Ordering guarantees: within a given partition, intermediate k/v pairs are processed in increasing key order, so MapReduce can generate sorted output files.
- Combiner function (for jobs where the reduce function is commutative and associative)
  - Consider the count example above: lots of (key, "1") records are sent over the network to the reduce tasks.
  - A *combiner* function executes on the map machines to merge these records before they are sent; it is like reduce, except its output is handled locally on the map machine. (A sketch of this and of the custom partitioning function follows this section.)
- Skipping bad records: sometimes map/reduce functions crash deterministically on certain records. Sometimes it is not feasible to fix the bug (it is in others' code). So the worker stores the sequence number of the record currently being processed and sends it to the master on a crash. If the master sees more than one failure on a particular record, it indicates that the record should be skipped when it re-issues execution of the corresponding map/reduce task.
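- A minimal sketch of the two refinements above (host-based partitioning and a combiner), continuing the single-process style of the earlier sketch. The use of urlsplit for hostname extraction and the function names are illustrative assumptions, not the paper's actual interface.

# Sketch of the custom partitioner and combiner refinements described above.
from collections import defaultdict
from urllib.parse import urlsplit

def host_partition(url_key, R):
    # All URLs from the same host land in the same reduce partition,
    # and therefore in the same output file.
    return hash(urlsplit(url_key).netloc) % R

def combine_counts(buffered_pairs):
    # Combiner: merge (url, "1") records on the map machine before the
    # shuffle, so far fewer records cross the network. Valid because
    # addition is commutative and associative.
    merged = defaultdict(int)
    for url, count in buffered_pairs:
        merged[url] += int(count)
    return [(url, str(c)) for url, c in merged.items()]

pairs = [("http://a.com/x", "1"), ("http://a.com/x", "1"), ("http://b.com/y", "1")]
print(combine_counts(pairs))                 # e.g. [('http://a.com/x', '2'), ('http://b.com/y', '1')]
print(host_partition("http://a.com/x", R=4))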
Performance
===========
- Examine grep over 1 TB of data, and a sort of 1 TB of data.
- What's the high-level goal of the performance evaluation? To show how long a large MapReduce computation takes, and why. There is no head-to-head performance comparison, however.
- Setup: 1,800-machine cluster, gigabit Ethernet links.
- Grep: 1 TB of data (Figure 2 in the paper). Conclusion: takes about 150 seconds.
- Sort [slide]
  - The map function extracts the sort key from a text line and emits (key, text line) as the (key, value) pair; the reduce function is the identity. (A tiny sketch of both appears at the end of these notes.)
  - 2-way replication in GFS.
  - M = 15,000 (64 MB input data chunks), R = 4,000.
  - Input rate: all map tasks finish before 200 seconds. The peak input rate is lower than grep's, since sort spends I/O bandwidth writing intermediate data to disk, unlike grep.
  - Shuffle (the rate at which data flows over the network from map to reduce tasks)
    - Begins as soon as the first map task completes.
    - First hump: the first batch of ~1,700 reduce tasks.
    - Second hump (~300 s): some reduce tasks from the first batch complete, and new reduce tasks start moving data.
  - Output (the rate at which data is written to the final output files by reduce tasks)
    - The delay between the end of the first shuffle period and the start of writing is due to sorting the intermediate data.
  - Notes
    - Input rate > shuffle and output rates because input is read locally.
    - Shuffle rate > output rate because the output phase writes two copies.
- Effect of backup tasks on sort [slide]
  - This slide shows sort with backup tasks DISABLED.
  - Done after 1,283 seconds (44% longer than normal).
  - At 960 seconds, the computation is waiting for just *five* slow reduce tasks.
- Machine failures [slide]
  - Kill 200 out of 1,700 workers at about t=250 seconds.
  - Negative input *and* shuffle rates, since the kill takes out both map and reduce tasks, and some input and shuffle work has to be redone.
  - Finishes in 933 seconds, compared to 850 seconds normally.

Experience
==========
- Many uses: machine learning, Google News, Google Product Search, extraction of data for reports, extraction of geographical locations from a large corpus of web pages for localized search.
- MapReduce jobs processed about three petabytes of data in August 2004!
- The indexing system that produces the data structures for Google web search was rewritten on top of MapReduce.
  - It processes the documents retrieved by the crawler (>20 TB), stored in GFS.
  - The result: simpler, smaller indexing code.

Related work
============
- River: a more general programming model in which processes communicate by sending data over distributed queues.
  - River works with bigger tasks; by restricting the programming model, MapReduce can partition the work into a large number of fine-grained tasks.
  - MapReduce schedules these tasks dynamically, so faster workers process more tasks.
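- A tiny sketch of the sort benchmark's map and reduce functions described in the Performance section, in the same style as the earlier sketches; the 10-byte key prefix is an assumption about the record format.

# Sort benchmark sketch: map emits (sort key, original line); reduce
# passes the pairs through unchanged (the identity).
def sort_map(name, contents):
    for line in contents.splitlines():
        yield line[:10], line        # assumed 10-byte sort key prefix

def sort_reduce(key, values):
    for v in values:                 # identity: emit pairs unchanged
        yield key, v

- Combined with the ordering guarantee noted under Refinements (reduce is called in increasing key order within each partition), each of the R output files comes out sorted by key.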