We saw how the MapReduce system executes tasks in the context of the overall job at the beginning of the chapter in “Anatomy of a MapReduce Job Run”. In this section, we’ll look at some more controls that MapReduce users have over task execution.

Speculative Execution

The MapReduce model breaks jobs into tasks and runs the tasks in parallel, so that the overall job execution time is shorter than it would be if the tasks ran sequentially.

This makes job execution time sensitive to slow-running tasks, as it takes only one slow task to make the whole job take significantly longer than it would have done otherwise. When a job consists of hundreds or thousands of tasks, the possibility of a few straggling tasks is very real.

Tasks may be slow for various reasons, including hardware degradation or software misconfiguration, but the causes may be hard to detect since the tasks still complete successfully, albeit after a longer time than expected. Hadoop doesn’t try to diagnose and fix slow-running tasks; instead, it tries to detect when a task is running slower than expected and launches another, equivalent task as a backup. This is termed speculative execution of tasks.
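The effect of a single straggler, and of a backup task, can be illustrated with a toy model. This is only a sketch under stated assumptions: every task runs in parallel, a slow task is noticed after a fixed delay, and the backup runs at normal speed. The function names and the detection rule are hypothetical and do not reflect Hadoop's actual heuristics.

```python
def job_time(task_times):
    """With all tasks running in parallel, the job ends with its slowest task."""
    return max(task_times)


def job_time_with_backup(task_times, normal_time, detect_after):
    """Toy model of speculative execution (assumed rule, not Hadoop's).

    A task still running after `detect_after` seconds gets a backup attempt
    that takes `normal_time` seconds; whichever attempt ends first counts.
    """
    finish_times = []
    for t in task_times:
        if t > detect_after:
            t = min(t, detect_after + normal_time)
        finish_times.append(t)
    return max(finish_times)


# 999 healthy tasks of 60 seconds each and one 300-second straggler.
times = [60] * 999 + [300]
print(job_time(times))                                               # 300
print(job_time_with_backup(times, normal_time=60, detect_after=90))  # 150
```

In this model the backup halves the job time, but only because the straggler's slowness is tied to where it ran, not to the task itself.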

Speculative execution is an optimization, not a feature to make jobs run more reliably. If there are bugs that sometimes cause a task to hang or slow down, then relying on speculative execution to avoid these problems is unwise, and won’t work reliably, since the same bugs are likely to affect the speculative task. You should fix the bug so that the task doesn’t hang or slow down.

Speculative execution is turned on by default. It can be enabled or disabled independently for map tasks and reduce tasks, on a cluster-wide basis, or on a per-job basis. The relevant properties are shown in the table below.

Why would you ever want to turn off speculative execution? The goal of speculative execution is reducing job execution time, but this comes at the cost of cluster efficiency.

On a busy cluster, speculative execution can reduce overall throughput, since redundant tasks are being executed in an attempt to bring down the execution time for a single job. For this reason, some cluster administrators prefer to turn it off on the cluster and have users explicitly turn it on for individual jobs. This was especially relevant for older versions of Hadoop, when speculative execution could be overly aggressive in scheduling speculative tasks.
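For the per-job case, one possible approach is to pass the settings as generic -D options when launching a job. The helper below is a sketch, and it rests on two assumptions: the property names are the Hadoop 1.x ones (mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution), which are not shown in this section, and the job driver accepts generic options. The helper name is hypothetical.

```python
def speculative_options(map_enabled, reduce_enabled):
    """Build -D arguments that toggle speculative execution for one job.

    Assumes the Hadoop 1.x property names; check them against your
    version's default configuration before relying on them.
    """
    props = {
        "mapred.map.tasks.speculative.execution": map_enabled,
        "mapred.reduce.tasks.speculative.execution": reduce_enabled,
    }
    args = []
    for name, enabled in props.items():
        args += ["-D", f"{name}={'true' if enabled else 'false'}"]
    return args


# Turn speculation on for maps only, on a cluster where it is off by default.
print(" ".join(speculative_options(map_enabled=True, reduce_enabled=False)))
```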

Hadoop runs tasks in their own Java Virtual Machine to isolate them from other running tasks. Starting a new JVM for each task can take around a second, which is insignificant for jobs that run for a minute or so. However, jobs that have a large number of very short-lived tasks (these are usually map tasks), or that have lengthy initialization, can see performance gains when the JVM is reused for subsequent tasks.

With task JVM reuse enabled, tasks do not run concurrently in a single JVM. The JVM runs tasks sequentially. Tasktrackers can, however, run more than one task at a time, but this is always done in separate JVMs. The properties for controlling the tasktracker’s number of map task slots and reduce task slots are discussed in “Memory”.

The property for controlling task JVM reuse is mapred.job.reuse.jvm.num.tasks: it specifies the maximum number of tasks to run for a given job for each JVM launched; the default is 1. Tasks from different jobs are always run in separate JVMs. If the property is set to -1, there is no limit to the number of tasks from the same job that may share a JVM. The method setNumTasksToExecutePerJvm() on JobConf can also be used to configure this property.
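To get a feel for what the setting saves, the following sketch counts JVM launches for the tasks of one job that run one after another in a single task slot. It is a simplified model with a hypothetical function name; it ignores multiple slots and scheduling, and takes the one-second startup cost quoted above at face value.

```python
import math


def jvm_launches(num_tasks, reuse):
    """JVMs started for `num_tasks` sequential tasks of one job in one slot.

    `reuse` mirrors mapred.job.reuse.jvm.num.tasks: 1 means a fresh JVM per
    task, -1 means no limit, and any other positive value caps the tasks
    that may share a JVM.
    """
    if reuse == -1:
        return 1 if num_tasks > 0 else 0
    if reuse < 1:
        raise ValueError("reuse must be -1 or a positive integer")
    return math.ceil(num_tasks / reuse)


STARTUP_SECONDS = 1  # approximate JVM startup cost, as quoted in the text

for reuse in (1, 10, -1):
    launches = jvm_launches(1000, reuse)
    print(reuse, launches, launches * STARTUP_SECONDS)
# 1 1000 1000
# 10 100 100
# -1 1 1
```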

Tasks that are CPU-bound may also benefit from task JVM reuse by taking advantage of runtime optimizations applied by the HotSpot JVM. After running for a while, the HotSpot JVM builds up enough information to detect performance-critical sections in the code and dynamically translates the Java byte codes of these hot spots into native machine code. This works well for long-running processes, but JVMs that run for seconds or a few minutes may not gain the full benefit of HotSpot. In these cases, it is worth enabling task JVM reuse.

Another place where a shared JVM is useful is for sharing state between the tasks of a job. By storing reference data in a static field, tasks get rapid access to the shared data.

Large datasets are messy. They often have corrupt records. They often have records that are in a different format. They often have missing fields. In an ideal world, your code would cope gracefully with all of these conditions. In practice, it is often expedient to ignore the offending records. Depending on the analysis being performed, if only a small percentage of records are affected, then skipping them may not significantly affect the result. However, if a task trips up when it encounters a bad record by throwing a runtime exception, then the task fails. Failing tasks are retried (since the failure may be due to hardware failure or some other reason outside the task’s control), but if a task fails four times, then the whole job is marked as failed (see “Task Failure”). If it is the data that is causing the task to throw an exception, rerunning the task won’t help, since it will fail in exactly the same way each time.

If you are using TextInputFormat (see “TextInputFormat”), then you can set a maximum expected line length to safeguard against corrupted files. Corruption in a file can manifest itself as a very long line, which can cause out-of-memory errors and then task failure. If you set mapred.linerecordreader.maxlength to a value in bytes that fits in memory (and is comfortably greater than the length of lines in your input data), the record reader will skip the (long) corrupt lines without the task failing.
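The idea of a length guard can be sketched in a few lines. This is an illustration of the concept only, not Hadoop's record reader: the function name is hypothetical, and unlike a real reader this sketch still loads the oversized line into memory before discarding it.

```python
import io


def read_lines(stream, max_length):
    """Yield lines of at most `max_length` bytes, skipping longer ones.

    Simplification: the long line is read fully before being dropped; a
    production reader would stop buffering once the limit is exceeded.
    """
    for raw in stream:
        line = raw.rstrip(b"\n")
        if len(line) > max_length:
            continue  # treat the overlong line as corrupt and skip it
        yield line


data = io.BytesIO(b"ok\n" + b"x" * 100 + b"\nfine\n")
print(list(read_lines(data, max_length=10)))  # [b'ok', b'fine']
```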

The best way to handle corrupt records is in your mapper or reducer code. You can detect the bad record and ignore it, or you can abort the job by throwing an exception.

You can also count the total number of bad records in the job using counters to see how widespread the problem is. In rare cases, though, you can’t handle the problem because there is a bug in a third-party library that you can’t work around in your mapper or reducer. In these cases, you can use Hadoop’s optional skipping mode for automatically skipping bad records.
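Before resorting to skipping mode, handling bad records in your own code might look like the following Streaming-style mapper. It is a sketch under assumptions: the tab-separated record format, the function names, and the counter group and name are all hypothetical. The counter update uses the Streaming convention of writing a reporter:counter line to standard error.

```python
import sys


def parse_record(line):
    """Parse a hypothetical 'station<TAB>temperature' record."""
    station, temp = line.rstrip("\n").split("\t")
    return station, int(temp)


def run_mapper(lines, out=sys.stdout, err=sys.stderr):
    """Emit well-formed records, skip and count malformed ones."""
    bad = 0
    for line in lines:
        try:
            station, temp = parse_record(line)
        except ValueError:
            # Wrong field count or a non-numeric temperature: ignore the
            # record, but increment a counter so the job shows how many
            # records were dropped.
            bad += 1
            err.write("reporter:counter:Quality,BadRecords,1\n")
            continue
        out.write(f"{station}\t{temp}\n")
    return bad


if __name__ == "__main__":
    run_mapper(sys.stdin)
```

Raising an exception instead of continuing would fail the task, which is the right choice when a bad record means the result can no longer be trusted.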

When skipping mode is enabled, tasks report the records being processed back to the tasktracker. When the task fails, the tasktracker retries the task, skipping the records that caused the failure. Because of the extra network traffic and bookkeeping to maintain the failed record ranges, skipping mode is turned on for a task only after it has failed twice.

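Thus, for a task that consistently fails on one bad record, the task attempts have the following outcomes:

1. Task fails.
2. Task fails.
3. Skipping mode is enabled. Task fails, but the failed record is stored by the tasktracker.
4. Skipping mode is still enabled. Task succeeds by skipping the bad record that failed in the previous attempt.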

Skipping mode is off by default; you enable it independently for map and reduce tasks using the SkipBadRecords class. It’s important to note that skipping mode can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting occasional bad records (a few per task, say). You may need to increase the maximum number of task attempts (via mapred.map.max.attempts and mapred.reduce.max.attempts) to give skipping mode enough attempts to detect and skip all the bad records in an input split.
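The interaction between bad records and the attempt limit can be made concrete with a small simulation. It is a toy model of the behavior described above, not Hadoop's implementation: the function and parameter names are hypothetical, skipping starts after two failures, and one bad record is learned per failed attempt.

```python
def run_attempts(records, is_bad, max_attempts=4, skip_after_failures=2):
    """Simulate task attempts over `records` with skipping mode.

    Returns (successful_attempt_number, skipped_records), or
    (None, skipped_records) if every attempt fails.
    """
    skipped = set()  # records the tasktracker has learned to skip
    for attempt in range(1, max_attempts + 1):
        skipping = (attempt - 1) >= skip_after_failures
        failed_on = None
        for record in records:
            if record in skipped:
                continue
            if is_bad(record):
                failed_on = record  # the task attempt dies here
                break
        if failed_on is None:
            return attempt, skipped
        if skipping:
            skipped.add(failed_on)  # only one bad record found per attempt
    return None, skipped


records = list(range(10))
print(run_attempts(records, lambda r: r == 3))          # (4, {3})
print(run_attempts(records, lambda r: r in (3, 7))[0])  # None
print(run_attempts(records, lambda r: r in (3, 7), max_attempts=5)[0])  # 5
```

In this model, a split with n bad records needs n + 3 attempts, so two bad records already exceed a limit of four attempts.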

Bad records that have been detected by Hadoop are saved as sequence files in the job’s output directory under the _logs/skip subdirectory. These can be inspected for diagnostic purposes after the job has completed (using hadoop fs -text, for example).

Hadoop provides information to a map or reduce task about the environment in which it is running. For example, a map task can discover the name of the file it is processing (see “File information in the mapper”), and a map or reduce task can find out the attempt number of the task. The properties in the table below can be accessed from the job’s configuration, obtained by providing an implementation of the configure() method for Mapper or Reducer, where the configuration is passed in as an argument.
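As an illustration of finding the attempt number, the sketch below parses a task attempt ID of the kind held in mapred.task.id. It assumes the common six-part layout (the word attempt, a jobtracker identifier, a job number, m or r, a task number, and an attempt number); the function name and the sample ID are hypothetical.

```python
def parse_attempt_id(task_attempt_id):
    """Split a task attempt ID into (is_map, task_number, attempt_number).

    Assumes the layout attempt_<jobtracker>_<job>_<m|r>_<task>_<attempt>
    and that the jobtracker identifier contains no underscores.
    """
    parts = task_attempt_id.split("_")
    if len(parts) != 6 or parts[0] != "attempt" or parts[3] not in ("m", "r"):
        raise ValueError(f"unexpected task attempt ID: {task_attempt_id!r}")
    return parts[3] == "m", int(parts[4]), int(parts[5])


# Hypothetical ID: first attempt (numbered 0) of map task 3.
print(parse_attempt_id("attempt_200811201130_0004_m_000003_0"))  # (True, 3, 0)
```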

Streaming environment variables

Hadoop sets job configuration parameters as environment variables for Streaming programs. However, it replaces nonalphanumeric characters with underscores to make sure they are valid names. The following Python expression illustrates how you can retrieve the value of the mapred.job.id property from within a Python Streaming script:
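A minimal sketch of that lookup follows. The helper name is hypothetical; it applies the replacement rule described above, so mapred.job.id becomes mapred_job_id. The lookup uses get() so that the script also runs outside a Streaming job, where the variable is not set.

```python
import os
import re


def streaming_env_name(prop):
    """Map a job property name to its Streaming environment variable name."""
    return re.sub(r"[^A-Za-z0-9]", "_", prop)


# Inside a Streaming task this is equivalent to os.environ["mapred_job_id"];
# outside one, the variable is absent and the result is None.
job_id = os.environ.get(streaming_env_name("mapred.job.id"))
print(streaming_env_name("mapred.job.id"), job_id)
```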

You can also set environment variables for the Streaming processes launched by MapReduce by supplying the -cmdenv option to the Streaming launcher program (once for each variable you wish to set). For example, the following sets the MAGIC_PARAMETER environment variable:
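Both sides of this mechanism can be sketched as follows: one helper builds the launcher arguments, one -cmdenv per variable, and another reads the variable inside the Streaming script. The helper names and the value abracadabra are hypothetical placeholders.

```python
import os


def cmdenv_args(variables):
    """Build the Streaming launcher arguments, one -cmdenv per variable."""
    args = []
    for name, value in variables.items():
        args += ["-cmdenv", f"{name}={value}"]
    return args


def magic_parameter(default=None):
    """Read the variable from within the Streaming script."""
    return os.environ.get("MAGIC_PARAMETER", default)


# Placeholder value; substitute whatever your script expects.
print(" ".join(cmdenv_args({"MAGIC_PARAMETER": "abracadabra"})))
# -cmdenv MAGIC_PARAMETER=abracadabra
```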

The usual way of writing output from map and reduce tasks is by using the OutputCollector to collect key-value pairs. Some applications need more flexibility than a single key-value pair model, so these applications write output files directly from the map or reduce task to a distributed filesystem, like HDFS. (There are other ways to produce multiple outputs, too, as described in “Multiple Outputs”.)

Care needs to be taken to ensure that multiple instances of the same task don’t try to write to the same file. There are two problems to avoid. First, if a task failed and was retried, then the old partial output would still be present when the second attempt ran, and it would have to delete the old file first. Second, with speculative execution enabled, two instances of the same task could try to write to the same file simultaneously.

Hadoop solves this problem for the regular outputs from a task by writing outputs to a temporary directory that is specific to that task attempt. The directory is ${mapred.output.dir}/_temporary/${mapred.task.id}. On successful completion of the task, the contents of the directory are copied to the job’s output directory (${mapred.output.dir}). Thus, if the task fails and is retried, the first attempt’s partial output will just be cleaned up. A task and another speculative instance of the same task will get separate working directories, and only the first to finish will have the content of its working directory promoted to the output directory; the other will be discarded.
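The commit protocol can be mimicked on a local filesystem to see why the two problems disappear. This is a sketch only, with hypothetical function names and attempt IDs; it moves files locally and does not use Hadoop's own classes.

```python
import os
import shutil
import tempfile


def attempt_dir(output_dir, attempt_id):
    """Per-attempt working directory: <output>/_temporary/<attempt id>."""
    return os.path.join(output_dir, "_temporary", attempt_id)


def write_attempt(output_dir, attempt_id, filename, data):
    """Write task output into the attempt's private working directory."""
    work = attempt_dir(output_dir, attempt_id)
    os.makedirs(work, exist_ok=True)
    with open(os.path.join(work, filename), "w") as f:
        f.write(data)


def commit(output_dir, attempt_id):
    """Promote a finished attempt's files into the job output directory."""
    work = attempt_dir(output_dir, attempt_id)
    for name in os.listdir(work):
        shutil.move(os.path.join(work, name), os.path.join(output_dir, name))
    shutil.rmtree(work)


def discard(output_dir, attempt_id):
    """Throw away the output of a failed or losing speculative attempt."""
    shutil.rmtree(attempt_dir(output_dir, attempt_id), ignore_errors=True)


def demo():
    out = tempfile.mkdtemp()
    # Two attempts of the same task write the same filename without clashing.
    write_attempt(out, "attempt_0", "part-00000", "from attempt 0\n")
    write_attempt(out, "attempt_1", "part-00000", "from attempt 1\n")
    commit(out, "attempt_0")   # the first attempt to finish wins
    discard(out, "attempt_1")  # the other attempt's output is dropped
    with open(os.path.join(out, "part-00000")) as f:
        content = f.read()
    shutil.rmtree(out)
    return content


print(demo(), end="")  # from attempt 0
```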

The way that a task’s output is committed on completion is implemented by an OutputCommitter, which is associated with the OutputFormat. The OutputCommitter for FileOutputFormat is a FileOutputCommitter, which implements the commit protocol described earlier. The getOutputCommitter() method on OutputFormat may be overridden to return a custom OutputCommitter, in case you want to implement the commit process in a different way.

Hadoop provides a mechanism for application writers to use this feature, too. A task may find its working directory by retrieving the value of the mapred.work.output.dir property from its configuration file. Alternatively, a MapReduce program using the Java API may call the getWorkOutputPath() static method on FileOutputFormat to get the Path object representing the working directory. The framework creates the working directory before executing the task, so you don’t need to create it.

To take a simple example, imagine a program for converting image files from one format to another. One way to do this is to have a map-only job, where each map is given a set of images to convert (perhaps using NLineInputFormat; see “NLineInputFormat”). If a map task writes the converted images into its working directory, then they will be promoted to the output directory when the task successfully finishes.