Analyzing the Data with Hadoop

To take advantage of the parallel processing that Hadoop provides, we need to express our query as a MapReduce job. After some local, small-scale testing, we will be able to run it on a cluster of machines.

Map and Reduce

MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.

The input to our map phase is the raw NCDC data. We choose a text input format that gives us each line in the dataset as a text value. The key is the offset of the beginning of the line from the beginning of the file, but as we have no need for this, we ignore it.

Our map function is simple. We pull out the year and the air temperature, since these are the only fields we are interested in. In this case, the map function is just a data preparation phase, setting up the data in such a way that the reducer function can do its work on it: finding the maximum temperature for each year. The map function is also a good place to drop bad records: here we filter out temperatures that are missing, suspect, or erroneous.

To visualize the way the map works, consider the following sample lines of input data (some unused columns have been dropped to fit the page, indicated by ellipses):

These lines are presented to the map function as the key-value pairs:

The keys are the line offsets within the file, which we ignore in our map function. The map function merely extracts the year and the air temperature (indicated in bold text), and emits them as its output (the temperature values have been interpreted as integers):

The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key. So, continuing the example, our reduce function sees the following input:

Each year appears with a list of all its air temperature readings. All the reduce function has to do now is iterate through the list and pick up the maximum reading:
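(1949, 111)
(1950, 22)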

This is the final output: the maximum global temperature recorded in each year. The whole data flow is illustrated in the accompanying figure. At the bottom of the diagram is a Unix pipeline, which mimics the whole MapReduce flow and which we will see again later in the chapter when we look at Hadoop Streaming.


Java MapReduce

Having run through how the MapReduce program works, the next step is to express it in code. We need three things: a map function, a reduce function, and some code to run the job. The map function is represented by an implementation of the Mapper interface, which declares a map() method.

The example below shows the implementation of our map function.

Example: Mapper for maximum temperature example
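A sketch of that mapper using the old API follows; the exact column offsets for the year, temperature, and quality code are assumptions based on the NCDC fixed-width record format used in this chapter.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999; // sentinel for an absent temperature reading

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    String line = value.toString();
    String year = line.substring(15, 19);        // assumed year columns
    int airTemperature;
    if (line.charAt(87) == '+') {                // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);     // assumed quality-code column
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      output.collect(new Text(year), new IntWritable(airTemperature));
    }
  }
}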

The Mapper interface is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer). Rather than use built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).

The map() method is passed a key and a value. We convert the Text value containing the line of input into a Java String, then use its substring() method to extract the columns we are interested in.

The map() method also provides an instance of OutputCollector to write the output to. In this case, we write the year as a Text object (since we are just using it as a key), and the temperature is wrapped in an IntWritable. We write an output record only if the temperature is present and the quality code indicates the temperature reading is OK. The reduce function is similarly defined using a Reducer, as illustrated in the example below.

Example: Reducer for maximum temperature example
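A minimal sketch of the reducer, matching the description that follows (class and variable names here are illustrative):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {

    // Track the highest temperature seen so far for this year
    int maxValue = Integer.MIN_VALUE;
    while (values.hasNext()) {
      maxValue = Math.max(maxValue, values.next().get());
    }
    output.collect(key, new IntWritable(maxValue));
  }
}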

Again, four formal type parameters are used to specify the input and output types, this time for the reduce function. The input types of the reduce function must match the output types of the map function: Text and IntWritable. In this case, the output types of the reduce function are also Text and IntWritable, for a year and its maximum temperature, which we find by iterating through the temperatures and comparing each with a record of the highest found so far.

The third piece of code runs the MapReduce job (see the example below).

Example: Application to find the maximum temperature in the weather dataset
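A sketch of the driver program, assuming the mapper and reducer class names used in the sketches above:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature {

  public static void main(String[] args) throws IOException {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    JobConf conf = new JobConf(MaxTemperature.class); // the class is used to locate the job JAR
    conf.setJobName("Max temperature");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setReducerClass(MaxTemperatureReducer.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    JobClient.runJob(conf); // submits the job and waits for it to finish
  }
}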

A JobConf object forms the specification of the job. It gives you control over how the job is run. When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specifying the name of the JAR file, we can pass a class in the JobConf constructor, and Hadoop will locate the relevant JAR file by searching for the one containing this class.

Having constructed a JobConf object, we specify the input and output paths. An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case the input is formed from all the files in that directory), or a file pattern. As the name suggests, addInputPath() can be called more than once to use input from multiple paths.

The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reducer functions are written. The directory shouldn’t exist before running the job, as Hadoop will complain and refuse to run the job if it does. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with that of another).

Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods.

The setOutputKeyClass() and setOutputValueClass() methods control the output types for the map and the reduce functions, which are often the same, as they are in our case. If they are different, the map output types can be set separately using setMapOutputKeyClass() and setMapOutputValueClass().

The input types are controlled via the input format, which we have not explicitly set since we are using the default TextInputFormat.

After setting the classes that define the map and reduce functions, we are ready to run the job. The static runJob() method on JobClient submits the job and waits for it to finish, writing information about its progress to the console.

A test run

After writing a MapReduce job, it’s normal to try it out on a small dataset to flush out any immediate problems with the code. First, install Hadoop in standalone mode (there are instructions for how to do this in Appendix A); this is the mode in which Hadoop runs using the local filesystem with a local job runner. Let’s test it on the five-line sample discussed earlier (the output has been slightly reformatted to fit the page):
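The invocation looks something like the following; the classpath and input path here are illustrative and depend on where you compiled the example code and placed the sample file.

% export HADOOP_CLASSPATH=build/classes          # location of the compiled example classes (illustrative)
% hadoop MaxTemperature input/ncdc/sample.txt output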

When the hadoop command is invoked with a classname as the first argument, it launches a JVM to run the class. It is more convenient to use hadoop than straight java, since the former adds the Hadoop libraries (and their dependencies) to the classpath and picks up the Hadoop configuration, too. To add the application classes to the classpath, we’ve defined an environment variable called HADOOP_CLASSPATH, which the hadoop script picks up.

When running in local (standalone) mode, the programs in this book all assume that you have set the HADOOP_CLASSPATH in this way. The commands should be run from the directory that the example code is installed in.

The output from running the job provides some useful information. (The warning about the job JAR file not being found is expected, since we are running in local mode without a JAR. We won’t see this warning when we run on a cluster.) For example, we can see that the job was given an ID of job_local_0001, and it ran one map task and one reduce task (with the IDs attempt_local_0001_m_000000_0 and attempt_local_0001_r_000000_0).

Knowing the job and task IDs can be very useful when debugging MapReduce jobs.

The last section of the output, titled “Counters,” shows the statistics that Hadoop generates for each job it runs. These are very useful for checking whether the amount of data processed is what you expected. For example, we can follow the number of records that went through the system: five map inputs produced five map outputs, then five reduce inputs in two groups produced two reduce outputs.

The output was written to the output directory, which contains one output file per reducer. The job had a single reducer, so we find a single file, named part-00000:
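1949	111
1950	22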

This result is the same as when we went through it by hand earlier. We interpret this as saying that the maximum temperature recorded in 1949 was 11.1°C, and in 1950 it was 2.2°C.

The new Java MapReduce API

Release 0.20.0 of Hadoop included a new Java MapReduce API, sometimes referred to as “Context Objects,” designed to make the API easier to evolve in the future. The new API is type-incompatible with the old, however, so applications need to be rewritten to take advantage of it.

There are several notable differences between the two APIs:

  • The new API favors abstract classes over interfaces, since these are easier to evolve. For example, you can add a method (with a default implementation) to an abstract class without breaking old implementations of the class. In the new API, the Mapper and Reducer interfaces are now abstract classes.

  • The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old API can still be found in org.apache.hadoop.mapred.
  • The new API makes extensive use of context objects that allow the user code to communicate with the MapReduce system. The MapContext, for example, essentially unifies the role of the JobConf, the OutputCollector, and the Reporter.
  • The new API supports both a “push” and a “pull” style of iteration. In both APIs, key-value record pairs are pushed to the mapper, but in addition, the new API allows a mapper to pull records from within the map() method. The same goes for the reducer. An example of how the “pull” style can be useful is processing records in batches, rather than one by one.
  • The new API is not complete (or stable) in the 0.20 release series (the latest available at the time of writing). This book uses the old API for this reason. However, a copy of all of the examples in this book, rewritten to use the new API (for releases 0.21.0 and later), will be made available on the book’s website.

  • Configuration has been unified. The old API has a special JobConf object for job configuration, which is an extension of Hadoop’s vanilla Configuration object used for configuring daemons (see “The Configuration API”). In the new API, this distinction is dropped, so job configuration is done through a Configuration.
  • Job control is performed through the Job class, rather than JobClient, which no longer exists in the new API.
  • Output files are named slightly differently: part-m-nnnnn for map outputs and part-r-nnnnn for reduce outputs (where nnnnn is an integer designating the part number, starting from zero).

The example below shows the MaxTemperature application rewritten to use the new API.

When converting your Mapper and Reducer classes to the new API, don’t forget to change the signature of the map() and reduce() methods to the new form. Just changing your class to extend the new Mapper or Reducer classes will not produce a compilation error or warning, since these classes provide an identity form of the map() or reduce() method (respectively). Your mapper or reducer code, however, will not be invoked, which can lead to some hard-to-diagnose errors.

Example: Application to find the maximum temperature in the weather dataset using the new context objects MapReduce API
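A sketch of the same application against the 0.20 context-objects API follows. Class names, column offsets, and the nested-class layout are illustrative; the structure simply mirrors the old-API sketches above.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

  static class NewMaxTemperatureMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      String line = value.toString();
      String year = line.substring(15, 19);            // assumed column offsets, as before
      int airTemperature;
      if (line.charAt(87) == '+') {
        airTemperature = Integer.parseInt(line.substring(88, 92));
      } else {
        airTemperature = Integer.parseInt(line.substring(87, 92));
      }
      String quality = line.substring(92, 93);
      if (airTemperature != MISSING && quality.matches("[01459]")) {
        context.write(new Text(year), new IntWritable(airTemperature)); // Context replaces OutputCollector
      }
    }
  }

  static class NewMaxTemperatureReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {

      int maxValue = Integer.MIN_VALUE;
      for (IntWritable value : values) {               // Iterable rather than Iterator
        maxValue = Math.max(maxValue, value.get());
      }
      context.write(key, new IntWritable(maxValue));
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: NewMaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();                               // Job replaces JobConf and JobClient
    job.setJarByClass(NewMaxTemperature.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(NewMaxTemperatureMapper.class);
    job.setReducerClass(NewMaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);  // submits the job and waits for completion
  }
}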
