File-Based Data Structures - Hadoop

For some applications, you need a specialized data structure to hold your data. For doing MapReduce-based processing, putting each blob of binary data into its own file doesn’t scale, so Hadoop developed a number of higher-level containers for these situations.

SequenceFile

Imagine a logfile, where each log record is a new line of text. If you want to log binary types, plain text isn’t a suitable format. Hadoop’s SequenceFile class fits the bill in this situation, providing a persistent data structure for binary key-value pairs. To use it as a logfile format, you would choose a key, such as a timestamp represented by a LongWritable, and the value would be a Writable that represents the quantity being logged.

SequenceFiles also work well as containers for smaller files. HDFS and MapReduce are optimized for large files, so packing files into a SequenceFile makes storing and processing the smaller files more efficient. (“Processing a whole file as a record” on page 206 contains a program to pack files into a SequenceFile.)

Writing a SequenceFile

To create a SequenceFile, use one of its createWriter() static methods, which returns a SequenceFile.Writer instance. There are several overloaded versions, but they all require you to specify a stream to write to (either an FSDataOutputStream or a FileSystem and Path pairing), a Configuration object, and the key and value types. Optional arguments include the compression type and codec, a Progressable callback to be informed of write progress, and a Metadata instance to be stored in the SequenceFile header.

The keys and values stored in a SequenceFile do not necessarily need to be Writable. Any types that can be serialized and deserialized by a Serialization may be used.

Once you have a SequenceFile.Writer, you then write key-value pairs, using the append() method. Then when you’ve finished, you call the close() method (SequenceFile.Writer implements java.io.Closeable).

The following short program writes some key-value pairs to a SequenceFile, using the API just described.
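This is a minimal sketch (the class name SequenceFileWriteDemo and the sample DATA lines are illustrative; it uses the older FileSystem-based createWriter() overload):

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Writes 100 key-value pairs to a SequenceFile, with keys counting down from 100 to 1
public class SequenceFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  public static void main(String[] args) throws IOException {
    String uri = args[0]; // output file, e.g. numbers.seq
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    IntWritable key = new IntWritable();
    Text value = new Text();
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
      for (int i = 0; i < 100; i++) {
        key.set(100 - i);                  // keys count down from 100 to 1
        value.set(DATA[i % DATA.length]);
        // getLength() gives the current position in the file, i.e. the boundary
        // at which this record will start
        System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}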

The keys in the sequence file are integers counting down from 100 to 1, represented as IntWritable objects. The values are Text objects. Before each record is appended to the SequenceFile.Writer, we call the getLength() method to discover the current position in the file. (We will use this information about record boundaries in the next section, when we read the file nonsequentially.) We write the position out to the console, along with the keys and values.

Reading a SequenceFile

Reading sequence files from beginning to end is a matter of creating an instance of SequenceFile.Reader and iterating over records by repeatedly invoking one of the next() methods. Which one you use depends on the serialization framework you are using. If you are using Writable types, you can use the next() method that takes a key and a value argument, and reads the next key and value in the stream into these variables:

public boolean next(Writable key, Writable val)

The return value is true if a key-value pair was read and false if the end of the file has been reached.

For other, non-Writable serialization frameworks (such as Apache Thrift), you should use these two methods:
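These are the relevant SequenceFile.Reader method signatures:

public Object next(Object key) throws IOException
public Object getCurrentValue(Object val) throws IOException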

In this case, you need to make sure that the serialization you want to use has been set in the io.serializations property. If the next() method returns a non-null object, a key-value pair was read from the stream, and the value can be retrieved using the getCurrentValue() method. Otherwise, if next() returns null, the end of the file has been reached.

The program below shows how to read a sequence file that has Writable keys and values. Note how the types are discovered from the SequenceFile.Reader via calls to getKeyClass() and getValueClass(), and then ReflectionUtils is used to create an instance for the key and an instance for the value. By using this technique, the program can be used with any sequence file that has Writable keys and values.
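A sketch of such a program (the class name is illustrative; it also prints the positions of sync points, which are discussed next):

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

// Reads a SequenceFile with Writable keys and values, printing each record's
// start position and marking records preceded by a sync point with an asterisk
public class SequenceFileReadDemo {

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(fs, path, conf);
      // Discover the key and value types from the file's header
      Writable key = (Writable)
          ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable)
          ReflectionUtils.newInstance(reader.getValueClass(), conf);
      long position = reader.getPosition();
      while (reader.next(key, value)) {
        String syncSeen = reader.syncSeen() ? "*" : "";  // sync point just before this record?
        System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
        position = reader.getPosition();                 // start of the next record
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}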


Another feature of the program is that it displays the position of the sync points in the sequence file. A sync point is a point in the stream that can be used to resynchronize with a record boundary if the reader is “lost”—for example, after seeking to an arbitrary position in the stream. Sync points are recorded by SequenceFile.Writer, which inserts a special entry to mark the sync point every few records as a sequence file is being written. Such entries are small enough to incur only a modest storage overhead—less than 1%. Sync points always align with record boundaries.

Running the program shows the sync points in the sequence file as asterisks. The first one occurs at position 2021 and the second at position 4075.

There are two ways to seek to a given position in a sequence file. The first is the seek() method, which positions the reader at the given point in the file. Seeking to a record boundary and then reading works as expected, as the sketch below shows.

But if the position in the file is not at a record boundary, the reader fails when the next() method is called.

The second way to find a record boundary makes use of sync points. The sync(long position) method on SequenceFile.Reader positions the reader at the next sync point after position. (If there are no sync points in the file after this position, then the reader will be positioned at the end of the file.) Thus, we can call sync() with any position in the stream (a non-record boundary, for example) and the reader will reestablish itself at the next sync point so reading can continue:
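A minimal sketch of all three cases, continuing with the reader, key, and value from the reading program above (the offset 2021 is the sync point reported earlier; 2022 is a hypothetical non-boundary position):

// Seeking to a record boundary and reading works:
reader.seek(2021);
reader.next(key, value);     // succeeds

// Seeking to an arbitrary position that is not a record boundary makes the
// following read fail:
reader.seek(2022);
// reader.next(key, value);  // would throw an IOException

// sync() instead repositions the reader at the next sync point after the
// given position, so reading can continue:
reader.sync(2022);
reader.next(key, value);     // succeeds, reading from the next sync point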

SequenceFile.Writer has a method called sync() for inserting a sync point at the current position in the stream. This is not to be confused with the identically named but otherwise unrelated sync() method defined by the Syncable interface for synchronizing buffers to the underlying device.

Sync points come into their own when using sequence files as input to MapReduce, since they permit the file to be split, so different portions of it can be processed independently by separate map tasks. See “SequenceFileInputFormat”.

Displaying a SequenceFile with the command-line interface

The hadoop fs command has a -text option to display sequence files in textual form. It looks at a file’s magic number so that it can attempt to detect the type of the file and appropriately convert it to text. It can recognize gzipped files and sequence files; otherwise, it assumes the input is plain text.

For sequence files, this command is really useful only if the keys and values have a meaningful string representation (as defined by the toString() method). Also, if you have your own key or value classes, then you will need to make sure they are on Hadoop’s classpath.

Running it on the sequence file we created earlier displays each key and value in text form.
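For example, assuming the file is called numbers.seq (the name used in the conversion recipe later in this section):

% hadoop fs -text numbers.seq | head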

Sorting and merging SequenceFiles

The most powerful way of sorting (and merging) one or more sequence files is to use MapReduce. MapReduce is inherently parallel and will let you specify the number of reducers to use, which determines the number of output partitions. For example, by specifying one reducer, you get a single output file. We can use the sort example that comes with Hadoop by specifying that the input and output are sequence files, and by setting the key and value types:
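The invocation looks like this (it is essentially the same command used in step 1 of the conversion recipe at the end of this section; here numbers.seq is the input sequence file and sorted is an assumed output directory):

% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
  -inFormat org.apache.hadoop.mapred.SequenceFileInputFormat \
  -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
  -outKey org.apache.hadoop.io.IntWritable \
  -outValue org.apache.hadoop.io.Text \
  numbers.seq sorted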

As an alternative to using MapReduce for sort/merge, there is a SequenceFile.Sorter class that has a number of sort() and merge() methods. These functions predate MapReduce and are lower-level functions than MapReduce (for example, to get parallelism, you need to partition your data manually), so in general MapReduce is the preferred approach to sort and merge sequence files.

The SequenceFile format

A sequence file consists of a header followed by one or more records. The first three bytes of a sequence file are the bytes SEQ, which act as a magic number, followed by a single byte representing the version number. The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata, and the sync marker. Recall that the sync marker is used to allow a reader to synchronize to a record boundary from any position in the file. Each file has a randomly generated sync marker, whose value is stored in the header. Sync markers appear between records in the sequence file. They are designed to incur less than a 1% storage overhead, so they don’t necessarily appear between every pair of records (such is the case for short records).


The internal format of the records depends on whether compression is enabled, and if it is, whether it is record compression or block compression.

If no compression is enabled (the default), then each record is made up of the record length (in bytes), the key length, the key, and then the value. The length fields are written as four-byte integers adhering to the contract of the writeInt() method of java.io.DataOutput. Keys and values are serialized using the Serialization defined for the class being written to the sequence file.

The format for record compression is almost identical to no compression, except the value bytes are compressed using the codec defined in the header. Note that keys are not compressed.

Block compression compresses multiple records at once; it is therefore more compact than, and should generally be preferred over, record compression because it has the opportunity to take advantage of similarities between records. Records are added to a block until it reaches a minimum size in bytes, defined by the io.seqfile.compress.blocksize property; the default is 1 million bytes. A sync marker is written before the start of every block. The format of a block is a field indicating the number of records in the block, followed by four compressed fields: the key lengths, the keys, the value lengths, and the values.

MapFile

A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can be thought of as a persistent form of java.util.Map (although it doesn’t implement this interface), which is able to grow beyond the size of a Map that is kept in memory.


Writing a MapFile

Writing a MapFile is similar to writing a SequenceFile: you create an instance of MapFile.Writer, then call the append() method to add entries in order. (Attempting to add entries out of order will result in an IOException.) Keys must be instances of WritableComparable, and values must be Writable; contrast this with SequenceFile, which can use any serialization framework for its entries.
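Here is a sketch of a MapFileWriteDemo program along those lines (the sample DATA lines and the choice of 1,024 entries are illustrative; it uses the older FileSystem-based MapFile.Writer constructor):

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

// Writes 1024 entries to a MapFile; keys must be appended in sorted order
public class MapFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  public static void main(String[] args) throws IOException {
    String uri = args[0]; // output directory, e.g. numbers.map
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    IntWritable key = new IntWritable();
    Text value = new Text();
    MapFile.Writer writer = null;
    try {
      writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());
      for (int i = 0; i < 1024; i++) {
        key.set(i + 1);                     // ascending keys: 1, 2, 3, ...
        value.set(DATA[i % DATA.length]);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}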


Let’s use this program to build a MapFile:

% hadoop MapFileWriteDemo numbers.map

If we look at the MapFile, we see that it’s actually a directory containing two files called data and index.

Both files are SequenceFiles. The data file contains all of the entries, in order.

The index file contains a fraction of the keys, mapping each key to its offset in the data file.

As we can see from the output, by default only every 128th key is included in the index, although you can change this value either by setting the io.map.index.interval property or by calling the setIndexInterval() method on the MapFile.Writer instance. A reason to increase the index interval would be to decrease the amount of memory that the MapFile needs to store the index. Conversely, you might decrease the interval to improve the time for random selection (since fewer records need to be skipped on average) at the expense of memory usage.
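For example, either of the following would set the interval to 256 (an arbitrary value for illustration); conf and writer are the Configuration and MapFile.Writer from the writing program above:

// Set the property before the writer is created...
conf.setInt("io.map.index.interval", 256);
// ...or call the setter on the writer instance before appending entries:
writer.setIndexInterval(256);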

Since the index is only a partial index of keys, MapFile is not able to provide methods to enumerate, or even count, all the keys it contains. The only way to perform these operations is to read the whole file.

Reading a MapFile

Iterating through the entries in order in a MapFile is similar to the procedure for a SequenceFile: you create a MapFile.Reader, then call the next() method until it returns false, signifying that no entry was read because the end of the file was reached:
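A sketch of the iteration, assuming the fs and conf setup from the earlier programs and the numbers.map file built above:

// Iterate over all entries in the MapFile, in key order
MapFile.Reader reader = new MapFile.Reader(fs, "numbers.map", conf);
try {
  IntWritable key = new IntWritable();
  Text value = new Text();
  while (reader.next(key, value)) {
    System.out.printf("%s\t%s\n", key, value);
  }
} finally {
  reader.close();
}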

A random access lookup can be performed by calling the get() method:
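The signature, as it appears on MapFile.Reader, is:

public Writable get(WritableComparable key, Writable val) throws IOException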

The return value is used to determine whether an entry was found in the MapFile; if it’s null, then no value exists for the given key. If the key was found, then the value for that key is read into val, as well as being returned from the method call. It might be helpful to understand how this is implemented. Here is a snippet of code that retrieves an entry for the MapFile we created in the previous section:
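A sketch of the lookup, using the reader from the iteration example above (the key 496 is the one traced through in the next paragraph):

Text value = new Text();
Writable entry = reader.get(new IntWritable(496), value);
if (entry != null) {
  System.out.println(value);   // value now holds the text stored for key 496
}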

For this operation, the MapFile.Reader reads the index file into memory (this is cached so that subsequent random access calls will use the same in-memory index). The reader then performs a binary search on the in-memory index to find the key in the index that is less than or equal to the search key, 496. In this example, the index key found is 385, with value 18030, which is the offset in the data file. Next the reader seeks to this offset in the data file and reads entries until the key is greater than or equal to the search key, 496. In this case, a match is found and the value is read from the data file. Overall, a lookup takes a single disk seek and a scan through up to 128 entries on disk. For a random-access read, this is actually very efficient.

The getClosest() method is like get(), except that it returns the “closest” match to the specified key, rather than returning null on no match. More precisely, if the MapFile contains the specified key, then that is the entry returned; otherwise, the key in the MapFile that is immediately after (or before, according to a boolean argument) the specified key is returned.

A very large MapFile’s index can take up a lot of memory. Rather than reindexing to change the index interval, it is possible to load only a fraction of the index keys into memory when reading the MapFile by setting the io.map.index.skip property. This property is normally 0, which means no index keys are skipped; a value of 1 means skip one key for every key in the index (so every other key ends up in the index), 2 means skip two keys for every key in the index (so one third of the keys end up in the index), and so on. Larger skip values save memory but at the expense of lookup time, since more entries have to be scanned on disk, on average.

Converting a SequenceFile to a MapFile

One way of looking at a MapFile is as an indexed and sorted SequenceFile, so it’s quite natural to want to be able to convert a SequenceFile into a MapFile. We covered how to sort a SequenceFile in “Sorting and merging SequenceFiles”, so here we look at how to create an index for a SequenceFile. The program below hinges on the static utility method fix() on MapFile, which re-creates the index for a MapFile.

Re-creating the index for a MapFile:
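A sketch of such a program is shown here (the class name MapFileFixer is illustrative):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;

// Rebuilds the index of a MapFile directory from its data file
public class MapFileFixer {

  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws Exception {
    String mapUri = args[0]; // the MapFile directory, e.g. numbers.map

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
    Path map = new Path(mapUri);
    Path mapData = new Path(map, MapFile.DATA_FILE_NAME);

    // Get the key and value types from the data file's header
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
    Class keyClass = reader.getKeyClass();
    Class valueClass = reader.getValueClass();
    reader.close();

    // Create the index file for the MapFile
    long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
    System.out.printf("Created MapFile %s with %d entries\n", map, entries);
  }
}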

The fix() method is usually used for re-creating corrupted indexes, but since it creates a new index from scratch, it’s exactly what we need here. The recipe is as follows:

  1. Sort the sequence file numbers.seq into a new directory called numbers.map that will become the MapFile (if the sequence file is already sorted, you can skip this step; instead, copy it to a file numbers.map/data, then go to step 3):

     % hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
       -inFormat org.apache.hadoop.mapred.SequenceFileInputFormat \
       -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
       -outKey org.apache.hadoop.io.IntWritable \
       -outValue org.apache.hadoop.io.Text \
       numbers.seq numbers.map

  2. Rename the MapReduce output to be the data file:

     % hadoop fs -mv numbers.map/part-00000 numbers.map/data

  3. Create the index file by running a program that calls MapFile.fix(), such as the sketch above, against numbers.map. It reports:

     Created MapFile numbers.map with 100 entries

The MapFile numbers.map now exists and can be used.

