Input Formats

Hadoop can process many different types of data formats, from flat text files to databases. In this section, we explore the different formats available.

Input Splits and Records

As we saw in the MapReduce chapter, an input split is a chunk of the input that is processed by a single map. Each map processes a single split. Each split is divided into records, and the map processes each record (a key-value pair) in turn. Splits and records are logical: there is nothing that requires them to be tied to files, for example, although in their most common incarnations, they are. In a database context, a split might correspond to a range of rows from a table and a record to a row in that range (this is precisely what DBInputFormat does, an input format for reading data from a relational database).

Input splits are represented by the Java interface, InputSplit (which, like all of the classes mentioned in this section, is in the org.apache.hadoop.mapred package†):
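A sketch of its shape (old API):

    public interface InputSplit extends Writable {
      // The number of bytes in the split, used to order splits by size
      long getLength() throws IOException;
      // Hostnames of the nodes where the split's data is stored
      String[] getLocations() throws IOException;
    }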

† But see the new MapReduce classes in org.apache.hadoop.mapreduce, described in “The new Java MapReduce API”.

An InputSplit has a length in bytes and a set of storage locations, which are just hostname strings. Notice that a split doesn’t contain the input data; it is just a reference to the data. The storage locations are used by the MapReduce system to place map tasks as close to the split’s data as possible, and the size is used to order the splits so that the largest get processed first, in an attempt to minimize the job runtime (this is an instance of a greedy approximation algorithm).

As a MapReduce application writer, you don’t need to deal with InputSplits directly, as they are created by an InputFormat. An InputFormat is responsible for creating the input splits and dividing them into records.

Before we see some concrete examples of InputFormat, let’s briefly examine how it is used in MapReduce. Here’s the interface:
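In outline, it looks something like this (old API):

    public interface InputFormat<K, V> {
      // Split the job's input into logical InputSplits; numSplits is only a hint
      InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
      // Create a reader that turns a split into a stream of key-value records
      RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
          Reporter reporter) throws IOException;
    }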

The JobClient calls the getSplits() method, passing the desired number of map tasks as the numSplits argument. This number is treated as a hint, as InputFormat implementations are free to return a different number of splits from the number specified in numSplits. Having calculated the splits, the client sends them to the jobtracker, which uses their storage locations to schedule map tasks to process them on the tasktrackers.

On a tasktracker, the map task passes the split to the getRecordReader() method on InputFormat to obtain a RecordReader for that split. A RecordReader is little more than an iterator over records, and the map task uses one to generate record key-value pairs, which it passes to the map function. A code snippet (based on the code in MapRunner) illustrates the idea:
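The loop looks something like this (a sketch assuming the reader, mapper, output collector, and reporter have already been set up, as MapRunner does in its run() method):

    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      // the same key and value objects are reused; only their contents change
      mapper.map(key, value, output, reporter);
    }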

Here the RecordReader’s next() method is called repeatedly to populate the key and value objects for the mapper. When the reader gets to the end of the stream, the next() method returns false, and the map task completes.

This code snippet makes it clear that the same key and value objects are used on each invocation of the map() method; only their contents are changed (by the reader’s next() method). This can be a surprise to users, who might expect keys and values to be immutable. It causes problems when a reference to a key or value object is retained outside the map() method, as its value can change without warning. If you need to do this, make a copy of the object you want to hold on to. For example, for a Text object, you can use its copy constructor: new Text(value).

The situation is similar with reducers. In this case, the value objects in the reducer’s iterator are reused, so you need to copy any that you need to retain between calls to the iterator.

Finally, note that MapRunner is only one way of running mappers. MultithreadedMapRunner is another implementation of the MapRunnable interface that runs mappers concurrently in a configurable number of threads (set by mapred.map.multithreadedrunner.threads). For most data processing tasks, it confers no advantage over MapRunner.

However, for mappers that spend a long time processing each record, because they contact external servers, for example, it allows multiple mappers to run in one JVM with little contention. See “Fetcher: A multithreaded MapRunner in action” for an example of an application that uses MultithreadedMapRunner.

FileInputFormat

FileInputFormat is the base class for all implementations of InputFormat that use files as their data source. It provides two things: a place to define which files are included as the input to a job, and an implementation for generating splits for the input files. The job of dividing splits into records is performed by subclasses.

FileInputFormat input paths

The input to a job is specified as a collection of paths, which offers great flexibility in constraining the input to a job. FileInputFormat offers four static convenience methods for setting a JobConf’s input paths:
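Their signatures are, in outline (old API):

    public static void addInputPath(JobConf conf, Path path)
    public static void addInputPaths(JobConf conf, String commaSeparatedPaths)
    public static void setInputPaths(JobConf conf, Path... inputPaths)
    public static void setInputPaths(JobConf conf, String commaSeparatedPaths)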

The addInputPath() and addInputPaths() methods add a path or paths to the list of inputs. You can call these methods repeatedly to build the list of paths. The setInputPaths() methods set the entire list of paths in one go (replacing any paths set on the JobConf in previous calls).

A path may represent a file, a directory, or, by using a glob, a collection of files and directories. A path representing a directory includes all the files in the directory as input to the job. See “File patterns” for more on using globs.

The contents of a directory specified as an input path are not processed recursively. In fact, the directory should contain only files: if the directory contains a subdirectory, it will be interpreted as a file, which will cause an error. The way to handle this case is to use a file glob or a filter to select only the files in the directory based on a name pattern.

The add and set methods allow files to be specified by inclusion only. To exclude certain files from the input, you can set a filter using the setInputPathFilter() method on FileInputFormat:
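Its signature looks like this:

    public static void setInputPathFilter(JobConf conf,
        Class<? extends PathFilter> filter)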

Filters are discussed in more detail in “PathFilter”.

Even if you don’t set a filter, FileInputFormat uses a default filter that excludes hidden files (those whose names begin with a dot or an underscore). If you set a filter by calling setInputPathFilter(), it acts in addition to the default filter. In other words, only nonhidden files that are accepted by your filter get through.

Paths and filters can be set through configuration properties, too, which can be handy for Streaming and Pipes. That said, setting paths is done with the -input option for both the Streaming and Pipes interfaces, so setting paths directly is not usually needed.

Input path and filter properties

FileInputFormat input splits

Given a set of files, how does FileInputFormat turn them into splits? FileInputFormat splits only large files, where “large” means larger than an HDFS block. The split size is normally the size of an HDFS block, which is appropriate for most applications; however, it is possible to control this value by setting various Hadoop properties, as shown in the table below.

Properties for controlling the split size

a This property is not present in the old MapReduce API (with the exception of CombineFileInputFormat). Instead, it is calculated indirectly as the size of the total input for the job, divided by the guide number of map tasks specified by mapred.map.tasks (or the setNumMapTasks() method on JobConf). Because mapred.map.tasks defaults to 1, this makes the maximum split size the size of the input.

The minimum split size is usually 1 byte, although some formats have a lower bound on the split size. (For example, sequence files insert sync entries every so often in the stream, so the minimum split size has to be large enough to ensure that every split has a sync point to allow the reader to resynchronize with a record boundary.)

Applications may impose a minimum split size: by setting this to a value larger than the block size, they can force splits to be larger than a block. There is no good reason for doing this when using HDFS, since doing so will increase the number of blocks that are not local to a map task.

The maximum split size defaults to the maximum value that can be represented by a Java long type. It has an effect only when it is less than the block size, forcing splits to be smaller than a block.

The split size is calculated by the formula (see the computeSplitSize() method in FileInputFormat):

    max(minimumSize, min(maximumSize, blockSize))

and by default:

    minimumSize < blockSize < maximumSize

so the split size is blockSize. Various settings for these parameters and how they affect the final split size are illustrated in the table below.
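Expressed as Java, the calculation is equivalent to the following (an illustrative sketch, not the exact signature of the computeSplitSize() method):

    // The same max/min logic that FileInputFormat applies when sizing splits
    long splitSize(long minimumSize, long maximumSize, long blockSize) {
      return Math.max(minimumSize, Math.min(maximumSize, blockSize));
    }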

How to control the split size

Small files and CombineFileInputFormat

Hadoop works better with a small number of large files than a large number of small files. One reason for this is that FileInputFormat generates splits in such a way that each split is all or part of a single file. If the files are very small (“small” means significantly smaller than an HDFS block) and there are a lot of them, then each map task will process very little input, and there will be a lot of map tasks (one per file), each of which imposes extra bookkeeping overhead. Compare a 1 GB file broken into sixteen 64 MB blocks with 10,000 or so 100 KB files. The 10,000 files use one map each, and the job time can be tens or hundreds of times slower than the equivalent one with a single input file and 16 map tasks.

The situation is alleviated somewhat by CombineFileInputFormat, which was designed to work well with small files. Where FileInputFormat creates a split per file, CombineFileInputFormat packs many files into each split so that each mapper has more to process. Crucially, CombineFileInputFormat takes node and rack locality into account when deciding which blocks to place in the same split, so it does not compromise the speed at which it can process the input in a typical MapReduce job.

Of course, if possible, it is still a good idea to avoid the many-small-files case, since MapReduce works best when it can operate at the transfer rate of the disks in the cluster, and processing many small files increases the number of seeks that are needed to run a job. Also, storing large numbers of small files in HDFS is wasteful of the namenode’s memory. One technique for avoiding the many-small-files case is to merge small files into larger files by using a SequenceFile: the keys can act as filenames (or a constant such as NullWritable, if not needed) and the values as file contents. But if you already have a large number of small files in HDFS, then CombineFileInputFormat is worth trying.

CombineFileInputFormat isn’t just good for small files it can bring benefits when processing large files, too. Essentially, CombineFileInputFor mat decouples the amount of data that a mapper consumes from theblock size of the files in HDFS.

If your mappers can process each block in a matter of seconds, then you could use CombineFileInputFormat with the maximum split size set to a small multiple of the block size (by setting the mapred.max.split.size property in bytes) so that each mapper processes more than one block. In return, the overall processing time falls, since proportionally fewer mappers run, which reduces the overhead in task bookkeeping and startup time associated with a large number of short-lived mappers.

Since CombineFileInputFormat is an abstract class without any concrete classes (unlike FileInputFormat), you need to do a bit more work to use it. (Hopefully, common implementations will be added to the library over time.) For example, to have the CombineFileInputFormat equivalent of TextInputFormat, you would create a concrete subclass of CombineFileInputFormat and implement the getRecordReader() method.
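A skeleton of such a subclass might look like the following (a sketch: PerFileLineRecordReader is a hypothetical per-file reader you would also have to write, with the constructor that CombineFileRecordReader expects):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    public class CombineTextInputFormat
        extends CombineFileInputFormat<LongWritable, Text> {

      @Override
      public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
          JobConf conf, Reporter reporter) throws IOException {
        // Delegate each file in the combined split to a line-oriented reader
        return new CombineFileRecordReader<LongWritable, Text>(conf,
            (CombineFileSplit) split, reporter,
            (Class) PerFileLineRecordReader.class);
      }
    }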

Preventing splitting

Some applications don’t want files to be split so that a single mapper can process each input file in its entirety. For example, a simple way to check if all the records in a file are sorted is to go through the records in order, checking whether each record is not less than the preceding one. Implemented as a map task, this algorithm will work only if one map processes the whole file

There are a couple of ways to ensure that an existing file is not split. The first (quick and dirty) way is to increase the minimum split size to be larger than the largest file in your system. Setting it to its maximum value, Long.MAX_VALUE, has this effect. The second is to subclass the concrete subclass of FileInputFormat that you want to use, to override the isSplitable() method§ to return false. For example, here’s a nonsplittable TextInputFormat:
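A minimal version (old API) looks like this:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class NonSplittableTextInputFormat extends TextInputFormat {
      @Override
      protected boolean isSplitable(FileSystem fs, Path file) {
        return false;  // never split: one mapper per file
      }
    }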

File information in the mapper

A mapper processing a file input split can find information about the split by reading some special properties from its job configuration object, which may be obtained by implementing configure() in your Mapper implementation to get access to the JobConf object. The table below lists the properties available. These are in addition to the ones available to all mappers and reducers, listed in “The Task Execution Environment”.

File split properties

In the next section, we’ll see how to use this when we need to access the split’s filename.

This is how the mapper in SortValidator.RecordStatsChecker is implemented.

§ In the method name isSplitable(), “splitable” has a single “t.” It is usually spelled “splittable,” which is the spelling I have used in this book.

Processing a whole file as a record

A related requirement that sometimes crops up is for mappers to have access to the full contents of a file. Not splitting the file gets you part of the way there, but you also need a RecordReader that delivers the file contents as the value of the record. The listing for WholeFileInputFormat below shows a way of doing this.

An InputFormat for reading a whole file as a record
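A sketch of the format (old API), matching the description that follows:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.*;

    public class WholeFileInputFormat
        extends FileInputFormat<NullWritable, BytesWritable> {

      @Override
      protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;  // always process a file as a single split
      }

      @Override
      public RecordReader<NullWritable, BytesWritable> getRecordReader(
          InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new WholeFileRecordReader((FileSplit) split, job);
      }
    }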

WholeFileInputFormat defines a format where the keys are not used, represented by NullWritable, and the values are the file contents, represented by BytesWritable instances.

It defines two methods. First, the format is careful to specify that input files should never be split, by overriding isSplitable() to return false. Second, we implement getRecordReader() to return a custom implementation of RecordReader, which appears below.

The RecordReader used by WholeFileInputFormat for reading a whole file as a record:
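A sketch of the reader (old API), matching the description that follows:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.RecordReader;

    class WholeFileRecordReader
        implements RecordReader<NullWritable, BytesWritable> {

      private FileSplit fileSplit;
      private Configuration conf;
      private boolean processed = false;

      public WholeFileRecordReader(FileSplit fileSplit, Configuration conf) {
        this.fileSplit = fileSplit;
        this.conf = conf;
      }

      public NullWritable createKey() {
        return NullWritable.get();
      }

      public BytesWritable createValue() {
        return new BytesWritable();
      }

      public long getPos() throws IOException {
        return processed ? fileSplit.getLength() : 0;
      }

      public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
      }

      public boolean next(NullWritable key, BytesWritable value) throws IOException {
        if (!processed) {
          // Slurp the whole file into a byte array and return it as one record
          byte[] contents = new byte[(int) fileSplit.getLength()];
          Path file = fileSplit.getPath();
          FileSystem fs = file.getFileSystem(conf);
          FSDataInputStream in = null;
          try {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
          } finally {
            IOUtils.closeStream(in);
          }
          processed = true;
          return true;
        }
        return false;  // only one record per file
      }

      public void close() throws IOException {
        // nothing to do
      }
    }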

WholeFileRecordReader is responsible for taking a FileSplit and converting it into a single record, with a null key and a value containing the bytes of the file. Because there is only a single record, WholeFileRecordReader has either processed it or not, so it maintains a boolean called processed. If, when the next() method is called, the file has not been processed, then we open the file, create a byte array whose length is the length of the file, and use the Hadoop IOUtils class to slurp the file into the byte array. Then we set the array on the BytesWritable instance that was passed into the next() method, and return true to signal that a record has been read.

The other methods are straightforward bookkeeping methods for creating the correct key and value types, getting the position and progress of the reader, and a close() method, which is invoked by the MapReduce framework when it has finished with the reader.

To demonstrate how WholeFileInputFormat can be used, consider a MapReduce job for packaging small files into sequence files, where the key is the original filename and the value is the content of the file. The listing appears below.

A MapReduce program for packaging a collection of small files as a single SequenceFile
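A sketch of the program (old API), consistent with the description in the following paragraphs; the class and mapper names are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.lib.IdentityReducer;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

      static class SequenceFileMapper extends MapReduceBase
          implements Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private JobConf conf;

        @Override
        public void configure(JobConf conf) {
          this.conf = conf;
        }

        @Override
        public void map(NullWritable key, BytesWritable value,
            OutputCollector<Text, BytesWritable> output, Reporter reporter)
            throws IOException {
          // map.input.file is set by the framework for FileSplit-based splits
          String filename = conf.get("map.input.file");
          output.collect(new Text(filename), value);
        }
      }

      @Override
      public int run(String[] args) throws IOException {
        JobConf conf = new JobConf(getConf(), getClass());
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(BytesWritable.class);

        conf.setMapperClass(SequenceFileMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(conf);
        return 0;
      }

      public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
      }
    }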

Since the input format is a WholeFileInputFormat, the mapper has to find only the filename for the input file split. It does this by retrieving the map.input.file property from the JobConf, which is set to the split’s filename by the MapReduce framework, but only for splits that are FileSplit instances (this includes most subclasses of FileInputFormat). The reducer is the IdentityReducer, and the output format is a SequenceFileOutputFormat.

Here’s a run on a few small files. We’ve chosen to use two reducers, so we get two output sequence files:

The input files were named a, b, c, d, e, and f, and each contained 10 characters of the corresponding letter (so, for example, a contained 10 “a” characters), except e, which was empty. We can see this in the textual rendering of the sequence files, which prints the filename followed by the hex representation of the file.

There’s at least one way we could improve this program. As mentioned earlier, having one mapper per file is inefficient, so subclassing CombineFileInputFormat instead of FileInputFormat would be a better approach. Also, for a related technique of packing files into a Hadoop Archive, rather than a sequence file, see the section “Hadoop Archives”

Text Input

Hadoop excels at processing unstructured text. In this section, we discuss the different InputFormats that Hadoop provides to process text.

TextInputFormat

TextInputFormat is the default InputFormat. Each record is a line of input. The key, a LongWritable, is the byte offset within the file of the beginning of the line. The value is the contents of the line, excluding any line terminators (newline, carriage return), and is packaged as a Text object. So a file containing the following text:

is divided into one split of four records. The records are interpreted as the following key-value pairs:

Clearly, the keys are not line numbers. This would be impossible to implement in general, since a file is broken into splits at byte, not line, boundaries. Splits are processed independently. Line numbers are really a sequential notion: you have to keep a count of lines as you consume them, so knowing the line number within a split would be possible, but not within the file.

However, the offset within the file of each line is known by each split independently of the other splits, since each split knows the size of the preceding splits and just adds this on to the offsets within the split to produce a global file offset. The offset is usually sufficient for applications that need a unique identifier for each line. Combined with the file’s name, it is unique within the filesystem. Of course, if all the lines are a fixed width, then calculating the line number is simply a matter of dividing the offset by the width.

The Relationship Between Input Splits and HDFS Blocks

The logical records that FileInputFormats define do not usually fit neatly into HDFS blocks. For example, a TextInputFormat’s logical records are lines, which will cross HDFS block boundaries more often than not. This has no bearing on the functioning of your program (lines are not missed or broken, for example), but it’s worth knowing about, as it does mean that data-local maps (that is, maps that are running on the same host as their input data) will perform some remote reads. The slight overhead this causes is not normally significant.

The figure below shows an example. A single file is broken into lines, and the line boundaries do not correspond with the HDFS block boundaries. Splits honor logical record boundaries, in this case lines, so we see that the first split contains line 5, even though it spans the first and second block. The second split starts at line 6.

The Relationship Between Input Splits and HDFS Blocks

KeyValueTextInputFormat

TextInputFormat’s keys, being simply the offset within the file, are not normally very useful. It is common for each line in a file to be a key-value pair, separated by a delimiter such as a tab character. For example, this is the output produced by TextOutputFor mat, Hadoop’s default OutputFormat. To interpret such files correctly, KeyValueTextIn putFormat is appropriate.

You can specify the separator via the key.value.separator.in.input.line property. It is a tab character by default. Consider the following input file, where → represents a (horizontal) tab character:

NLineInputFormat

With TextInputFormat and KeyValueTextInputFormat, each mapper receives a variable number of lines of input. The number depends on the size of the split and the length of the lines. If you want your mappers to receive a fixed number of lines of input, then NLineInputFormat is the InputFormat to use. Like TextInputFormat, the keys are the byte offsets within the file and the values are the lines themselves.

N refers to the number of lines of input that each mapper receives. With N set to one (the default), each mapper receives exactly one line of input. The mapred.line.input.format.linespermap property controls the value of N.
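As a job-setup sketch (old API; NLineInputFormat lives in org.apache.hadoop.mapred.lib), requesting ten lines per mapper might look like this:

    JobConf conf = new JobConf();
    conf.setInputFormat(NLineInputFormat.class);
    conf.setInt("mapred.line.input.format.linespermap", 10);  // N = 10 lines per mapper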

By way of example, consider these four lines again:

The keys and values are the same as TextInputFormat produces. What is different is the way the splits are constructed.

Usually, having a map task for a small number of lines of input is inefficient (due to the overhead in task setup), but there are applications that take a small amount of input data and run an extensive (that is, CPU-intensive) computation for it, then emit their output. Simulations are a good example. By creating an input file that specifies input parameters, one per line, you can perform a parameter sweep: run a set of simulations in parallel to find how a model varies as the parameter changes.

If you have long-running simulations, you may fall afoul of task timeouts. When a task doesn’t report progress for more than 10 minutes, the tasktracker assumes it has failed and aborts the process (see “Task Failure”).

The best way to guard against this is to report progress periodically, by writing a status message or incrementing a counter, for example. See “What Constitutes Progress in MapReduce?”.

Another example is using Hadoop to bootstrap data loading from multiple data sources, such as databases. You create a “seed” input file that lists the data sources, one per line. Then each mapper is allocated a single data source, and it loads the data from that source into HDFS. The job doesn’t need the reduce phase, so the number of reducers should be set to zero (by calling setNumReduceTasks() on JobConf). Furthermore, MapReduce jobs can be run to process the data loaded into HDFS. See Appendix C for an example.

XML

Most XML parsers operate on whole XML documents, so if a large XML document is made up of multiple input splits, then it is a challenge to parse these individually. Of course, you can process the entire XML document in one mapper (if it is not too large) using the technique in “Processing a whole file as a record” .

Large XML documents that are composed of a series of “records” (XML document fragments) can be broken into these records using simple string or regular-expression matching to find start and end tags of records. This alleviates the problem when the document is split by the framework, since the next start tag of a record is easy to find by simply scanning from the start of the split, just like TextInputFormat finds newline boundaries.

Hadoop comes with a class for this purpose called StreamXmlRecordReader (which is in the org.apache.hadoop.streaming package, although it can be used outside of Streaming).

You can use it by setting your input format to StreamInputFormat and setting the stream.recordreader.class property to org.apache.hadoop.streaming.StreamXmlRecordReader. The reader is configured by setting job configuration properties to tell it the patterns for the start and end tags (see the class documentation for details).
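A job-setup sketch along these lines (old API; the start/end-tag property names, stream.recordreader.begin and stream.recordreader.end, are my assumption here, so check the class documentation):

    // Job setup fragment using Streaming's input format classes
    JobConf conf = new JobConf();
    conf.setInputFormat(org.apache.hadoop.streaming.StreamInputFormat.class);
    conf.set("stream.recordreader.class",
        "org.apache.hadoop.streaming.StreamXmlRecordReader");
    // Assumed property names for the record-delimiting tags:
    conf.set("stream.recordreader.begin", "<page>");
    conf.set("stream.recordreader.end", "</page>");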

To take an example, Wikipedia provides dumps of its content in XML form, which are appropriate for processing in parallel using MapReduce using this approach. The data is contained in one large XML wrapper document, which contains a series of elements, such as page elements that contain a page’s content and associated metadata. Using StreamXmlRecordReader, the page elements can be interpreted as records for processing by a mapper.

Binary Input

Hadoop MapReduce is not restricted to processing textual data; it has support for binary formats, too.

SequenceFileInputFormat

Hadoop’s sequence file format stores sequences of binary key-value pairs. They are well suited as a format for MapReduce data since they are splittable (they have sync points so that readers can synchronize with record boundaries from an arbitrary point in the file, such as the start of a split), they support compression as a part of the format, and they can store arbitrary types using a variety of serialization frameworks. (These topics are covered in “SequenceFile”)

To use data from sequence files as the input to MapReduce, you use SequenceFileInputFormat. The keys and values are determined by the sequence file, and you need to make sure that your map input types correspond. For example, if your sequence file has IntWritable keys and Text values, like the one created in the Hadoop I/O chapter, then the map signature would be Mapper<IntWritable, Text, K, V>, where K and V are the types of the map’s output keys and values.
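For instance, a pass-through mapper with matching input types might look like this (a sketch; the output types chosen here are arbitrary):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    public class SequenceFilePassThroughMapper extends MapReduceBase
        implements Mapper<IntWritable, Text, IntWritable, Text> {

      @Override
      public void map(IntWritable key, Text value,
          OutputCollector<IntWritable, Text> output, Reporter reporter)
          throws IOException {
        output.collect(key, value);  // emit each sequence file record unchanged
      }
    }

In the job driver, the input format would be selected with conf.setInputFormat(SequenceFileInputFormat.class).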

Although its name doesn’t give it away, SequenceFileInputFormat can read MapFiles as well as sequence files. If it finds a directory where it was expecting a sequence file, SequenceFileInputFormat assumes that itis reading a MapFile and uses its data file. This is why there is no MapFileInputFormat class.

See Mahout’s XmlInputFormat (available from http://mahout.apache.org/) for an improved XML input format.

SequenceFileAsTextInputFormat

SequenceFileAsTextInputFormat is a variant of SequenceFileInputFormat that converts the sequence file’s keys and values to Text objects. The conversion is performed by calling toString() on the keys and values. This format makes sequence files suitable input for Streaming.

SequenceFileAsBinaryInputFormat

SequenceFileAsBinaryInputFormat is a variant of SequenceFileInputFormat that retrieves the sequence file’s keys and values as opaque binary objects. They are encapsulated as BytesWritable objects, and the application is free to interpret the underlying byte array as it pleases. Combined with SequenceFile.Reader’s appendRaw() method, this provides a way to use any binary data types with MapReduce (packaged as a sequence file), although plugging into Hadoop’s serialization mechanism is normally a cleaner alternative (see “Serialization Frameworks” ).

Multiple Inputs

Although the input to a MapReduce job may consist of multiple input files (constructed by a combination of file globs, filters, and plain paths), all of the input is interpreted by a single InputFormat and a single Mapper. What often happens, however, is that over time the data format evolves, so you have to write your mapper to cope with all of your legacy formats. Or, you have data sources that provide the same type of data but in different formats. This arises in the case of performing joins of different datasets; see “Reduce-Side Joins”. For instance, one input might be tab-separated plain text, the other a binary sequence file. Even if they are in the same format, they may have different representations and, therefore, need to be parsed differently.

These cases are handled elegantly by using the MultipleInputs class, which allows you to specify the InputFormat and Mapper to use on a per-path basis. For example, if we had weather data from the UK Met Office that we wanted to combine with the NCDC data for our maximum temperature analysis, then we might set up the input as follows:
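A sketch of that setup (old API; the Path variables are assumed to be defined elsewhere in the driver):

    // ncdcInputPath and metOfficeInputPath are Path objects defined elsewhere
    MultipleInputs.addInputPath(conf, ncdcInputPath,
        TextInputFormat.class, MaxTemperatureMapper.class);
    MultipleInputs.addInputPath(conf, metOfficeInputPath,
        TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);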

This code replaces the usual calls to FileInputFormat.addInputPath() and conf.setMapperClass(). Both the Met Office and NCDC data are text-based, so we use TextInputFormat for each. But the line format of the two data sources is different, so we use two different mappers. The MaxTemperatureMapper reads NCDC input data and extracts the year and temperature fields. The MetOfficeMaxTemperatureMapper reads Met Office input data and extracts the year and temperature fields. The important thing is that the map outputs have the same types, since the reducers (which are all of the same type) see the aggregated map outputs and are not aware of the different mappers used to produce them.

The MultipleInputs class has an overloaded version of addInputPath() that doesn’t take a mapper:
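In outline:

    public static void addInputPath(JobConf conf, Path path,
        Class<? extends InputFormat> inputFormatClass)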

This is useful when you only have one mapper (set using the JobConf’s setMapperClass() method) but multiple input formats.

Database Input (and Output)

DBInputFormat is an input format for reading data from a relational database, using JDBC. Because it doesn’t have any sharding capabilities, you need to be careful not to overwhelm the database you are reading from by running too many mappers. For this reason, it is best used for loading relatively small datasets, perhaps for joining with larger datasets from HDFS, using MultipleInputs. The corresponding output format isDBOutputFormat, which is useful for dumping job outputs (of modest size) into a database.

For an alternative way of moving data between relational databases and HDFS, consider using Sqoop, which is described in the Sqoop chapter.

HBase’s TableInputFormat is designed to allow a MapReduce program to operate on data stored in an HBase table. TableOutputFormat is for writing MapReduce outputs into an HBase table.

