Output Formats - Hadoop

Hadoop has output data formats that correspond to the input formats covered in the previous section. The OutputFormat class hierarchy is shown in the figure below.

Figure: OutputFormat class hierarchy

Text Output

The default output format, TextOutputFormat, writes records as lines of text. Its keys and values may be of any type, since TextOutputFormat turns them into strings by calling toString() on them. Each key-value pair is separated by a tab character, although that may be changed using the mapred.textoutputformat.separator property. The counterpart to TextOutputFormat for reading in this case is KeyValueTextInputFormat, since it breaks lines into key-value pairs based on a configurable separator (see “KeyValueTextInputFormat”).

You can suppress the key or the value (or both, making this output format equivalent to NullOutputFormat, which emits nothing) from the output using a NullWritable type.

This also causes no separator to be written, which makes the output suitable for reading in using TextInputFormat.
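The key, separator, and value rules above can be imitated in a few lines of plain Java. This is only a sketch of the behavior described, not Hadoop's own code: the class name TextLineSketch is hypothetical, and a null argument stands in for a NullWritable key or value.

```java
// Plain-Java sketch of TextOutputFormat-style line formatting (not Hadoop code).
// A null argument stands in for a NullWritable key or value.
class TextLineSketch {

    // Returns the text written for one record, or "" if both key and value
    // are suppressed.
    static String formatLine(Object key, Object value, String separator) {
        boolean hasKey = key != null;
        boolean hasValue = value != null;
        if (!hasKey && !hasValue) {
            return "";                       // nothing is written at all
        }
        StringBuilder line = new StringBuilder();
        if (hasKey) {
            line.append(key.toString());     // any type works via toString()
        }
        if (hasKey && hasValue) {
            line.append(separator);          // separator only between a key and a value
        }
        if (hasValue) {
            line.append(value.toString());
        }
        return line.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(formatLine("1950", 22, "\t"));          // key, tab, value
        System.out.print(formatLine(null, "record only", "\t")); // value alone, no separator
    }
}
```

Because a suppressed key also suppresses the separator, the second call produces a line that contains only the value, which is the shape TextInputFormat expects.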

Binary Output

SequenceFileOutputFormat

As the name indicates, SequenceFileOutputFormat writes sequence files for its output. This is a good choice of output if it forms the input to a further MapReduce job, since it is compact and is readily compressed. Compression is controlled via the static methods on SequenceFileOutputFormat, as described in “Using Compression in MapReduce”. For an example of how to use SequenceFileOutputFormat, see “Sorting”.

SequenceFileAsBinaryOutputFormat

SequenceFileAsBinaryOutputFormat is the counterpart to SequenceFileAsBinaryInputFormat, and it writes keys and values in raw binary format into a SequenceFile container.

MapFileOutputFormat

MapFileOutputFormat writes MapFiles as output. The keys in a MapFile must be added in order, so you need to ensure that your reducers emit keys in sorted order.

The reduce input keys are guaranteed to be sorted, but the output keys are under the control of the reduce function, and there is nothing in the general MapReduce contract that states that the reduce output keys have to be ordered in any way. The extra constraint of sorted reduce output keys is needed only for MapFileOutputFormat.
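To make the ordering constraint concrete, here is a small plain-Java sketch of a writer that refuses keys arriving out of order. It does not use Hadoop's MapFile classes; the class name SortedAppendSketch, the String key type, and the choice to accept repeated keys are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of an append-only store that requires keys in sorted order.
// This is NOT Hadoop's MapFile.Writer; it only illustrates the constraint.
class SortedAppendSketch {
    private final List<String> keys = new ArrayList<>();
    private String lastKey = null;

    // Appends a key, rejecting any key that sorts before the previous one.
    void append(String key) {
        if (lastKey != null && key.compareTo(lastKey) < 0) {
            throw new IllegalStateException(
                "key out of order: " + key + " after " + lastKey);
        }
        keys.add(key);
        lastKey = key;
    }

    int size() {
        return keys.size();
    }

    public static void main(String[] args) {
        SortedAppendSketch writer = new SortedAppendSketch();
        writer.append("a");
        writer.append("b");
        System.out.println("appended " + writer.size() + " keys");
    }
}
```

A reducer that emits its input key unchanged satisfies this check automatically, since reduce input keys arrive sorted. A reducer that rewrites keys has to preserve the order itself.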

Multiple Outputs

FileOutputFormat and its subclasses generate a set of files in the output directory. There is one file per reducer, and files are named by the partition number: part-00000, part-00001, etc. There is sometimes a need to have more control over the naming of the files or to produce multiple files per reducer. MapReduce comes with two libraries to help you do this: MultipleOutputFormat and MultipleOutputs.

An example: Partitioning data

Consider the problem of partitioning the weather dataset by weather station. We would like to run a job whose output is a file per station, with each file containing all the records for that station.

One way of doing this is to have a reducer for each weather station. To arrange this, we need to do two things. First, write a partitioner that puts records from the same weather station into the same partition. Second, set the number of reducers on the job to be the number of weather stations. The partitioner would look like this:
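As a rough stand-in for the core of such a partitioner, the sketch below maps a station ID to a partition index by looking it up in a list known in advance. It is plain Java rather than an implementation of Hadoop's Partitioner interface, and the class name StationIndexSketch and the sample station IDs are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch: partition index = position of the station ID in a known list.
// Not a Hadoop Partitioner; record parsing is left out entirely.
class StationIndexSketch {
    private final List<String> stationIds;

    StationIndexSketch(List<String> stationIds) {
        this.stationIds = stationIds;
    }

    // Returns the index of the station in the list, or -1 for an unknown station.
    int getPartition(String stationId) {
        return stationIds.indexOf(stationId);
    }

    // One partition (and so one reducer) per known station.
    int numPartitions() {
        return stationIds.size();
    }

    public static void main(String[] args) {
        // Illustrative station IDs only.
        StationIndexSketch partitioner =
            new StationIndexSketch(Arrays.asList("011990-99999", "012650-99999"));
        System.out.println(partitioner.getPartition("012650-99999"));
    }
}
```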

The getPartition(String) method, whose implementation is not shown, turns the station ID into a partition index. To do this, it needs a list of all the station IDs and then just returns the index of the station ID in the list.

There are two drawbacks to this approach. The first is that the number of partitions needs to be known before the job is run, and therefore so does the number of weather stations.

Although the NCDC provides metadata about its stations, there is no guarantee that the IDs encountered in the data match those in the metadata. A station that appears in the metadata but not in the data wastes a reducer slot. Worse, a station that appears in the data but not in the metadata doesn’t get a reducer slot, so its records have to be thrown away.

One way of mitigating this problem would be to write a job to extract the unique station IDs, but it’s a shame that we need an extra job to do this.

The second drawback is more subtle. It is generally a bad idea to allow the number of partitions to be rigidly fixed by the application, since it can lead to small or uneven-sized partitions. Having many reducers doing a small amount of work isn’t an efficient way of organizing a job: it’s much better to get reducers to do more work and have fewer of them, as the overhead in running a task is then reduced. Uneven-sized partitions can be difficult to avoid, too. Different weather stations will have gathered widely varying amounts of data: compare a station that opened one year ago to one that has been gathering data for a century. If a few reduce tasks take significantly longer than the others, they will dominate the job execution time and cause it to be longer than it needs to be.

There are two special cases when it does make sense to allow the application to set the number of partitions (or equivalently, the number of reducers):

Zero reducers

This is a vacuous case: there are no partitions, as the application needs to run only map tasks.

One reducer

It can be convenient to run small jobs to combine the output of previous jobs into a single file. This should only be attempted when the amount of data is small enough to be processed comfortably by one reducer.

It is much better to let the cluster drive the number of partitions for a job, the idea being that the more cluster reduce slots are available, the faster the job can complete.

This is why the default HashPartitioner works so well: it works with any number of partitions and ensures each partition has a good mix of keys, leading to more even-sized partitions.
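The reason a hash partitioner copes with any number of partitions is visible in its arithmetic: the key's hash code is made non-negative and then reduced modulo the partition count. The plain-Java sketch below reproduces that calculation outside Hadoop; the class name HashPartitionSketch is hypothetical.

```java
// Plain-Java sketch of hash partitioning: mask off the sign bit of the hash
// code, then take the remainder modulo the number of partitions.
class HashPartitionSketch {

    static int getPartition(Object key, int numPartitions) {
        // The mask keeps the result non-negative even for negative hash codes.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands in the same partition for a given count,
        // and the count itself can be chosen freely.
        for (int partitions = 1; partitions <= 4; partitions++) {
            System.out.println(partitions + " partitions -> partition "
                + getPartition("011990-99999", partitions));
        }
    }
}
```

Nothing in the calculation depends on knowing the set of keys in advance, which is what lets the number of reducers be tuned to the cluster rather than to the data.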

If we go back to using HashPartitioner, each partition will contain multiple stations, so to create a file per station, we need to arrange for each reducer to write multiple files, which is where MultipleOutputFormat comes in.

MultipleOutputFormat

MultipleOutputFormat allows you to write data to multiple files whose names are derived from the output keys and values. MultipleOutputFormat is an abstract class with two concrete subclasses, MultipleTextOutputFormat and MultipleSequenceFileOutputFormat, which are the multiple file equivalents of TextOutputFormat and SequenceFileOutputFormat. MultipleOutputFormat provides a few protected methods that subclasses can override to control the output filename. In the example below, we create a subclass of MultipleTextOutputFormat and override the generateFileNameForKeyValue() method to return the station ID, which we extract from the record value.

Example. Partitioning whole dataset into files named by the station ID using MultipleOutputFormat

StationMapper pulls the station ID from the record and uses it as the key. This causes records from the same station to go into the same partition. StationReducer replaces the key with a NullWritable so that when the final output is written using StationNameMultipleTextOutputFormat (which, like TextOutputFormat, drops NullWritable keys), it consists solely of weather records (and not the station ID key).
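The idea of deriving the output file name from each record's value can be imitated in plain Java by grouping records under the name computed for them. This sketch uses no Hadoop classes: the class name StationFilesSketch is hypothetical, and so is the simplified record layout "stationId,year,temperature", which is not the real NCDC format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch: each record is appended to the "file" whose name is
// derived from the record itself. Files are modeled as in-memory lists.
class StationFilesSketch {

    // Hypothetical simplified record layout: "stationId,year,temperature".
    static String stationIdOf(String record) {
        return record.substring(0, record.indexOf(','));
    }

    // Returns a map from file name (the station ID) to the records written to it.
    static Map<String, List<String>> writeAll(List<String> records) {
        Map<String, List<String>> files = new TreeMap<>();
        for (String record : records) {
            files.computeIfAbsent(stationIdOf(record), name -> new ArrayList<>())
                 .add(record);
        }
        return files;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
            "011990-99999,1950,0",
            "012650-99999,1949,111",
            "011990-99999,1950,22");
        System.out.println(writeAll(records).keySet());
    }
}
```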

The overall effect is to place all the records for one station in a file named by the station ID. Here are a few lines of output after running the program over a subset of the total dataset:

The filename returned by generateFileNameForKeyValue() is actually a path that is interpreted relative to the output directory. It’s possible to create subdirectories of arbitrary depth. For example, the following modification partitions the data by station and year so that each year’s data is contained in a directory named by the station ID:
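A minimal sketch of the path construction described here, assuming the station ID and year have already been parsed out of the record: the returned name simply joins the two with a slash, so the station ID becomes a directory and the year becomes the file inside it. The class and method names are hypothetical, and the code is plain Java rather than an override of generateFileNameForKeyValue().

```java
// Plain-Java sketch of building a relative output path from station and year.
// In a real MultipleTextOutputFormat subclass this string would be returned
// from generateFileNameForKeyValue(); here it is a standalone helper.
class StationYearPathSketch {

    // The result is interpreted relative to the job's output directory.
    static String relativePath(String stationId, String year) {
        return stationId + "/" + year;
    }

    public static void main(String[] args) {
        // Illustrative values only.
        System.out.println(relativePath("011990-99999", "1950"));
    }
}
```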

MultipleOutputFormat has more features that are not discussed here, such as the ability to copy the input directory structure and file naming for a map-only job. Please consult the Java documentation for details.

MultipleOutputs

There’s a second library in Hadoop for generating multiple outputs, provided by the MultipleOutputs class. Unlike MultipleOutputFormat, MultipleOutputs can emit different types for each output. On the other hand, there is less control over the naming of outputs. The example below shows how to use MultipleOutputs to partition the dataset by station.

Example. Partitioning whole dataset into files named by the station ID using MultipleOutputs

The MultipleOutputs class is used to generate additional outputs to the usual output. Outputs are given names and may be written to a single file (called single named output) or to multiple files (called multinamed output). In this case, we want multiple files, one for each station, so we use a multinamed output, which we initialize in the driver by calling the addMultiNamedOutput() method of MultipleOutputs to specify the name of the output (here "station"), the output format, and the output types. In addition, we set the regular output format to be NullOutputFormat in order to suppress the usual output.

In the reducer, where we generate the output, we construct an instance of MultipleOutputs in the configure() method and assign it to an instance variable. We use the MultipleOutputs instance in the reduce() method to retrieve an OutputCollector for the multinamed output. The getCollector() method takes the name of the output ("station" again) as well as a string identifying the part within the multinamed output.

Here we use the station identifier, with the “-” separator in the key removed, since only alphanumeric characters are allowed by MultipleOutputs.
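Stripping the separator is a one-line string operation. The plain-Java sketch below shows it together with a simple check that the result contains only ASCII letters and digits; the class and method names are hypothetical, and the check is an illustration of the constraint rather than the validation MultipleOutputs itself performs.

```java
// Plain-Java sketch: turn a station ID such as "011990-99999" into a part
// name made up only of alphanumeric characters.
class NamedOutputPartSketch {

    // Removes the "-" separator from the station ID.
    static String partName(String stationId) {
        return stationId.replace("-", "");
    }

    // True if the string is non-empty and contains only ASCII letters and digits.
    static boolean isAlphanumeric(String s) {
        if (s.isEmpty()) {
            return false;
        }
        for (char c : s.toCharArray()) {
            boolean letter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            boolean digit = c >= '0' && c <= '9';
            if (!letter && !digit) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String part = partName("011990-99999");   // illustrative station ID
        System.out.println(part + " alphanumeric: " + isAlphanumeric(part));
    }
}
```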

The overall effect is to produce output files with the naming scheme station_<station identifier>-r-<part_number>. The r appears in the name because the output is produced by the reducer, and the part number is appended to ensure that there are no collisions resulting from different partitions (reducers) writing output for the same station. Since we partition by station, it cannot happen in this case (but it can in the general case). In one run, the first few output files were named as follows (other columns from the directory listing have been dropped):

What’s the Difference Between MultipleOutputFormat and MultipleOutputs?

It’s unfortunate (although not necessarily unusual in an open source project) to have two libraries that do almost the same thing, since it is confusing for users. To help you choose which to use, here is a brief comparison:

Table: MultipleOutputFormat compared with MultipleOutputs

So in summary, MultipleOutputs is more fully featured, but MultipleOutputFormat has more control over the output directory structure and file naming.

In the new MapReduce API, the situation is improved, since there is only MultipleOutputs, which supports all the features of the two multiple output classes in the old API.

Lazy Output

FileOutputFormat subclasses will create output (part-nnnnn) files, even if they are empty. Some applications prefer that empty files not be created, which is where LazyOutputFormat helps. It is a wrapper output format that ensures that the output file is created only when the first record is emitted for a given partition. To use it, call its setOutputFormatClass() method with the JobConf and the underlying output format. Streaming and Pipes support a -lazyOutput option to enable LazyOutputFormat.
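The wrapper idea can be sketched in plain Java: hold a factory for the underlying output and call it only when the first record arrives. This is an illustration of lazy creation under stated assumptions, not LazyOutputFormat's implementation; the class name LazyWriterSketch is hypothetical, and a list stands in for the output file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java sketch of lazy output creation: the underlying "file" (a list
// here) is created only when the first record is written.
class LazyWriterSketch {
    private final Supplier<List<String>> fileFactory;
    private List<String> file;   // stays null until the first write

    LazyWriterSketch(Supplier<List<String>> fileFactory) {
        this.fileFactory = fileFactory;
    }

    void write(String record) {
        if (file == null) {
            file = fileFactory.get();   // create the output on first use
        }
        file.add(record);
    }

    // True once at least one record has been written.
    boolean created() {
        return file != null;
    }

    public static void main(String[] args) {
        LazyWriterSketch writer = new LazyWriterSketch(ArrayList::new);
        System.out.println("created before any write: " + writer.created());
        writer.write("first record");
        System.out.println("created after first write: " + writer.created());
    }
}
```

A partition that never receives a record never calls the factory, so no empty output is produced for it.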

Database Output

The output formats for writing to relational databases and to HBase are mentioned in “Database Input (and Output)”.

