Shuffle and Sort

MapReduce makes the guarantee that the input to every reducer is sorted by key. The process by which the system performs the sort and transfers the map outputs to the reducers as inputs is known as the shuffle. In this section, we look at how the shuffle works, as a basic understanding would be helpful should you need to optimize a MapReduce program. The shuffle is an area of the codebase where refinements and improvements are continually being made, so the following description necessarily conceals many details (and may change over time; this description applies to version 0.20). In many ways, the shuffle is the heart of MapReduce and is where the “magic” happens.

The Map Side

When the map function starts producing output, it is not simply written to disk. The process is more involved, taking advantage of buffering writes in memory and doing some presorting for efficiency. Each map task has a circular memory buffer that it writes output to. The buffer is 100 MB by default, a size that can be tuned by changing the io.sort.mb property.

When the contents of the buffer reach a certain threshold size (io.sort.spill.percent, default 0.80, or 80%), a background thread will start to spill the contents to disk.

Map outputs will continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map will block until the spill is complete. Spills are written in round-robin fashion to the directories specified by the mapred.local.dir property, in a job-specific subdirectory.
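As a rough illustration, the buffer and spill settings just described can be set per job through the old JobConf API. This is a minimal sketch using the 0.20-era property names; the 200 MB value is illustrative, not a recommendation:

    // Sketch: per-job map-side spill tuning (0.20-era property names; values illustrative).
    import org.apache.hadoop.mapred.JobConf;

    public class MapSpillTuning {
        public static JobConf configure() {
            JobConf conf = new JobConf();
            conf.setInt("io.sort.mb", 200);                // sort buffer in MB (default 100)
            conf.setFloat("io.sort.spill.percent", 0.80f); // spill threshold (default 0.80)
            // mapred.local.dir is normally set cluster-wide in mapred-site.xml,
            // as a comma-separated list of directories on separate disks.
            return conf;
        }
    }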

The term shuffle is actually imprecise, since in some contexts it refers to only the part of the process where map outputs are fetched by reduce tasks. In this section, we take it to mean the whole process from the point where a map produces output to where a reduce consumes input.


Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that the data will ultimately be sent to. Within each partition, the background thread performs an in-memory sort by key, and if there is a combiner function, it is run on the output of the sort.
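To make the partitioning step concrete, the following sketch mirrors the logic of Hadoop’s default HashPartitioner (old mapred API): each record is assigned to a reducer by hashing its key, before the in-memory sort takes place.

    // Sketch of the default hash-partitioning logic (old mapred API).
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    public class SketchHashPartitioner<K, V> implements Partitioner<K, V> {
        public void configure(JobConf job) { }

        public int getPartition(K key, V value, int numPartitions) {
            // Mask off the sign bit so the partition index is non-negative.
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }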

Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there could be several spill files.

Before the task is finished, the spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; the default is 10.

If a combiner function has been specified, and the number of spills is at least three (the value of the min.num.spills.for.combine property), then the combiner is run before the output file is written. Recall that combiners may be run repeatedly over the input without affecting the final result. The point is that running combiners makes for a more compact map output, so there is less data to write to local disk and to transfer to the reducer.
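As a minimal sketch, attaching a combiner looks like this in the old mapred API. MyReducer is a hypothetical stand-in for any reducer whose operation is commutative and associative (a sum, say), so it can safely run repeatedly:

    // Sketch: enabling a combiner (old mapred API).
    // MyReducer is hypothetical: any Reducer whose operation is
    // commutative and associative can serve as a combiner.
    import org.apache.hadoop.mapred.JobConf;

    public class CombinerSetup {
        public static void configure(JobConf conf) {
            conf.setCombinerClass(MyReducer.class);
            // Run the combiner at spill time only if there are at least this many spills:
            conf.setInt("min.num.spills.for.combine", 3); // 3 is the default
        }
    }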

It is often a good idea to compress the map output as it is written to disk, since doing so makes it faster to write to disk, saves disk space, and reduces the amount of data to transfer to the reducer. By default, the output is not compressed, but it is easy to enable by setting mapred.compress.map.output to true. The compression library to use is specified by mapred.map.output.compression.codec; see “Compression” for more on compression formats.
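A sketch of enabling map output compression through the old JobConf API; the choice of GzipCodec here is illustrative, and other codecs may trade compression ratio for speed:

    // Sketch: compressing map output (0.20-era configuration).
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.JobConf;

    public class MapOutputCompression {
        public static void configure(JobConf conf) {
            conf.setCompressMapOutput(true); // same as mapred.compress.map.output=true
            conf.setMapOutputCompressorClass(GzipCodec.class);
            // equivalent to setting mapred.map.output.compression.codec
        }
    }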

The output file’s partitions are made available to the reducers over HTTP. The number of worker threads used to serve the file partitions is controlled by the tasktracker.http.threads property; this setting is per tasktracker, not per map task slot. The default of 40 may need increasing for large clusters running large jobs.

The Reduce Side

Let’s turn now to the reduce part of the process. The map output file is sitting on the local disk of the tasktracker that ran the map task (note that although map outputs always get written to the local disk of the map tasktracker, reduce outputs may not be), but now it is needed by the tasktracker that is about to run the reduce task for the partition. Furthermore, the reduce task needs the map output for its particular partition from several map tasks across the cluster. The map tasks may finish at different times, so the reduce task starts copying their outputs as soon as each completes. This is known as the copy phase of the reduce task. The reduce task has a small number of copier threads so that it can fetch map outputs in parallel. The default is five threads, but this number can be changed by setting the mapred.reduce.parallel.copies property.
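A sketch of widening the copy phase; the value of 10 is illustrative:

    // Sketch: more parallel fetches during the reduce-side copy phase.
    import org.apache.hadoop.mapred.JobConf;

    public class CopyPhaseTuning {
        public static void configure(JobConf conf) {
            // Threads a reduce task uses to fetch map outputs in parallel.
            conf.setInt("mapred.reduce.parallel.copies", 10); // default is 5
        }
    }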

How do reducers know which tasktrackers to fetch map output from?

As map tasks complete successfully, they notify their parent tasktracker of the status update, which in turn notifies the jobtracker. These notifications are transmitted over the heartbeat communication mechanism described earlier. Therefore, for a given job, the jobtracker knows the mapping between map outputs and tasktrackers. A thread in the reducer periodically asks the jobtracker for map output locations until it has retrieved them all.

Tasktrackers do not delete map outputs from disk as soon as the first reducer has retrieved them, as the reducer may fail. Instead, they wait until they are told to delete them by the jobtracker, which is after the job has completed.

The map outputs are copied to the reduce tasktracker’s memory if they are small enough (the buffer’s size is controlled by mapred.job.shuffle.input.buffer.percent, which specifies the proportion of the heap to use for this purpose); otherwise, they are copied to disk. When the in-memory buffer reaches a threshold size (controlled by mapred.job.shuffle.merge.percent) or reaches a threshold number of map outputs (mapred.inmem.merge.threshold), it is merged and spilled to disk.
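A sketch setting the three reduce-side shuffle properties just described; the values shown are believed to be the 0.20 defaults, so treat them as a starting point rather than a tuning recommendation:

    // Sketch: reduce-side shuffle memory properties (values believed to be 0.20 defaults).
    import org.apache.hadoop.mapred.JobConf;

    public class ShuffleMemoryTuning {
        public static void configure(JobConf conf) {
            // Proportion of the reduce task's heap used to buffer copied map outputs:
            conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
            // Merge and spill when the buffer is this full...
            conf.setFloat("mapred.job.shuffle.merge.percent", 0.66f);
            // ...or when this many map outputs have accumulated:
            conf.setInt("mapred.inmem.merge.threshold", 1000);
        }
    }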

As the copies accumulate on disk, a background thread merges them into larger, sorted files. This saves some time merging later on. Note that any map outputs that were compressed (by the map task) have to be decompressed in memory in order to perform a merge on them.

When all the map outputs have been copied, the reduce task moves into the sort phase (which should properly be called the merge phase, as the sorting was carried out on the map side), which merges the map outputs, maintaining their sort ordering. This is done in rounds. For example, if there were 50 map outputs, and the merge factor was 10 (the default, controlled by the io.sort.factor property, just like in the map’s merge), then there would be 5 rounds. Each round would merge 10 files into one, so at the end there would be five intermediate files.

Rather than have a final round that merges these five files into a single sorted file, the merge saves a trip to disk by directly feeding the reduce function in what is the last phase: the reduce phase. This final merge can come from a mixture of in-memory and on-disk segments.

The number of files merged in each round is actually more subtle than this example suggests. The goal is to merge the minimum number of files to get to the merge factor for the final round. So if there were 40 files, the merge would not merge 10 files in each of the four rounds to get 4 files. Instead, the first round would merge only 4 files, and the subsequent three rounds would merge the full 10 files. The 4 merged files and the 6 (as yet unmerged) files make a total of 10 files for the final round.
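The size of the first round can be computed directly. The sketch below mirrors the behavior described in the paragraph above (it is not Hadoop’s actual merge code): it picks a first-round size so that every later round merges exactly the merge factor and the final round has exactly that many inputs.

    // Sketch: how many files the first merge round should take so that all
    // subsequent rounds merge exactly `factor` files and the final round
    // has exactly `factor` inputs. Mirrors the 40-file example above.
    public class MergeRounds {
        static int firstRoundSize(int numFiles, int factor) {
            if (numFiles <= factor) return numFiles;
            int mod = (numFiles - 1) % (factor - 1);
            return mod == 0 ? factor : mod + 1;
        }

        public static void main(String[] args) {
            System.out.println(firstRoundSize(40, 10));  // prints 4
            System.out.println(firstRoundSize(100, 10)); // prints 10
        }
    }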

Note that this does not change the number of rounds; it’s just an optimization to minimize the amount of data that is written to disk, since the final round always merges directly into the reduce.

During the reduce phase, the reduce function is invoked for each key in the sorted output. The output of this phase is written directly to the output filesystem, typically HDFS. In the case of HDFS, since the tasktracker node is also running a datanode, the first block replica will be written to the local disk.
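For concreteness, here is a minimal reducer in the old 0.20 API; the framework calls reduce once per key with that key’s sorted group of values, and collected output goes to the job’s OutputFormat, typically files on HDFS:

    // Sketch: a minimal old-API reducer. reduce() is invoked once per key;
    // output.collect() writes via the job's OutputFormat (typically to HDFS).
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class SumReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }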

Configuration Tuning

We are now in a better position to understand how to tune the shuffle to improve MapReduce performance. The relevant settings, which can be used on a per-job basis (except where noted), are summarized in the two tables at the end of this section, along with the defaults, which are good for general-purpose jobs.

The general principle is to give the shuffle as much memory as possible. However, there is a trade-off, in that you need to make sure that your map and reduce functions get enough memory to operate. This is why it is best to write your map and reduce functions to use as little memory as possible—certainly they should not use an unbounded amount of memory (by avoiding accumulating values in a map, for example).

The amount of memory given to the JVMs in which the map and reduce tasks run is set by the mapred.child.java.opts property. You should try to make this as large as possible for the amount of memory on your task nodes; the discussion in “Memory” goes through the constraints to consider.
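A sketch of setting the task JVM heap per job; -Xmx512m is illustrative (the 0.20 default is -Xmx200m), and the right value depends on the memory available on your task nodes:

    // Sketch: per-job task JVM options (value illustrative).
    import org.apache.hadoop.mapred.JobConf;

    public class TaskMemory {
        public static void configure(JobConf conf) {
            conf.set("mapred.child.java.opts", "-Xmx512m"); // default is -Xmx200m
        }
    }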

On the map side, the best performance can be obtained by avoiding multiple spills to disk; one is optimal. If you can estimate the size of your map outputs, then you can set the io.sort.* properties appropriately to minimize the number of spills. In particular, you should increase io.sort.mb if you can. There is a MapReduce counter (“Spilled Records”; see “Counters”) that counts the total number of records that were spilled to disk over the course of a job, which can be useful for tuning. Note that the counter includes both map- and reduce-side spills.
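A sketch of reading that counter after running a job with the old API; this assumes the 0.20-era Task.Counter enum, which exposes SPILLED_RECORDS:

    // Sketch: checking "Spilled Records" after a job completes (assumes the
    // 0.20-era Task.Counter enum; counts map- and reduce-side spills together).
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.Task;

    public class SpillCheck {
        public static void report(JobConf conf) throws Exception {
            RunningJob job = JobClient.runJob(conf); // blocks until completion
            long spilled = job.getCounters()
                              .getCounter(Task.Counter.SPILLED_RECORDS);
            System.out.println("Spilled records: " + spilled);
        }
    }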

On the reduce side, the best performance is obtained when the intermediate data can reside entirely in memory. By default, this does not happen, since for the general case all the memory is reserved for the reduce function. But if your reduce function has light memory requirements, then setting mapred.inmem.merge.threshold to 0 and mapred.job.reduce.input.buffer.percent to 1.0 (or a lower value; see the reduce-side table below) may bring a performance boost.
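A sketch of the two settings just mentioned; as the text says, this is only appropriate when the reduce function itself needs little memory:

    // Sketch: keep intermediate data in memory through the reduce phase.
    import org.apache.hadoop.mapred.JobConf;

    public class InMemoryReduceInput {
        public static void configure(JobConf conf) {
            conf.setInt("mapred.inmem.merge.threshold", 0);
            conf.setFloat("mapred.job.reduce.input.buffer.percent", 1.0f);
        }
    }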

More generally, Hadoop uses a buffer size of 4 KB by default, which is low, so you should increase this across the cluster (by setting io.file.buffer.size; see also “Other Hadoop Properties”).

In April 2008, Hadoop won the general-purpose terabyte sort benchmark (described in “TeraByte Sort on Apache Hadoop”), and one of the optimizations used was exactly this: keeping the intermediate data in memory on the reduce side.

Table: Map-side tuning properties

Table: Reduce-side tuning properties

