# MapReduce Workflows - Hadoop

So far in this chapter, you have seen the mechanics of writing a program using Map- Reduce. We haven’t yet considered how to turn a data processing problem into the MapReduce model.

The data processing you have seen so far in this book is to solve a fairly simple problem (finding the maximum recorded temperature for given years). When the processing gets more complex, this complexity is generally manifested by having more MapReduce jobs, rather than having more complex map and reduce functions. In other words, as a rule of thumb, think about adding more jobs, rather than adding complexity to jobs.

For more complex problems, it is worth considering a higher-level language than Map- Reduce, such as Pig, Hive, or Cascading. One immediate benefit is that it frees you up from having to do the translation into MapReduce jobs, allowing you to concentrate on the analysis you are performing.

Finally, the book Data-Intensive Text Processing with MapReduce by Jimmy Lin and Chris Dyer (Morgan & Claypool Publishers, 2010, http://mapreduce.me/) is a great resource for learning more about MapReduce algorithm design, and is highly recommended.

Decomposing a Problem into MapReduce Jobs

Let’s look at an example of a more complex problem that we want to translate into a MapReduce workflow.

Imagine that we want to find the mean maximum recorded temperature for every day of the year and every weather station. In concrete terms, to calculate the mean maximum daily temperature recorded by station 029070-99999, say, on January 1, we take the mean of the maximum daily temperatures for this station for January 1, 1901; January 1, 1902; and so on up to January 1, 2000.

How can we compute this using MapReduce? The computation decomposes most naturally into two stages:

1. Compute the maximum daily temperature for every station-date pair.
2. The MapReduce program in this case is a variant of the maximum temperature program, except that the keys in this case are a composite station-date pair, rather than just the year.
3. Compute the mean of the maximum daily temperatures for every station-daymonthkey.

The mapper takes the output from the previous job (station-date, maximum temperature) records and projects it into (station-day-month, maximum temperature) records by dropping the year component. The reduce function then takes the mean of the maximum temperatures for each station-day-month key.

The output from first stage looks like this for the station we are interested in (the mean_max_daily_temp.sh script in the examples provides an implementation in Hadoop Streaming):

...

The first two fields form the key, and the final column is the maximum temperature from all the readings for the given station and date. The second stage averages these daily maxima over years to yield:

which is interpreted as saying the mean maximum daily temperature on January 1 for station 029070-99999 over the century is −6.8°C.

It’s possible to do this computation in one MapReduce stage, but it takes more work on the part of the programmer.‖

The arguments for having more (but simpler) MapReduce stages are that doing so leads to more composable and more maintainable mappers and reducers. The case studies in case studies cover a wide range of real-world problems that were solved using Map- Reduce, and in each case, the data processing task is implemented using two or more MapReduce jobs. The details in that chapter are invaluable for getting a better idea of how to decompose a processing problem into a MapReduce workflow.

‖ It’s an interesting exercise to do this. Hint: use “Secondary Sort”

It’s possible to make map and reduce functions even more composable than we have done. A mapper commonly performs input format parsing, projection (selecting the relevant fields), and filtering (removing records that are not of interest). In the mappers you have seen so far, we have implemented all of these functions in a single mapper.However, there is a case for splitting these into distinct mappers and chaining them into a single mapper using the ChainMapper library class that comes with Hadoop.

Combined with a ChainReducer, you can run a chain of mappers, followed by a reducer and another chain of mappers in a single MapReduce job.

Running Dependent Jobs

When there is more than one job in a MapReduce workflow, the question arises: how do you manage the jobs so they are executed in order? There are several approaches, and the main consideration is whether you have a linear chain of jobs, or a more complex directed acyclic graph (DAG) of jobs. For a linear chain, the simplest approach is to run each job one after another, waiting until a job completes successfully before running the next:

If a job fails, the runJob() method will throw an IOException, so later jobs in the pipeline don’t get executed. Depending on your application, you might want to catch the exception and clean up any intermediate data that was produced by any previous jobs.

For anything more complex than a linear chain, there are libraries that can help orchestrate your workflow (although they are suited to linear chains, or even one-off jobs, too). The simplest is in the org.apache.hadoop.mapred.jobcontrol package: the JobControl class. An instance of JobControl represents a graph of jobs to be run. You add the job configurations, then tell the JobControl instance the ependencies between jobs. You run the JobControl in a thread, and it runs the jobs in dependency order. You can poll for progress, and when the jobs have finished, you can query for all the jobs’ statuses and the associated errors for any failures. If a job fails, JobControl won’t run its dependencies.

Oozie

Unlike JobControl, which runs on the client machine submitting the jobs, Oozie (http://yahoo.github.com/oozie/) runs as a server, and a client submits a workflow to the server. In Oozie, a workflow is a DAG of action nodes and control-flow nodes. An action node performs a workflow task, like moving files in HDFS, running a MapReduce job or running a Pig job. A control-flow node governs the workflow execution betweenactions by allowing such constructs as conditional logic (so different execution branches may be followed depending on the result of an earlier action node) or parallel execution. When the workflow completes, Oozie can make an HTTP callback to theclient to inform it of the workflow status. It is also possible to receive callbacks every time the workflow enters or exits an action node.

Oozie allows failed workflows to be re-run from an arbitrary point. This is useful for dealing with transient errors when the early actions in the workflow are timeconsuming to execute.