Different Ways of Sorting Datasets

The ability to sort data is at the heart of MapReduce. Even if your application isn’t concerned with sorting per se, it may be able to use the sorting stage that MapReduce provides to organize its data. In this section, we will examine different ways of sorting datasets and how you can control the sort order in MapReduce.

Preparation

We are going to sort the weather dataset by temperature. Storing temperatures as Text objects doesn’t work for sorting purposes, since signed integers don’t sort lexicographically. Instead, we are going to store the data using sequence files whose IntWritable keys represent the temperature (and sort correctly), and whose Text values are the lines of data.

The MapReduce job in the example below is a map-only job that also filters the input to remove records that don’t have a valid temperature reading. Each map creates a single block-compressed sequence file as output. It is invoked with the following command:
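With illustrative class and path names (the driver class matches the sketch below; NCDC text input under input/ncdc/all, SequenceFile output under input/ncdc/all-seq), that command might be:

    % hadoop SortDataPreprocessor input/ncdc/all input/ncdc/all-seq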

Example: A MapReduce program for transforming the weather data into SequenceFile format
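What follows is a minimal sketch of such a program, assuming the old (org.apache.hadoop.mapred) API and the fixed-width NCDC record format (temperature in characters 87–92, quality code in character 92; these offsets are assumptions):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class SortDataPreprocessor extends Configured implements Tool {

      static class CleanerMapper extends MapReduceBase
          implements Mapper<LongWritable, Text, IntWritable, Text> {

        @Override
        public void map(LongWritable key, Text value,
            OutputCollector<IntWritable, Text> output, Reporter reporter)
            throws IOException {
          String line = value.toString();
          String temp = line.substring(87, 92);
          String quality = line.substring(92, 93);
          // Emit only records with a valid temperature reading
          if (!temp.equals("+9999") && quality.matches("[01459]")) {
            int airTemperature = Integer.parseInt(
                temp.startsWith("+") ? temp.substring(1) : temp);
            output.collect(new IntWritable(airTemperature), value);
          }
        }
      }

      @Override
      public int run(String[] args) throws IOException {
        JobConf conf = new JobConf(getConf(), getClass());
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(CleanerMapper.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(0); // map-only job

        // Block-compressed SequenceFile output
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);

        JobClient.runJob(conf);
        return 0;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SortDataPreprocessor(), args));
      }
    }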


Partial Sort

In “The Default MapReduce Job” we saw that, by default, MapReduce will sort input records by their keys. The example below is a variation for sorting sequence files with IntWritable keys.

Example: A MapReduce program for sorting a SequenceFile with IntWritable keys using the default HashPartitioner
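A minimal sketch under the same assumptions as before (old API; class name is illustrative). The identity mapper and reducer are the defaults, so the program only configures the input and output formats and lets the shuffle do the sorting:

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class SortByTemperatureUsingHashPartitioner extends Configured
        implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), getClass());
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);

        JobClient.runJob(conf);
        return 0;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SortByTemperatureUsingHashPartitioner(), args));
      }
    }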

Controlling Sort Order

The sort order for keys is controlled by a RawComparator, which is found as follows:

  1. If the property mapred.output.key.comparator.class is set, an instance of that class is used. (The setOutputKeyComparatorClass() method on JobConf is a convenient way to set this property.)
  2. Otherwise, keys must be a subclass of WritableComparable, and the registered comparator for the key class is used.
  3. If there is no registered comparator, then a RawComparator is used that deserializes the byte streams being compared into objects and delegates to the WritableComparable’s compareTo() method.

These rules reinforce why it’s important to register optimized versions of RawComparators for your own custom Writable classes (which is covered in “Implementing a RawComparator for speed”), and also that it’s straightforward to override the sort order by setting your own comparator (we do this in “Secondary Sort”).
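For example, overriding the sort order for IntWritable keys might look like this (a sketch; ReverseIntComparator is a hypothetical class name, not part of Hadoop):

    // A hypothetical raw comparator that reverses IntWritable's natural order
    public static class ReverseIntComparator extends IntWritable.Comparator {
      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return -super.compare(b1, s1, l1, b2, s2, l2);
      }
    }

    // Rule 1: set it on the job, which sets mapred.output.key.comparator.class
    conf.setOutputKeyComparatorClass(ReverseIntComparator.class);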

Suppose we run this program using 30 reducers:
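For instance (class and path names follow the sketches above and are assumptions):

    % hadoop SortByTemperatureUsingHashPartitioner \
      -D mapred.reduce.tasks=30 input/ncdc/all-seq output-hashsort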

This command produces 30 output files, each of which is sorted. However, there is no easy way to combine the files (by concatenation, for example, in the case of plain-text files) to produce a globally sorted file. For many applications, this doesn’t matter. For example, having a partially sorted set of files is fine if you want to do lookups.

An application: Partitioned MapFile lookups

Performing lookups by key, for instance, works well with multiple files. If we change the output format to MapFileOutputFormat, as shown in the example below, the output is 30 map files, which we can perform lookups against.

Example: A MapReduce program for sorting a SequenceFile and producing MapFiles as output
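Only the output format changes relative to the previous sort program, so a sketch needs just the altered line in run() (compression settings stay as before):

    // In SortByTemperatureUsingHashPartitioner's run() method, replace
    // the output format with:
    conf.setOutputFormat(MapFileOutputFormat.class);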

MapFileOutputFormat provides a pair of convenience static methods for performing lookups against MapReduce output; their use is shown in the example below.

Example: Retrieve the first entry with a given key from a collection of MapFiles
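A minimal sketch, assuming the old-API MapFileOutputFormat and the output of the previous job (the station ID and year offsets in the NCDC line, characters 4–15 and 15–19, are assumptions):

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.MapFileOutputFormat;
    import org.apache.hadoop.mapred.Partitioner;
    import org.apache.hadoop.mapred.lib.HashPartitioner;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class LookupRecordByTemperature extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        Path path = new Path(args[0]);
        IntWritable key = new IntWritable(Integer.parseInt(args[1]));

        FileSystem fs = path.getFileSystem(getConf());
        // One reader per map file produced by the job
        MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, path, getConf());
        // Must be the same partitioner the job used
        Partitioner<IntWritable, Text> partitioner =
            new HashPartitioner<IntWritable, Text>();
        Text val = new Text();

        Writable entry = MapFileOutputFormat.getEntry(readers, partitioner, key, val);
        if (entry == null) {
          System.err.println("Key not found: " + key);
          return -1;
        }
        // Assumed NCDC offsets: station ID in characters 4-15, year in 15-19
        String line = val.toString();
        System.out.printf("%s\t%s%n", line.substring(4, 15), line.substring(15, 19));
        return 0;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new LookupRecordByTemperature(), args));
      }
    }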

The getReaders() method opens a MapFile.Reader for each of the output files created by the MapReduce job. The getEntry() method then uses the partitioner to choose the reader for the key and finds the value for that key by calling Reader’s get() method. If getEntry() returns null, it means no matching key was found. Otherwise, it returns the value, which we translate into a station ID and year.

To see this in action, let’s find the first entry for a temperature of –10°C (remember that temperatures are stored as integers representing tenths of a degree, which is why we ask for a temperature of –100):
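A plausible run (the output directory name is an assumption carried over from the MapFile sort job):

    % hadoop LookupRecordByTemperature output-hashmapsort -100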

We can also use the readers directly, in order to get all the records for a given key. The array of readers that is returned is ordered by partition, so that the reader for a given key may be found using the same partitioner that was used in the MapReduce job:
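In code, that lookup is a one-liner (using the names from the previous sketch):

    // Readers are ordered by partition, so index by the partition number
    MapFile.Reader reader =
        readers[partitioner.getPartition(key, val, readers.length)];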

Once we have the reader, we retrieve the first value for the key using MapFile’s get() method, then repeatedly call next() to retrieve the next key and value, until the key changes. A program to do this is shown in the example below.

Example: Retrieve all entries with a given key from a collection of MapFiles
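A sketch under the same assumptions as the previous lookup program:

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.MapFileOutputFormat;
    import org.apache.hadoop.mapred.Partitioner;
    import org.apache.hadoop.mapred.lib.HashPartitioner;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class LookupRecordsByTemperature extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        Path path = new Path(args[0]);
        IntWritable key = new IntWritable(Integer.parseInt(args[1]));

        FileSystem fs = path.getFileSystem(getConf());
        MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, path, getConf());
        Partitioner<IntWritable, Text> partitioner =
            new HashPartitioner<IntWritable, Text>();
        Text val = new Text();

        // Pick the reader for the key's partition
        MapFile.Reader reader =
            readers[partitioner.getPartition(key, val, readers.length)];
        Writable entry = reader.get(key, val);
        if (entry == null) {
          System.err.println("Key not found: " + key);
          return -1;
        }
        IntWritable nextKey = new IntWritable();
        do {
          System.out.println(val); // print each record, until the key changes
        } while (reader.next(nextKey, val) && key.equals(nextKey));
        return 0;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new LookupRecordsByTemperature(), args));
      }
    }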

And here is a sample run to retrieve all readings of –10°C and count them:
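For example, counting matching records with wc (directory name as before; the count depends on the dataset):

    % hadoop LookupRecordsByTemperature output-hashmapsort -100 2> /dev/null | wc -l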

Total Sort

How can you produce a globally sorted file using Hadoop? The naive answer is to use a single partition. But this is incredibly inefficient for large files, since one machine has to process all of the output, so you are throwing away the benefits of the parallel architecture that MapReduce provides.

Instead, it is possible to produce a set of sorted files that, if concatenated, would form a globally sorted file. The secret to doing this is to use a partitioner that respects the total order of the output. For example, if we had four partitions, we could put keys for temperatures less than –10°C in the first partition, those between –10°C and 0°C in the second, those between 0°C and 10°C in the third, and those over 10°C in the fourth.

Although this approach works, you have to choose your partition sizes carefully to ensure that they are fairly even so that job times aren’t dominated by a single reducer.

For the partitioning scheme just described, the relative sizes of the partitions are as follows:

[Table: proportion of records falling in each temperature range: less than –10°C, –10°C to 0°C, 0°C to 10°C, and over 10°C]

These partitions are not very even. To construct more even partitions, we need a better understanding of the temperature distribution for the whole dataset. It’s fairly easy to write a MapReduce job to count the number of records that fall into a collection of temperature buckets. For example, the figure below shows the distribution for buckets of size 1°C, where each point on the plot corresponds to one bucket.

[Figure: Temperature distribution of the weather dataset]

While we could use this information to construct a very even set of partitions, the fact that we needed to run a job that used the entire dataset to construct them is not ideal.

It’s possible to get a fairly even set of partitions by sampling the key space. The idea behind sampling is that you look at a small subset of the keys to approximate the key distribution, which is then used to construct partitions. Luckily, we don’t have to write the code to do this ourselves, as Hadoop comes with a selection of samplers.

The InputSampler class defines a nested Sampler interface whose implementations return a sample of keys, given an InputFormat and a JobConf:
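In the old API, the interface looks like this:

    public interface Sampler<K, V> {
      K[] getSample(InputFormat<K, V> inf, JobConf job) throws IOException;
    }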

This interface is not usually called directly by clients. Instead, the writePartitionFile() static method on InputSampler is used, which creates a sequence file to store the keys that define the partitions:
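Its signature in the old API is:

    public static <K, V> void writePartitionFile(JobConf job, Sampler<K, V> sampler)
        throws IOException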

The sequence file is used by TotalOrderPartitioner to create partitions for the sort job.

The example below puts it all together.

Example: A MapReduce program for sorting a SequenceFile with IntWritable keys using the TotalOrderPartitioner to globally sort the data
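A sketch, again assuming the old API (the distributed-cache wiring follows the description in the text below; exact names are illustrative):

    import java.net.URI;

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.lib.InputSampler;
    import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
        implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), getClass());
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);

        conf.setPartitionerClass(TotalOrderPartitioner.class);

        // Sample keys with uniform probability 0.1, up to 10,000 samples
        // from at most 10 splits
        InputSampler.Sampler<IntWritable, Text> sampler =
            new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

        Path input = FileInputFormat.getInputPaths(conf)[0];
        input = input.makeQualified(input.getFileSystem(conf));
        Path partitionFile = new Path(input, "_partitions");
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
        InputSampler.writePartitionFile(conf, sampler);

        // Share the partition file via the distributed cache,
        // symlinked as _partitions
        URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
        DistributedCache.addCacheFile(partitionUri, conf);
        DistributedCache.createSymlink(conf);

        JobClient.runJob(conf);
        return 0;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(
            new SortByTemperatureUsingTotalOrderPartitioner(), args));
      }
    }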

We use a RandomSampler, which chooses keys with a uniform probability (here, 0.1). There are also parameters for the maximum number of samples to take and the maximum number of splits to sample (here, 10,000 and 10, respectively; these settings are the defaults when InputSampler is run as an application), and the sampler stops when the first of these limits is met. Samplers run on the client, so it is important to limit the number of splits that are downloaded so that the sampler runs quickly. In practice, the time taken to run the sampler is a small fraction of the overall job time.

The partition file that InputSampler writes is called _partitions, which we have set to be in the input directory (it will not be picked up as an input file since it starts with an underscore). To share the partition file with the tasks running on the cluster, we add it to the distributed cache (see “Distributed Cache”).

On one run, the sampler chose –5.6°C, 13.9°C, and 22.0°C as partition boundaries (for four partitions), which translates into more even partition sizes than the earlier choice of partitions:

[Table: proportion of records falling in each temperature range: less than –5.6°C, –5.6°C to 13.9°C, 13.9°C to 22.0°C, and over 22.0°C]

Your input data determines the best sampler to use. For example, SplitSampler, which samples only the first n records in a split, is not so good for sorted data, because it doesn’t select keys from throughout the split.

On the other hand, IntervalSampler chooses keys at regular intervals through the split and makes a better choice for sorted data. RandomSampler is a good general-purpose sampler. If none of these suits your application (and remember that the point of sampling is to produce partitions that are approximately equal in size), you can write your own implementation of the Sampler interface.

One of the nice properties of InputSampler and TotalOrderPartitioner is that you are free to choose the number of partitions. This choice is normally driven by the number of reducer slots in your cluster (choose a number slightly fewer than the total, to allow for failures). However, TotalOrderPartitioner will work only if the partition boundaries are distinct: one problem with choosing a high number is that you may get collisions if you have a small key space.

Here’s how we run it:
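For example (again with 30 reducers; class and path names carry over from the sketches above):

    % hadoop SortByTemperatureUsingTotalOrderPartitioner \
      -D mapred.reduce.tasks=30 input/ncdc/all-seq output-totalsort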

The program produces 30 output partitions, each of which is internally sorted; in addition, for these partitions, all the keys in partition i are less than the keys in partition i + 1.

Secondary Sort

The MapReduce framework sorts the records by key before they reach the reducers. For any particular key, however, the values are not sorted. The order in which the values appear is not even stable from one run to the next, since they come from different map tasks, which may finish at different times from run to run. Generally speaking, most MapReduce programs are written so as not to depend on the order in which the values appear to the reduce function. However, it is possible to impose an order on the values by sorting and grouping the keys in a particular way.

To illustrate the idea, consider the MapReduce program for calculating the maximum temperature for each year. If we arranged for the values (temperatures) to be sorted in descending order, we wouldn’t have to iterate through them to find the maximum; we could simply take the first value for each year and ignore the rest. (This approach isn’t the most efficient way to solve this particular problem, but it illustrates how secondary sort works in general.)

To achieve this, we change our keys to be composite: a combination of year and temperature. We want the sort order for keys to be by year (ascending) and then by temperature (descending):
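With illustrative readings:

    1900 35°C
    1900 34°C
    1900 34°C
    ...
    1901 36°C
    1901 35°C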

If all we did was change the key, this wouldn’t help, since now records for the same year would not (in general) go to the same reducer, as they have different keys. For example, (1900, 35°C) and (1900, 34°C) could go to different reducers. By setting a partitioner to partition by the year part of the key, we can guarantee that records for the same year go to the same reducer. This still isn’t enough to achieve our goal, however. A partitioner ensures only that one reducer receives all the records for a year; it doesn’t change the fact that the reducer groups by key within the partition:
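Continuing the illustration, all the records for a year now reach the same reducer, but each distinct composite key still forms its own reduce group:

    1900 35°C   -> group 1
    1900 34°C   -> group 2
    1900 34°C   -> group 2
    1901 36°C   -> group 3
    1901 35°C   -> group 4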


The final piece of the puzzle is the setting to control the grouping. If we group values in the reducer by the year part of the key, then we will see all the records for the same year in one reduce group. And since they are sorted by temperature in descending order, the first is the maximum temperature:
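With grouping by year as well, each year becomes a single reduce group, whose first key carries the maximum temperature (same illustrative values):

    1900 35°C   -> group 1 (maximum: 35°C)
    1900 34°C   -> group 1
    1900 34°C   -> group 1
    1901 36°C   -> group 2 (maximum: 36°C)
    1901 35°C   -> group 2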


To summarize, there is a recipe here to get the effect of sorting by value:

  • Make the key a composite of the natural key and the natural value.
  • The key comparator should order by the composite key, that is, the natural key and natural value.
  • The partitioner and grouping comparator for the composite key should consider only the natural key for partitioning and grouping.

Java code

Putting this all together results in the code in the example below. This program uses the plain-text input again.

Example: Application to find the maximum temperature by sorting temperatures in the key
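A sketch of the program, assuming the old API, an IntPair Writable (with getFirst(), getSecond(), and a static compare(int, int) helper, along the lines of the TextPair class mentioned below), and the NCDC field offsets used earlier; all of these are assumptions:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {

      static class MaxTemperatureMapper extends MapReduceBase
          implements Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
        public void map(LongWritable key, Text value,
            OutputCollector<IntPair, NullWritable> output, Reporter reporter)
            throws IOException {
          // Assumed NCDC offsets: year at 15-19, temperature at 87-92, quality at 92
          String line = value.toString();
          String temp = line.substring(87, 92);
          if (!temp.equals("+9999") && line.substring(92, 93).matches("[01459]")) {
            int year = Integer.parseInt(line.substring(15, 19));
            int airTemperature = Integer.parseInt(
                temp.startsWith("+") ? temp.substring(1) : temp);
            output.collect(new IntPair(year, airTemperature), NullWritable.get());
          }
        }
      }

      static class MaxTemperatureReducer extends MapReduceBase
          implements Reducer<IntPair, NullWritable, IntPair, NullWritable> {
        @Override
        public void reduce(IntPair key, Iterator<NullWritable> values,
            OutputCollector<IntPair, NullWritable> output, Reporter reporter)
            throws IOException {
          // The first key of each group is the year with its maximum temperature
          output.collect(key, NullWritable.get());
        }
      }

      public static class FirstPartitioner
          implements Partitioner<IntPair, NullWritable> {
        @Override
        public void configure(JobConf job) {}
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
          return Math.abs(key.getFirst() * 127) % numPartitions; // year only
        }
      }

      public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
          super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
          IntPair ip1 = (IntPair) w1;
          IntPair ip2 = (IntPair) w2;
          int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); // year ascending
          if (cmp != 0) {
            return cmp;
          }
          return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); // temp descending
        }
      }

      public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
          super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
          // Group by year only
          return IntPair.compare(((IntPair) w1).getFirst(), ((IntPair) w2).getFirst());
        }
      }

      @Override
      public int run(String[] args) throws IOException {
        JobConf conf = new JobConf(getConf(), getClass());
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setPartitionerClass(FirstPartitioner.class);
        conf.setOutputKeyComparatorClass(KeyComparator.class);
        conf.setOutputValueGroupingComparator(GroupComparator.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        conf.setOutputKeyClass(IntPair.class);
        conf.setOutputValueClass(NullWritable.class);
        JobClient.runJob(conf);
        return 0;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args));
      }
    }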

In the mapper, we create a key representing the year and temperature, using an IntPair Writable implementation. (IntPair is like the TextPair class we developed in “Implementing a Custom Writable”.) We don’t need to carry any information in the value, since we can get the first (maximum) temperature in the reducer from the key, so we use a NullWritable. The reducer emits the first key, which, due to the secondary sorting, is an IntPair for the year and its maximum temperature. IntPair’s toString() method creates a tab-separated string, so the output is a set of tab-separated year-temperature pairs.

Many applications need to access all the sorted values, not just the first value as we have provided here. To do this, you need to populate the value fields, since in the reducer you can retrieve only the first key. This necessitates some unavoidable duplication of information between key and value.

We set the partitioner to partition by the first field of the key (the year), using a custom partitioner. To sort keys by year (ascending) and temperature (descending), we use a custom key comparator that extracts the fields and performs the appropriate comparisons.

Similarly, to group keys by year, we set a custom comparator, using setOutputValueGroupingComparator(), to extract the first field of the key for comparison. Running this program gives the maximum temperature for each year.

Streaming

To do a secondary sort in Streaming, we can take advantage of a couple of library classes that Hadoop provides. Here’s the driver that we can use to do a secondary sort:
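A plausible driver, using the property and class names discussed below (the streaming jar location and the script names are assumptions):

    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
        -D stream.num.map.output.key.fields=2 \
        -D mapred.text.key.partitioner.options=-k1,1 \
        -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
        -D mapred.text.key.comparator.options="-k1n -k2nr" \
        -input input/ncdc/all \
        -output output-secondarysort-streaming \
        -mapper secondary_sort_map.py \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
        -reducer secondary_sort_reduce.py \
        -file secondary_sort_map.py \
        -file secondary_sort_reduce.py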

Our map function (see the example below) emits records with year and temperature fields. We want to treat the combination of both of these fields as the key, so we set stream.num.map.output.key.fields to 2. This means that values will be empty, just as in the Java case.

Example: Map function for secondary sort in Python
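A sketch of such a map function, assuming the NCDC fixed-width offsets used earlier (year at characters 15–19, signed temperature at 87–92, quality code at 92):

    #!/usr/bin/env python

    import re
    import sys

    for line in sys.stdin:
        val = line.strip()
        # Assumed NCDC offsets: year, signed temperature, quality code
        (year, temp, q) = (val[15:19], int(val[87:92]), val[92:93])
        if temp != 9999 and re.match("[01459]", q):
            print("%s\t%s" % (year, temp))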

However, we don’t want to partition by the entire key, so we use KeyFieldBasedPartitioner, which allows us to partition by a part of the key. The mapred.text.key.partitioner.options property configures the partitioner. The value -k1,1 instructs the partitioner to use only the first field of the key, where fields are assumed to be separated by a string defined by the map.output.key.field.separator property (a tab character by default).

Next, we want a comparator that sorts the year field in ascending order and the temperature field in descending order, so that the reduce function can simply return the first record in each group. Hadoop provides KeyFieldBasedComparator, which is ideal for this purpose. The comparison order is defined by a specification that is like the one used for GNU sort. It is set using the mapred.text.key.comparator.options property.

The value -k1n -k2nr used in this example means “sort by the first field in numerical order, then by the second field in reverse numerical order.” Like its partitioner cousin, KeyFieldBasedPartitioner, it uses the separator defined by map.output.key.field.separator to split a key into fields.

In the Java version, we had to set the grouping comparator; however, in Streaming, groups are not demarcated in any way, so in the reduce function we have to detect the group boundaries ourselves by looking for when the year changes (see the example below).

Example: Reducer function for secondary sort in Python
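A sketch of the reducer, emitting the first record of each year group (which, given the sort order, carries the maximum temperature):

    #!/usr/bin/env python

    import sys

    last_group = None
    for line in sys.stdin:
        val = line.strip()
        (year, temp) = val.split("\t")
        # A new year marks the start of a new group;
        # its first record holds the maximum temperature
        if year != last_group:
            print(val)
            last_group = year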

When we run the streaming program, we get the same output as the Java version.

Finally, note that KeyFieldBasedPartitioner and KeyFieldBasedComparator are not confined to use in Streaming programs; they are applicable to Java MapReduce programs, too.

