Counters - Hadoop

There are often things you would like to know about the data you are analyzing but that are peripheral to the analysis you are performing. For example, if you were counting invalid records and discovered that the proportion of invalid records in the whole dataset was very high, you might be prompted to check why so many records were being marked as invalid: perhaps there is a bug in the part of the program that detects invalid records? Or if the data were of poor quality and genuinely did have very many invalid records, after discovering this you might decide to increase the size of the dataset so that the number of good records was large enough for meaningful analysis.

Counters are a useful channel for gathering statistics about the job: for quality control or for application-level statistics. They are also useful for problem diagnosis. If you are tempted to put a log message into your map or reduce task, it is often better to see whether you can use a counter instead to record that a particular condition occurred.

In addition to counter values being much easier to retrieve than log output for large distributed jobs, you get a record of the number of times that condition occurred, which is more work to obtain from a set of logfiles.

Built-in Counters

Hadoop maintains some built-in counters for every job, which report various metrics for your job. For example, there are counters for the number of bytes and records processed, which allows you to confirm that the expected amount of input was consumed and the expected amount of output was produced.


Counters are maintained by the task with which they are associated and periodically sent to the tasktracker and then to the jobtracker, so they can be globally aggregated. (This is described in “Progress and Status Updates.”) The built-in Job Counters are actually maintained by the jobtracker, so they don’t need to be sent across the network, unlike all other counters, including user-defined ones.

A task’s counters are sent in full every time, rather than sending only the counts since the last transmission, since this guards against errors due to lost messages. Furthermore, during a job run, counters may go down if a task fails. Counter values are definitive only once a job has successfully completed.

User-Defined Java Counters

MapReduce allows user code to define a set of counters, which are then incremented as desired in the mapper or reducer. Counters are defined by a Java enum, which serves to group related counters. A job may define an arbitrary number of enums, each with an arbitrary number of fields. The name of the enum is the group name, and the enum’s fields are the counter names. Counters are global: the MapReduce framework aggregates them across all maps and reduces to produce a grand total at the end of the job.

We created some counters in the chapter on developing a MapReduce application for counting malformed records in the weather dataset. The program in the following example extends that code to count the number of missing records and the distribution of temperature quality codes.

Example. Application to run the maximum temperature job, including counting missing and malformed fields and quality codes
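The example listing itself is not reproduced on this page. The following is a minimal sketch of the mapper portion, assuming the old org.apache.hadoop.mapred API and the NCDC fixed-width field offsets (the offsets and the elided driver are assumptions, not taken from this page):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureWithCounters {

  // The enum name becomes the counter group; its fields are the counter names.
  enum Temperature {
    MISSING,
    MALFORMED
  }

  static class MaxTemperatureMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      String line = value.toString();
      try {
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
          airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
          airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // Dynamic counter for the quality-code distribution (see below).
        reporter.incrCounter("TemperatureQuality", quality, 1);
        if (airTemperature == 9999) { // 9999 marks a missing reading
          reporter.incrCounter(Temperature.MISSING, 1);
        } else {
          output.collect(new Text(year), new IntWritable(airTemperature));
        }
      } catch (Exception e) {
        reporter.incrCounter(Temperature.MALFORMED, 1);
      }
    }
  }

  // The driver (job configuration and submission via JobClient.runJob())
  // is elided here.
}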

The best way to see what this program does is to run it over the complete dataset:

% hadoop jar job.jar MaxTemperatureWithCounters input/ncdc/all output-counters

When the job has successfully completed, it prints out the counters at the end (this is done by JobClient’s runJob() method); among them are the missing and malformed counts and the quality-code distribution that we are interested in here.

Dynamic counters

The code makes use of a dynamic counter, one that isn’t defined by a Java enum. Since a Java enum’s fields are defined at compile time, you can’t create new counters on the fly using enums. Here we want to count the distribution of temperature quality codes, and though the format specification defines the values that the code can take, it is more convenient to use a dynamic counter to emit the values that it actually takes. The method we use on the Reporter object takes a group and counter name using String names:

public void incrCounter(String group, String counter, long amount)
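For example, to increment the counter for a particular quality code (the group name and the quality variable here are illustrative, following the sketch above):

reporter.incrCounter("TemperatureQuality", quality, 1);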

The two ways of creating and accessing counters, using enums and using Strings, are actually equivalent, since Hadoop turns enums into Strings to send counters over RPC. Enums are slightly easier to work with, provide type safety, and are suitable for most jobs. For the odd occasion when you need to create counters dynamically, you can use the String interface.

Readable counter names

By default, a counter’s name is the enum’s fully qualified Java classname. These names are not very readable when they appear on the web UI or in the console, so Hadoop provides a way to change the display names using resource bundles. We’ve done this here, so we see “Air Temperature Records” instead of “Temperature$MISSING.” For dynamic counters, the group and counter names are used for the display names, so this is not normally an issue.

The recipe to provide readable names is as follows. Create a properties file named after the enum, using an underscore as a separator for nested classes, and place it in the same directory as the top-level class containing the enum. For the counters in the example above, the file is named MaxTemperatureWithCounters_Temperature.properties.

The properties file should contain a single property named CounterGroupName, whose value is the display name for the whole group. Then each field in the enum should have a corresponding property defined for it, whose name is the name of the field suffixed with .name, and whose value is the display name for the counter. Here are the contents of MaxTemperatureWithCounters_Temperature.properties:
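(The listing is reconstructed here from the display names mentioned above, since the original is omitted on this page; it assumes the enum fields MISSING and MALFORMED.)

CounterGroupName=Air Temperature Records
MISSING.name=Missing
MALFORMED.name=Malformed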

Hadoop uses the standard Java localization mechanisms to load the correct properties for the locale you are running in, so, for example, you can create a Chinese version of the properties in a file named MaxTemperatureWithCounters_Temperature_zh_CN.properties, and they will be used when running in the zh_CN locale. Refer to the documentation for java.util.PropertyResourceBundle for more information.
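For illustration only, such a localized file might contain (the Chinese display names here are hypothetical translations, not from the original):

CounterGroupName=气温记录
MISSING.name=缺失
MALFORMED.name=格式错误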

Retrieving counters

In addition to being available via the web UI, counter values can be retrieved on the command line and via the Java API. You can retrieve them while the job is running, although it is more usual to get counters at the end of a job run, when they are stable. On the command line, the hadoop job -counter subcommand takes a job ID, a counter group name, and a counter name:
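% hadoop job -counter <job-id> 'MaxTemperatureWithCounters$Temperature' MISSING

(The job ID is a placeholder for your own run’s ID; the group name is the enum’s fully qualified Java classname, quoted so the shell does not expand the $.) Using the Java API, the following example shows a program that calculates the proportion of records that have missing temperature fields.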

Example. Application to calculate the proportion of records with missing temperature fields
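The listing is likewise not reproduced here; a minimal sketch matching the walkthrough that follows, assuming the old org.apache.hadoop.mapred API and the Temperature enum from the earlier example (error handling is simplified), might look like this:

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;

public class MissingTemperatureFields {

  public static void main(String[] args) throws Exception {
    String jobID = args[0];
    JobClient jobClient = new JobClient(new JobConf());
    // The job may be null if the ID is wrong or the jobtracker
    // no longer holds a reference to it.
    RunningJob job = jobClient.getJob(JobID.forName(jobID));
    if (job == null) {
      System.err.printf("No job with ID %s found.%n", jobID);
      System.exit(-1);
    }
    if (!job.isComplete()) {
      System.err.printf("Job %s is not complete.%n", jobID);
      System.exit(-1);
    }
    Counters counters = job.getCounters();
    // User-defined counter, looked up via its enum.
    long missing = counters.getCounter(
        MaxTemperatureWithCounters.Temperature.MISSING);
    // Built-in counter, looked up by group name (the fully qualified
    // classname of the framework's counter enum) and counter name.
    long total = counters.findCounter(
        "org.apache.hadoop.mapred.Task$Counter",
        "MAP_INPUT_RECORDS").getCounter();
    System.out.printf("Records with missing temperature fields: %.2f%%%n",
        100.0 * missing / total);
  }
}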

First we retrieve a RunningJob object from a JobClient, by calling the getJob() method with the job ID. We check whether there is actually a job with the given ID. There may not be, either because the ID was incorrectly specified or because the jobtracker no longer has a reference to the job (only the last 100 jobs are kept in memory, controlled by mapred.jobtracker.completeuserjobs.maximum, and all are cleared out if the jobtracker is restarted).

After confirming that the job has completed, we call the RunningJob’s getCounters() method, which returns a Counters object, encapsulating all the counters for a job. The Counters class provides various methods for finding the names and values of counters.

We use the getCounter() method, which takes an enum to find the number of records that had a missing temperature field.

There are also findCounter() methods, all of which return a Counter object. We use this form to retrieve the built-in counter for map input records. To do this, we refer to the counter by its group name (the fully qualified Java classname of the enum) and its counter name (both strings).

Finally, we print the proportion of records that had a missing temperature field.

* The built-in counters’ enums are not currently a part of the public API, so this is the only way to retrieve them. From release 0.21.0, counters are available via the JobCounter and TaskCounter enums in the org.apache.hadoop.mapreduce package.
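On those releases, the same lookup could be written against the enum, along these lines (a sketch, assuming the new API’s Counters.findCounter(Enum) and Counter.getValue() methods, with TaskCounter imported from org.apache.hadoop.mapreduce):

long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();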

User-Defined Streaming Counters

A Streaming MapReduce program can increment counters by sending a specially formatted line to the standard error stream, which is co-opted as a control channel in this case. The line must have the following format:

reporter:counter:group,counter,amount

This snippet in Python shows how to increment the “Missing” counter in the “Temperature” group by one:

import sys

sys.stderr.write("reporter:counter:Temperature,Missing,1\n")

In a similar way, a status message may be sent with a line formatted like this:

reporter:status:message

