Side Data Distribution

Side data can be defined as extra read-only data needed by a job to process the main dataset. The challenge is to make side data available to all the map or reduce tasks (which are spread across the cluster) in a convenient and efficient fashion.

In addition to the distribution mechanisms described in this section, it is possible to cache side data in memory in a static field, so that tasks of the same job that run in succession on the same tasktracker can share the data. “Task JVM Reuse” describes how to enable this feature. If you take this approach, be aware of the amount of memory that you are using, as it might affect the memory needed by the shuffle (see “Shuffle and Sort”).
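As a minimal sketch of this pattern (the class name is illustrative, and loading the data itself is omitted), a mapper can lazily populate a static lookup table the first time it is configured in a reused JVM:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;

    public class CachedLookupMapper extends MapReduceBase {
      // Shared by tasks of the same job that run, one after another, in the same
      // reused task JVM (requires task JVM reuse to be enabled).
      private static Map<String, String> lookup;

      @Override
      public void configure(JobConf job) {
        synchronized (CachedLookupMapper.class) {
          if (lookup == null) {
            lookup = new HashMap<String, String>();
            // Populate the map here, e.g. from a distributed cache file
            // (omitted; see the rest of this section).
          }
        }
      }
    }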

Using the Job Configuration

You can set arbitrary key-value pairs in the job configuration using the various setter methods on JobConf (inherited from Configuration). This is very useful if you need to pass a small piece of metadata to your tasks. To retrieve the values in the task, override the configure() method in the Mapper or Reducer and use a getter method on the JobConf object passed in.

Usually, a primitive type is sufficient to encode your metadata, but for arbitrary objects you can either handle the serialization yourself (if you have an existing mechanism for turning objects to strings and back), or you can use Hadoop’s Stringifier class.

DefaultStringifier uses Hadoop’s serialization framework to serialize objects (see “Serialization”).
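Here is a minimal sketch of both approaches (the class and property names are made up for this example): a plain string property is set with the ordinary Configuration setters, and an arbitrary Writable is stored with DefaultStringifier, then both are read back in configure().

    import java.io.IOException;
    import org.apache.hadoop.io.DefaultStringifier;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;

    public class SideDataConfigExample extends MapReduceBase {
      private String country;
      private int startYear;

      // Driver side: store metadata in the job configuration before submission.
      public static void addMetadata(JobConf conf) throws IOException {
        conf.set("example.station.country", "AU");            // a simple string property
        DefaultStringifier.store(conf, new IntWritable(1950), // an arbitrary Writable, serialized
            "example.start.year");                            // by Hadoop's serialization framework
      }

      // Task side: retrieve the values when the mapper or reducer is configured.
      @Override
      public void configure(JobConf job) {
        country = job.get("example.station.country");
        try {
          startYear = DefaultStringifier.load(job, "example.start.year",
              IntWritable.class).get();
        } catch (IOException e) {
          throw new RuntimeException("Failed to read side data from job configuration", e);
        }
      }
    }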

You shouldn’t use this mechanism for transferring more than a few kilobytes of data, because it can put pressure on the memory usage in the Hadoop daemons, particularly in a system running hundreds of jobs. The job configuration is read by the jobtracker, the tasktracker, and the child JVM, and each time the configuration is read, all of its entries are read into memory, even if they are not used. User properties are not read on the jobtracker or the tasktracker, so there they just waste time and memory.

Distributed Cache

Rather than serializing side data in the job configuration, it is preferable to distribute datasets using Hadoop’s distributed cache mechanism. This provides a service for copying files and archives to the task nodes in time for the tasks to use them when they run. To save network bandwidth, files are normally copied to any particular node once per job.

Usage

For tools that use GenericOptionsParser (this includes many of the programs in this book; see “GenericOptionsParser, Tool, and ToolRunner”), you can specify the files to be distributed as a comma-separated list of URIs as the argument to the -files option. Files can be on the local filesystem, on HDFS, or on another Hadoop-readable filesystem (such as S3). If no scheme is supplied, then the files are assumed to be local. (This is true even if the default filesystem is not the local filesystem.)

You can also copy archive files (JAR files, ZIP files, tar files, and gzipped tar files) to your tasks using the -archives option; these are unarchived on the task node. The -libjars option will add JAR files to the classpath of the mapper and reducer tasks. This is useful if you haven’t bundled library JAR files in your job JAR file.
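For example, a job might be launched with all three options at once (the JAR, class, and file names below are placeholders, not from a real job):

    % hadoop jar myjob.jar MyDriver \
        -files lookup.txt \
        -archives dictionary.tar.gz \
        -libjars third-party-lib.jar \
        input output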

Streaming doesn’t use the distributed cache for copying the streaming scripts across the cluster. You specify a file to be copied using the -file option (note the singular), which should be repeated for each file to be copied. Furthermore, files specified using the -file option must be file paths only, not URIs, so they must be accessible from the local filesystem of the client launching the Streaming job. Streaming also accepts the -files and -archives options for copying files into the distributed cache for use by your Streaming scripts.
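For instance, a Streaming job might ship its mapper and reducer scripts like this (the streaming JAR location and the script names are placeholders):

    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
        -input input/sample.txt \
        -output output \
        -mapper max_temperature_map.rb \
        -reducer max_temperature_reduce.rb \
        -file max_temperature_map.rb \
        -file max_temperature_reduce.rb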

Let’s see how to use the distributed cache to share a metadata file for station names. The command we will run is:
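A typical invocation looks like the following (the JAR name and the input and output paths are placeholders; the driver class and the cache file are the ones discussed below):

    % hadoop jar hadoop-examples.jar MaxTemperatureByStationNameUsingDistributedCacheFile \
        -files input/ncdc/metadata/stations-fixed-width.txt input/ncdc/all output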

This command will copy the local file stations-fixed-width.txt (no scheme is supplied, so the path is automatically interpreted as a local file) to the task nodes, so we can use it to look up station names. The listing for MaxTemperatureByStationNameUsingDistributedCacheFile appears in the example below.

Example. Application to find the maximum temperature by station, showing station names from a lookup table passed as a distributed cache file

The program finds the maximum temperature by weather station, so the mapper (StationTemperatureMapper) simply emits (station ID, temperature) pairs. For the combiner, we reuse MaxTemperatureReducer (from the chapters on MapReduce and Developing a MapReduce Application) to pick the maximum temperature for any given group of map outputs on the map side. The reducer (MaxTemperatureReducerWithStationLookup) is different from the combiner, since in addition to finding the maximum temperature, it uses the cache file to look up the station name.

We use the reducer’s configure() method to retrieve the cache file using its original name, relative to the working directory of the task.
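A minimal sketch of such a reducer follows; it is not the book's actual listing, and the parsing of the fixed-width station file (including the field offsets and the lookup-map approach) is an assumption made for illustration:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class MaxTemperatureReducerWithStationLookup extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

      private Map<String, String> stationNames = new HashMap<String, String>();

      @Override
      public void configure(JobConf conf) {
        // The cache file appears in the task's working directory under its
        // original name, so it can be opened like any local file.
        try {
          BufferedReader in =
              new BufferedReader(new FileReader("stations-fixed-width.txt"));
          try {
            String line;
            while ((line = in.readLine()) != null) {
              if (line.length() < 12) {
                continue;
              }
              // Assumed layout: station ID in the first 12 characters, name in the rest.
              stationNames.put(line.substring(0, 12).trim(), line.substring(12).trim());
            }
          } finally {
            in.close();
          }
        } catch (IOException e) {
          throw new RuntimeException("Failed to read station metadata", e);
        }
      }

      @Override
      public void reduce(Text key, Iterator<IntWritable> values,
          OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
          maxValue = Math.max(maxValue, values.next().get());
        }
        String name = stationNames.get(key.toString());
        if (name == null) {
          name = key.toString(); // fall back to the raw station ID
        }
        output.collect(new Text(name), new IntWritable(maxValue));
      }
    }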

You can use the distributed cache for copying files that do not fit in memory. MapFiles are very useful in this regard, since they serve as an on-disk lookup format (see “MapFile”). Because MapFiles are a collection of files with a defined directory structure, you should put them into an archive format (JAR, ZIP, tar, or gzipped tar) and add them to the cache using the -archives option.
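For example (the archive, class, and path names here are placeholders), a zipped MapFile could be shipped to the task nodes with:

    % hadoop jar myjob.jar MyDriver -archives station-lookup.zip input output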

Here’s a snippet of the output, showing some maximum temperatures for a few weather stations:

How it works

When you launch a job, Hadoop copies the files specified by the -files and -archives options to the jobtracker’s filesystem (normally HDFS). Then, before a task is run, the tasktracker copies the files from the jobtracker’s filesystem to a local disk (the cache) so the task can access the files. From the task’s point of view, the files are just there (and it doesn’t care that they came from HDFS).

The tasktracker also maintains a reference count for the number of tasks using each file in the cache. Before a task runs, the file’s reference count is incremented by one; after the task has run, it is decreased by one. Only when the count reaches zero is the file eligible for deletion, since no tasks are using it. Files are deleted to make room for a new file when the cache exceeds a certain size (10 GB by default). The cache size may be changed by setting the configuration property local.cache.size, which is measured in bytes.
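For example, to raise the limit to 20 GB, the tasktracker’s configuration could include something like the following (21474836480 bytes is 20 GB; exactly which configuration file the property lives in depends on your installation):

    <property>
      <name>local.cache.size</name>
      <value>21474836480</value>
    </property>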

Although this design doesn’t guarantee that subsequent tasks from the same job running on the same tasktracker will find the file in the cache, it is very likely that they will, since tasks from a job are usually scheduled to run at around the same time, so there isn’t the opportunity for enough other jobs to run and cause the original task’s file to be deleted from the cache.

Files are localized under the ${mapred.local.dir}/taskTracker/archive directory on the tasktrackers. Applications don’t have to know this, however, since the files are symbolically linked from the task’s working directory.

The DistributedCache API

Most applications don’t need to use the DistributedCache API because they can use the distributed cache indirectly via GenericOptionsParser. GenericOptionsParser makes it much more convenient to use the distributed cache: for example, it copies local files into HDFS and then the JobClient informs the DistributedCache of their locations in HDFS using the addCacheFile() and addCacheArchive() methods. The JobClient also gets DistributedCache to create symbolic links when the files are localized, by adding fragment identifiers to the files’ URIs. For example, the file specified by the URI hdfs://namenode/foo/bar#myfile is symlinked as myfile in the task’s working directory.

There’s an example of using this API in Example.
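As a minimal sketch of the driver-side calls (the HDFS paths and class name are placeholders), a job could register a cache file and a cache archive, and request symlinks, like this:

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobConf;

    public class CacheSetup {
      public static void addSideData(JobConf conf) throws Exception {
        // When using the API directly, the files must already be on a shared
        // filesystem such as HDFS.
        DistributedCache.addCacheFile(
            new URI("hdfs://namenode/input/metadata/stations-fixed-width.txt#stations"), conf);
        DistributedCache.addCacheArchive(
            new URI("hdfs://namenode/input/metadata/station-lookup.zip"), conf);
        // Create symlinks in the task's working directory, named by the URI fragment
        // (so the file above appears as "stations").
        DistributedCache.createSymlink(conf);
      }
    }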

On the task node, it is most convenient to access the localized file directly; however, sometimes you may need to get a list of all the available cache files. The DistributedCache class has two methods for this purpose, getLocalCacheFiles() and getLocalCacheArchives(), both of which take the job configuration and return an array of Path objects pointing to local files.
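A minimal sketch of using these methods from within a task’s configure() method (the class name is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;

    public class CacheAwareTask extends MapReduceBase {
      @Override
      public void configure(JobConf job) {
        try {
          Path[] files = DistributedCache.getLocalCacheFiles(job);       // localized files
          Path[] archives = DistributedCache.getLocalCacheArchives(job); // localized archives
          if (files != null) {
            for (Path p : files) {
              System.err.println("Cache file available at: " + p);
            }
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }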

