To take advantage of the parallel processing that Hadoop provides, we need to express our query as a MapReduce job. After some local, small-scale testing, we will be able to run it on a cluster of machines.
Map and Reduce
MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.
The input to our map phase is the raw NCDC data. We choose a text input format that gives us each line in the dataset as a text value. The key is the offset of the beginning of the line from the beginning of the file, but as we have no need for this, we ignore it.
Our map function is simple. We pull out the year and the air temperature, since these are the only fields we are interested in. In this case, the map function is just a data preparation phase, setting up the data in such a way that the reducer function can do its work on it: finding the maximum temperature for each year. The map function is also a good place to drop bad records: here we filter out temperatures that are missing, suspect, or erroneous.
To visualize the way the map works, consider the following sample lines of input data (some unused columns have been dropped to fit the page, indicated by ellipses):
These lines are presented to the map function as the key-value pairs:
The keys are the line offsets within the file, which we ignore in our map function. The map function merely extracts the year and the air temperature (indicated in bold text), and emits them as its output (the temperature values have been interpreted as integers):
The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key. So, continuing the example, our reduce function sees the following input:
Each year appears with a list of all its air temperature readings. All the reduce function has to do now is iterate through the list and pick up the maximum reading:
This is the final output: the maximum global temperature recorded in each year. The whole data flow is illustrated in the figure. At the bottom of the diagram is a Unix pipeline that mimics the whole MapReduce flow; we will see it again later in the chapter when we look at Hadoop Streaming.
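The sort-and-group step described above can be sketched in plain Java, without any Hadoop classes. This is only an illustration of the idea, not how the framework is implemented: the class name, the helper methods, and the individual readings are assumptions made for the sketch (the readings were chosen so that the yearly maxima agree with the 11.1°C and 2.2°C results reported in the test run).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch class; it does not use or mirror any Hadoop API.
final class GroupByKeySketch {

    // Convenience helper that builds one (year, temperature) map output pair.
    static Map.Entry<String, Integer> pair(String year, int temperature) {
        return new AbstractMap.SimpleImmutableEntry<>(year, temperature);
    }

    // Collects the values for each key into a list. The TreeMap keeps the keys
    // sorted, so years come out in ascending order, as the reducer would see them.
    static Map<String, List<Integer>> groupByYear(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> record : mapOutput) {
            grouped.computeIfAbsent(record.getKey(), year -> new ArrayList<>())
                   .add(record.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Illustrative map output: temperatures are in tenths of a degree Celsius.
        List<Map.Entry<String, Integer>> mapOutput = Arrays.asList(
            pair("1950", 0), pair("1950", 22), pair("1950", -11),
            pair("1949", 111), pair("1949", 78));
        System.out.println(groupByYear(mapOutput));
        // prints {1949=[111, 78], 1950=[0, 22, -11]}
    }
}
```

Each key now carries the full list of its values, which is exactly the shape of input the reduce function needs in order to pick a maximum per year.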
Having run through how the MapReduce program works, the next step is to express it in code. We need three things: a map function, a reduce function, and some code to run the job. The map function is represented by an implementation of the Mapper interface, which declares a map() method.
The following example shows the implementation of our map function.
Example Mapper for maximum temperature example
The Mapper interface is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer). Rather than use built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).
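To give a feel for what "optimized for network serialization" means, here is a minimal sketch of a wrapper type that serializes itself through write(DataOutput) and readFields(DataInput) methods, which is the pair of methods Hadoop's Writable types implement. The class IntBoxSketch and its helper methods are hypothetical stand-ins written for this illustration; they are not the real IntWritable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Hadoop-style integer wrapper; not the real IntWritable.
final class IntBoxSketch {
    private int value;

    IntBoxSketch() { }

    IntBoxSketch(int value) { this.value = value; }

    int get() { return value; }

    // Writes only the four bytes of the int: no class name or other metadata.
    void write(DataOutput out) throws IOException {
        out.writeInt(value);
    }

    // Overwrites this object's state from the stream, so one instance can be reused.
    void readFields(DataInput in) throws IOException {
        value = in.readInt();
    }

    // Serializes a box to a byte array using its own write() method.
    static byte[] toBytes(IntBoxSketch box) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            box.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuilds a box from bytes using readFields().
    static IntBoxSketch fromBytes(byte[] data) {
        try {
            IntBoxSketch box = new IntBoxSketch();
            box.readFields(new DataInputStream(new ByteArrayInputStream(data)));
            return box;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = toBytes(new IntBoxSketch(111));
        System.out.println(data.length + " bytes, value " + fromBytes(data).get());
        // prints 4 bytes, value 111
    }
}
```

The point of the design is that the type itself decides its wire format, so a record can be written compactly and an existing object can be refilled from the stream instead of allocating a new one for every record.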
The map() method is passed a key and a value. We convert the Text value containing the line of input into a Java String, then use its substring() method to extract the columns we are interested in.
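The extraction step can be tried out in plain Java before any Hadoop types are involved. The sketch below is a hedged illustration only: the ten-character record layout, the MISSING sentinel, and the set of acceptable quality codes are all assumptions invented for the example and do not reproduce the real NCDC column positions.

```java
import java.util.Optional;

// Hypothetical sketch of substring-based field extraction and record filtering.
final class FixedWidthRecordSketch {
    // Assumed layout, for illustration only (not the real NCDC columns):
    // chars 0-3 year, chars 4-8 signed temperature in tenths of a degree,
    // char 9 quality code.
    static final int MISSING = 9999;          // assumed sentinel for "no reading"
    static final String GOOD_QUALITY = "01";  // assumed acceptable quality codes

    // Simple holder for the two fields the map step emits.
    static final class Reading {
        final String year;
        final int temperature;

        Reading(String year, int temperature) {
            this.year = year;
            this.temperature = temperature;
        }
    }

    // Returns the parsed reading, or empty if the record should be dropped.
    static Optional<Reading> parse(String line) {
        String year = line.substring(0, 4);
        // Integer.parseInt accepts a leading '+' or '-' on Java 7 and later.
        int temperature = Integer.parseInt(line.substring(4, 9));
        char quality = line.charAt(9);
        if (temperature == MISSING || GOOD_QUALITY.indexOf(quality) < 0) {
            return Optional.empty();
        }
        return Optional.of(new Reading(year, temperature));
    }

    public static void main(String[] args) {
        String[] lines = { "1950+00221", "1950+99991", "1949+01114", "1950-00110" };
        for (String line : lines) {
            Optional<Reading> reading = parse(line);
            System.out.println(line + " -> "
                + (reading.isPresent()
                    ? reading.get().year + " " + reading.get().temperature
                    : "dropped"));
        }
    }
}
```

Keeping the parsing and filtering in a small function like this makes it easy to test on a handful of lines before wiring it into a map() method.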
The map() method also provides an instance of OutputCollector to write the output to. In this case, we write the year as a Text object (since we are just using it as a key), and the temperature is wrapped in an IntWritable. We write an output record only if the temperature is present and the quality code indicates the temperature reading is OK. The reduce function is similarly defined using a Reducer, as illustrated in the following example.
Example Reducer for maximum temperature example
Again, four formal type parameters are used to specify the input and output types, this time for the reduce function. The input types of the reduce function must match the output types of the map function: Text and IntWritable. And in this case, the output types of the reduce function are Text and IntWritable, for a year and its maximum temperature, which we find by iterating through the temperatures and comparing each with a record of the highest found so far.
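The "highest found so far" loop can be sketched in plain Java as follows. The method is shaped loosely like a reduce call (one key, an iterator over that key's values, and a sink for output), but the class name and the use of BiConsumer as the output sink are assumptions for this sketch, not the Hadoop Reducer API.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.BiConsumer;

// Hypothetical sketch of a max-finding reduce step; it uses no Hadoop types.
final class MaxReduceSketch {

    // Walks the values once, keeping a running maximum, then emits one record.
    // Assumes at least one value per key; an empty iterator would emit
    // Integer.MIN_VALUE.
    static void reduce(String year, Iterator<Integer> temperatures,
                       BiConsumer<String, Integer> output) {
        int maxValue = Integer.MIN_VALUE;
        while (temperatures.hasNext()) {
            maxValue = Math.max(maxValue, temperatures.next());
        }
        output.accept(year, maxValue);
    }

    public static void main(String[] args) {
        // Illustrative readings in tenths of a degree Celsius.
        reduce("1949", Arrays.asList(111, 78).iterator(),
               (year, max) -> System.out.println(year + "\t" + max));
        reduce("1950", Arrays.asList(0, 22, -11).iterator(),
               (year, max) -> System.out.println(year + "\t" + max));
    }
}
```

Because the values arrive through an iterator, the loop never needs the whole list in memory at once; it only holds the current maximum.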
The third piece of code runs the MapReduce job (see the following example).
Example Application to find the maximum temperature in the weather dataset
A JobConf object forms the specification of the job. It gives you control over how the job is run. When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specify the name of the JAR file, we can pass a class in the JobConf constructor, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.
Having constructed a JobConf object, we specify the input and output paths. An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case the input consists of all the files in that directory), or a file pattern. As the name suggests, addInputPath() can be called more than once to use input from multiple paths.
The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reducer functions are written. The directory must not exist before the job runs; if it does, Hadoop will complain and refuse to run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with another).
Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods.
The setOutputKeyClass() and setOutputValueClass() methods control the output types for the map and the reduce functions, which are often the same, as they are in our case. If they are different, the map output types can be set using setMapOutputKeyClass() and setMapOutputValueClass().
The input types are controlled via the input format, which we have not explicitly set since we are using the default TextInputFormat.
After setting the classes that define the map and reduce functions, we are ready to run the job. The static runJob() method on JobClient submits the job and waits for it to finish, writing information about its progress to the console.
A test run
After writing a MapReduce job, it’s normal to try it out on a small dataset to flush out any immediate problems with the code. First install Hadoop in standalone mode; there are instructions for how to do this in Appendix A. This is the mode in which Hadoop runs using the local filesystem with a local job runner. Let’s test it on the five-line sample discussed earlier (the output has been slightly reformatted to fit the page):
When the hadoop command is invoked with a classname as the first argument, it launches a JVM to run the class. It is more convenient to use hadoop than straight java, since the former adds the Hadoop libraries (and their dependencies) to the classpath and picks up the Hadoop configuration, too. To add the application classes to the classpath, we’ve defined an environment variable called HADOOP_CLASSPATH, which the hadoop script picks up.
When running in local (standalone) mode, the programs in this book all assume that you have set the HADOOP_CLASSPATH in this way. The commands should be run from the directory that the example code is installed in.
The output from running the job provides some useful information. (The warning about the job JAR file not being found is expected, since we are running in local mode without a JAR. We won’t see this warning when we run on a cluster.) For example, we can see that the job was given an ID of job_local_0001, and it ran one map task and one reduce task (with the IDs attempt_local_0001_m_000000_0 and attempt_local_0001_r_000000_0).
Knowing the job and task IDs can be very useful when debugging MapReduce jobs.
The last section of the output, titled “Counters,” shows the statistics that Hadoop generates for each job it runs. These are very useful for checking whether the amount of data processed is what you expected. For example, we can follow the number of records that went through the system: five map inputs produced five map outputs, then five reduce inputs in two groups produced two reduce outputs.
The output was written to the output directory, which contains one output file per reducer. The job had a single reducer, so we find a single file, named part-00000:
This result is the same as when we went through it by hand earlier. We interpret this as saying that the maximum temperature recorded in 1949 was 11.1°C, and in 1950 it was 2.2°C.
The new Java MapReduce API
Release 0.20.0 of Hadoop included a new Java MapReduce API, sometimes referred to as “Context Objects,” designed to make the API easier to evolve in the future. The new API is type-incompatible with the old, however, so applications need to be rewritten to take advantage of it.
There are several notable differences between the two APIs:
Example code that uses the new API (for releases 0.21.0 and later) will be made available on the book’s website.
The final example shows the MaxTemperature application rewritten to use the new API. The differences are highlighted in bold.
When converting your Mapper and Reducer classes to the new API, don’t forget to change the signature of the map() and reduce() methods to the new form. Just changing your class to extend the new Mapper or Reducer classes will not produce a compilation error or warning, since these classes provide an identity form of the map() or reduce() method (respectively). Your mapper or reducer code, however, will not be invoked, which can lead to some hard-to-diagnose errors.
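The pitfall is ordinary Java overloading, and it can be reproduced without Hadoop. In the sketch below, BaseMapper is a hypothetical stand-in for a framework base class with an identity map() method; none of these classes or signatures are the real Hadoop ones. One common safeguard, shown in FixedMapper, is the @Override annotation, which makes the compiler reject a method that does not actually override anything.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demonstration of the overload-instead-of-override pitfall.
final class OverridePitfallSketch {

    // Stand-in for a framework base class whose default map() is the identity.
    static class BaseMapper {
        void map(String key, String value, List<String> context) {
            context.add(key + "=" + value);  // pass the record through unchanged
        }
    }

    // Meant to customize map(), but the parameter list differs from the base
    // method, so this is an overload. It compiles cleanly and is never called
    // by run() below.
    static class BrokenMapper extends BaseMapper {
        void map(String key, String value, List<String> output, Object reporter) {
            output.add(key + "=" + value.toUpperCase());
        }
    }

    // Same signature as the base method; @Override asks the compiler to check that.
    static class FixedMapper extends BaseMapper {
        @Override
        void map(String key, String value, List<String> context) {
            context.add(key + "=" + value.toUpperCase());
        }
    }

    // Mimics a framework invoking the mapper through the base class signature.
    static List<String> run(BaseMapper mapper) {
        List<String> out = new ArrayList<>();
        mapper.map("k", "v", out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new BrokenMapper()));  // prints [k=v]
        System.out.println(run(new FixedMapper()));   // prints [k=V]
    }
}
```

Adding @Override to the method in BrokenMapper would turn the silent identity behavior into a compile-time error, which is much easier to diagnose.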
Example Application to find the maximum temperature in the weather dataset using the new context objects MapReduce API