MapReduce Types - Hadoop

The map and reduce functions in Hadoop MapReduce have the following general form:

map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

In general, the map input key and value types (K1 and V1) are different from the map output types (K2 and V2). However, the reduce input must have the same types as the map output, although the reduce output types may be different again (K3 and V3). The Java interfaces mirror this form.
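To make the type flow concrete, here is a Hadoop-free sketch. In the old API the real interfaces are org.apache.hadoop.mapred.Mapper, whose map() takes (K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter), and org.apache.hadoop.mapred.Reducer, whose reduce() takes (K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter). The names SketchCollector, SketchMapper, SketchReducer, and WordCountTypes below are hypothetical stand-ins: they drop the Reporter, use Iterable instead of Iterator, and use plain Java types instead of Writables, so the example compiles without Hadoop.

```java
// Hypothetical, Hadoop-free stand-ins for the old-API interfaces. The type
// parameters follow the K1/V1 -> K2/V2 -> K3/V3 scheme described above.
interface SketchCollector<K, V> {
    void collect(K key, V value);
}

interface SketchMapper<K1, V1, K2, V2> {
    void map(K1 key, V1 value, SketchCollector<K2, V2> output);
}

interface SketchReducer<K2, V2, K3, V3> {
    void reduce(K2 key, Iterable<V2> values, SketchCollector<K3, V3> output);
}

// Example instance (hypothetical):
//   map:    (Long offset, String line)          -> (String word, Integer 1)
//   reduce: (String word, list(Integer counts)) -> (String word, Integer total)
class WordCountTypes {
    static final SketchMapper<Long, String, String, Integer> MAPPER =
        (offset, line, out) -> {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(word, 1);
                }
            }
        };

    // The reducer's input types must equal the mapper's output types.
    static final SketchReducer<String, Integer, String, Integer> REDUCER =
        (word, counts, out) -> {
            int sum = 0;
            for (int c : counts) {
                sum += c;
            }
            out.collect(word, sum);
        };
}
```

The reducer’s input types (String, Integer) are forced by the mapper’s output types, while its output types are free to differ; in this example they happen to coincide.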

Recall that the OutputCollector is purely for emitting key-value pairs (and is hence parameterized with their types), while the Reporter is for updating counters and status. (In the new MapReduce API in release 0.20.0 and later, these two functions are combined in a single context object.)

If a combine function is used, then it is the same form as the reduce function (and is an implementation of Reducer), except its output types are the intermediate key and value types (K2 and V2), so they can feed the reduce function:

combine: (K2, list(V2)) → list(K2, V2)

Often the combine and reduce functions are the same, in which case K3 is the same as K2, and V3 is the same as V2. The partition function operates on the intermediate key and value types (K2 and V2) and returns the partition index. In practice, the partition is determined solely by the key (the value is ignored):

partition: (K2, V2) → integer
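A minimal sketch of why a combiner can reuse the reduce code, assuming a max-style aggregation (the class MaxCombineReduce and its methods are hypothetical): taking the maximum has the shape (K2, list(V2)) → (K2, V2), so K3 = K2 and V3 = V2, and applying it per map task before the reduce does not change the answer. Keys are left out to keep the sketch short; treat every list below as the values for a single key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example: one max function serves as both combiner and reducer.
class MaxCombineReduce {
    // Collapse all values seen for one key to their maximum.
    static int maxOf(List<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    // No combiner: every map output value travels to the reducer.
    static int withoutCombiner(List<List<Integer>> mapOutputs) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> out : mapOutputs) {
            all.addAll(out);
        }
        return maxOf(all);
    }

    // With a combiner: each map task's output is collapsed locally first,
    // so the reducer sees only one value per map task.
    static int withCombiner(List<List<Integer>> mapOutputs) {
        List<Integer> combined = new ArrayList<>();
        for (List<Integer> out : mapOutputs) {
            combined.add(maxOf(out));
        }
        return maxOf(combined);
    }
}
```

This reuse works because max is commutative and associative; a function such as mean could not be reused as a combiner in the same way.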

In MinimalMapReduce, the only configuration that we set is an input path and an output path. We run it over a subset of our weather data.

Each line of the output is an integer followed by a tab character, followed by the original weather data record. Admittedly, it’s not a very useful program, but understanding how it produces its output does provide some insight into the defaults that Hadoop uses when running MapReduce jobs. MinimalMapReduceWithDefaults is a program that has exactly the same effect as MinimalMapReduce, but explicitly sets the job settings to their defaults.

A minimal MapReduce driver, with the defaults explicitly set

We’ve simplified the first few lines of the run() method by extracting the logic for printing usage and setting the input and output paths into a helper method. Almost all MapReduce drivers take these two arguments (input and output), so reducing the boilerplate code here is a good thing. The relevant helper methods live in the JobBuilder class.
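The following is a minimal, Hadoop-free sketch of what such a helper might do; the class DriverArgs and its methods are hypothetical, not the actual JobBuilder API. It only validates the two positional arguments and prints a usage message on failure; it does not build a Hadoop job configuration.

```java
// Hypothetical helper in the spirit of JobBuilder (not its real API).
class DriverArgs {
    final String inputPath;
    final String outputPath;

    private DriverArgs(String inputPath, String outputPath) {
        this.inputPath = inputPath;
        this.outputPath = outputPath;
    }

    // Returns null after printing usage when the argument count is wrong,
    // so a driver's run() method can just return a nonzero exit code.
    static DriverArgs parseInputAndOutput(String toolName, String[] args) {
        if (args.length != 2) {
            printUsage(toolName, "<input> <output>");
            return null;
        }
        return new DriverArgs(args[0], args[1]);
    }

    static void printUsage(String toolName, String extraArgsUsage) {
        System.err.printf("Usage: %s [generic options] %s%n", toolName, extraArgsUsage);
    }
}
```

A driver would call parseInputAndOutput() at the top of run() and bail out on null, which keeps the usage-handling boilerplate out of every driver.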

Returning to MinimalMapReduceWithDefaults: although there are many other default job settings, the ones set explicitly in that driver are those most central to running a job.

Let’s go through them in turn. The default input format is TextInputFormat, which produces keys of type LongWritable (the offset of the beginning of the line in the file) and values of type Text (the line of text). This explains where the integers in the final output come from: they are the line offsets.
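A small sketch of where those keys come from, assuming '\n' line endings: each key is the byte offset at which a line starts. The class LineOffsets is hypothetical and only models the key values; it is not how TextInputFormat is implemented.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of TextInputFormat-style keys: the byte offset of the
// start of each line. Assumes '\n' endings; the newline is not part of a line.
class LineOffsets {
    static List<Long> offsets(String fileContents) {
        byte[] bytes = fileContents.getBytes(StandardCharsets.UTF_8);
        List<Long> result = new ArrayList<>();
        long lineStart = 0;
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] == '\n') {
                result.add(lineStart);
                lineStart = i + 1;
            }
        }
        if (lineStart < bytes.length) {
            result.add(lineStart); // final line without a trailing newline
        }
        return result;
    }
}
```

Because offsets count bytes, a line containing multibyte UTF-8 characters pushes the next key further than its character count alone would suggest.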

Despite appearances, the setNumMapTasks() call does not necessarily set the number of map tasks to one. It is only a hint: the actual number of map tasks depends on the size of the input and the file’s block size (if the file is in HDFS). This is discussed further in “FileInputFormat input splits”.
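The sketch below is a simplified model of how the old-API FileInputFormat chooses a split size for one splittable file, assuming the rule splitSize = max(minSize, min(goalSize, blockSize)) with goalSize = totalSize / requested maps. The class SplitEstimate is hypothetical; it ignores multiple input files, unsplittable (for example, compressed) files, and the small overshoot Hadoop allows for the last split.

```java
// Hypothetical, simplified model of old-API split sizing for one file.
class SplitEstimate {
    // The map-task hint only shapes goalSize; the block size caps the result.
    static long splitSize(long totalSize, int requestedMaps, long minSize, long blockSize) {
        long goalSize = totalSize / Math.max(1, requestedMaps);
        // minSize is clamped to at least 1 byte, which also avoids division by zero.
        return Math.max(Math.max(1L, minSize), Math.min(goalSize, blockSize));
    }

    // One map task per split; rounds up so a partial last split still counts.
    static long numSplits(long fileSize, int requestedMaps, long minSize, long blockSize) {
        long size = splitSize(fileSize, requestedMaps, minSize, blockSize);
        return (fileSize + size - 1) / size;
    }
}
```

For a 1,000 MB file with 64 MB blocks and a hint of one map task, the goal size is the whole file, but the block size caps each split at 64 MB, giving 16 splits and therefore 16 map tasks. A larger hint can shrink the splits, but a hint of one cannot merge blocks into a single split.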

The default mapper is IdentityMapper, which writes the input key and value unchanged to the output.

IdentityMapper is a generic type, which allows it to work with any key or value types, with the restriction that the map input and output keys are of the same type, and the map input and output values are of the same type. In this case, the map output key is LongWritable and the map output value is Text.
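Here is a Hadoop-free sketch of the same idea (the real class is org.apache.hadoop.mapred.lib.IdentityMapper<K, V>, which implements Mapper<K, V, K, V>). IdentityCollector and SketchIdentityMapper are hypothetical names; having only two type parameters is what forces the input and output types to match.

```java
// Hypothetical collector for emitted key-value pairs.
interface IdentityCollector<K, V> {
    void collect(K key, V value);
}

// Hypothetical analogue of IdentityMapper: with a single key type K and a
// single value type V, the output types are necessarily the input types.
class SketchIdentityMapper<K, V> {
    // Emits the input pair unchanged.
    void map(K key, V value, IdentityCollector<K, V> output) {
        output.collect(key, value);
    }
}
```

Instantiated as SketchIdentityMapper<Long, String>, it plays the role of the default job’s mapper, with the offset as key and the line as value.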

Map tasks are run by MapRunner, the default implementation of MapRunnable that calls the Mapper’s map() method sequentially with each record. The default partitioner is HashPartitioner, which hashes a record’s key to determine which partition the record belongs in. Each partition is processed by a reduce task, so the number of partitions is equal to the number of reduce tasks for the job.

The key’s hash code is turned into a nonnegative integer by bitwise ANDing it with the largest integer value. It is then reduced modulo the number of partitions to find the index of the partition that the record belongs in.
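The partitioning rule can be written directly in Java. HashPartitionSketch is a hypothetical name; Hadoop’s HashPartitioner.getPartition() also receives the value, which it ignores, so the sketch leaves it out.

```java
// Hypothetical sketch of the HashPartitioner rule:
// (hashCode & Integer.MAX_VALUE) % numPartitions.
class HashPartitionSketch {
    static int getPartition(Object key, int numPartitions) {
        // Clearing the sign bit makes the hash nonnegative, so the remainder
        // is always a valid partition index in [0, numPartitions).
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

Masking is used rather than Math.abs() because Math.abs(Integer.MIN_VALUE) is still negative, whereas the mask maps every hash code to a nonnegative int. Equal keys have equal hash codes, so they always land in the same partition.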

By default, there is a single reducer, and therefore a single partition, so the action of the partitioner is irrelevant in this case: everything goes into one partition. However, it is important to understand the behavior of HashPartitioner when you have more than one reduce task. Assuming the key’s hash function is a good one, the records will be evenly allocated across reduce tasks, with all records that share the same key being processed by the same reduce task.

Choosing the Number of Reducers

The single reducer default is something of a gotcha for new users to Hadoop. Almost all real-world jobs should set this to a larger number; otherwise, the job will be very slow since all the intermediate data flows through a single reduce task. (Note that when running under the local job runner, only zero or one reducers are supported.)

The optimal number of reducers is related to the total number of available reducer slots in your cluster. The total number of slots is found by multiplying the number of nodes in the cluster by the number of slots per node (which is determined by the value of the mapred.tasktracker.reduce.tasks.maximum property, described in “Environment Settings”).

One common setting is to have slightly fewer reducers than total slots, which gives one wave of reduce tasks (and tolerates a few failures, without extending job execution time). If your reduce tasks are very big, then it makes sense to have a larger number of reducers (resulting in two waves, for example) so that the tasks are more fine-grained, and failure doesn’t affect job execution time significantly.
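The arithmetic behind these choices is simple enough to sketch. The class ReducerCount is hypothetical, and the factors 0.95 (one wave) and 1.75 (about two waves) are an assumed rule of thumb, similar to the one given in the Apache Hadoop MapReduce tutorial, not values Hadoop applies on its own. Integer arithmetic avoids floating-point rounding surprises.

```java
// Hypothetical reducer-count arithmetic; the 95% and 175% factors are an
// assumed rule of thumb, not Hadoop defaults.
class ReducerCount {
    // Total reduce slots = nodes x mapred.tasktracker.reduce.tasks.maximum.
    static int totalReduceSlots(int nodes, int reduceSlotsPerNode) {
        return nodes * reduceSlotsPerNode;
    }

    // Slightly fewer reducers than slots: one wave, with spare slots for retries.
    static int oneWave(int totalSlots) {
        return Math.max(1, totalSlots * 95 / 100);
    }

    // More, smaller reducers: roughly two waves, so a failed task costs less.
    static int twoWaves(int totalSlots) {
        return Math.max(1, totalSlots * 175 / 100);
    }
}
```

With 10 nodes and 2 reduce slots per node there are 20 slots, so the sketch suggests 19 reducers for a single wave or 35 for roughly two. In an old-API driver, the chosen number would be passed to JobConf.setNumReduceTasks().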

The default reducer is IdentityReducer, again a generic type, which simply writes all its input to its output.

For this job, the output key is LongWritable, and the output value is Text. In fact, all the keys for this MapReduce program are LongWritable, and all the values are Text, since these are the input keys and values, and the map and reduce functions are both identity functions which by definition preserve type. Most MapReduce programs, however, don’t use the same key or value types throughout, so you need to configure the job to declare the types you are using, as described in the previous section.

Records are sorted by the MapReduce system before being presented to the reducer. In this case, the keys are sorted numerically, which has the effect of interleaving the lines from the input files into one combined output file.
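A Hadoop-free simulation of the default job makes the interleaving visible. DefaultJobSimulation is hypothetical; it assumes ASCII lines with '\n' endings (so character counts equal byte offsets) and a single reducer, and it uses a TreeMap to stand in for the sort and group phase.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation: offsets as keys, identity map, sort by key,
// identity reduce, single reducer.
class DefaultJobSimulation {
    // Each inner list holds the lines of one input file.
    static List<Map.Entry<Long, String>> run(List<List<String>> files) {
        // Sort/group phase: TreeMap orders the long keys numerically and
        // groups values that share a key.
        TreeMap<Long, List<String>> sorted = new TreeMap<>();
        for (List<String> lines : files) {
            long offset = 0; // offsets restart at 0 for every file
            for (String line : lines) {
                // Identity map: (offset, line) is emitted unchanged.
                sorted.computeIfAbsent(offset, k -> new ArrayList<>()).add(line);
                offset += line.length() + 1; // +1 for the '\n'
            }
        }
        // Identity reduce: every value is written out under its key.
        List<Map.Entry<Long, String>> output = new ArrayList<>();
        for (Map.Entry<Long, List<String>> group : sorted.entrySet()) {
            for (String value : group.getValue()) {
                output.add(new AbstractMap.SimpleEntry<>(group.getKey(), value));
            }
        }
        return output;
    }
}
```

With two input files whose lines are "a1", "a2" and "b11", "b22", the keys come out as 0, 0, 3, 4, so the output alternates between the files. Hadoop does not guarantee the order of values that share a key; this sketch simply keeps them in input order.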

The default output format is TextOutputFormat, which writes out records, one per line, by converting keys and values to strings and separating them with a tab character. This is why the output is tab-separated: it is a feature of TextOutputFormat.
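A minimal sketch of the line layout, assuming the default tab separator; TextLineFormat is a hypothetical helper, not Hadoop code. It also mirrors TextOutputFormat’s handling of a missing key or value, modeling "missing" only as null (Hadoop treats NullWritable the same way, which is not modeled here).

```java
// Hypothetical helper mirroring TextOutputFormat's record layout:
// key.toString(), a tab, value.toString(), then a newline.
class TextLineFormat {
    static String formatRecord(Object key, Object value) {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return ""; // nothing is written, not even a newline
        }
        StringBuilder line = new StringBuilder();
        if (!nullKey) {
            line.append(key); // String.valueOf(key), i.e. key.toString()
        }
        if (!nullKey && !nullValue) {
            line.append('\t'); // separator only when both parts are present
        }
        if (!nullValue) {
            line.append(value);
        }
        return line.append('\n').toString();
    }
}
```

Dropping the key (and its tab) when it is absent is what lets a job emit plain lines of values without a leading separator.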

The default Streaming job

In Streaming, the default job is similar, but not identical, to the Java equivalent. In its minimal form, a Streaming job specifies an input path, an output path, and a mapper.

Notice that you have to supply a mapper: the default identity mapper will not work.

All rights reserved © 2018 Wisdom IT Services India Pvt. Ltd
