User-Defined Functions - Filter UDF - Hadoop

Pig’s designers realized that the ability to plug in custom code is crucial for all but the most trivial data processing jobs. For this reason, they made it easy to define and use user-defined functions (UDFs).

A Filter UDF

Let’s demonstrate by writing a filter function for filtering out weather records that do not have a temperature quality reading of satisfactory (or better). The idea is to replace the inline quality test in the Pig script with a call to a filter function.

This achieves two things: it makes the Pig script more concise, and it encapsulates the logic in one place so that it can be easily reused in other scripts. If we were just writing an ad hoc query, then we probably wouldn’t bother to write a UDF. It’s when you start doing the same kind of processing over and over again that you see opportunities for reusable UDFs.

UDFs are written in Java, and filter functions are all subclasses of FilterFunc, which itself is a subclass of EvalFunc. We’ll look at EvalFunc in more detail later, but for the moment just note that, in essence, EvalFunc looks like the following class:

EvalFunc’s only abstract method, exec(), takes a tuple and returns a single value, the (parameterized) type T. The fields in the input tuple consist of the expressions passed to the function—in this case, a single integer. For FilterFunc, T is Boolean, so the method should return true only for those tuples that should not be filtered out.

For the quality filter, we write a class, IsGoodQuality, that extends FilterFunc and implements the exec() method (see the example that follows). The Tuple class is essentially a list of objects with associated types. Here we are concerned only with the first field (since the function only has a single argument), which we extract by index using the get() method on Tuple. The field is an integer, so if it’s not null, we cast it and check whether the value is one that signifies the temperature was a good reading, returning the appropriate value, true or false.
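To make the shape of the exec() logic concrete, here is a minimal, self-contained sketch that runs without Pig on the classpath. It is not the listing from the book: a plain List<Object> stands in for Pig’s Tuple, the class name QualityFilterSketch is hypothetical, and the set of quality codes treated as good (0, 1, 4, 5, 9) is an assumption, since the text above does not list them.

```java
import java.util.Arrays;
import java.util.List;

/** Stand-in for the filter's exec() logic; a List<Object> plays the role of Pig's Tuple. */
final class QualityFilterSketch {

  /** Returns true only for tuples that should be kept. */
  static boolean isGoodQuality(List<Object> tuple) {
    if (tuple == null || tuple.isEmpty()) {
      return false;               // nothing to inspect, so filter the record out
    }
    Object field = tuple.get(0);  // the function takes a single argument
    if (field == null) {
      return false;               // a missing quality code is treated as bad
    }
    int code = (Integer) field;   // throws ClassCastException if the field is not an Integer
    // Assumed set of "satisfactory or better" codes; not stated in the surrounding text.
    return code == 0 || code == 1 || code == 4 || code == 5 || code == 9;
  }

  public static void main(String[] args) {
    System.out.println(isGoodQuality(Arrays.<Object>asList(1)));  // kept
    System.out.println(isGoodQuality(Arrays.<Object>asList(2)));  // filtered out
  }
}
```

In a real UDF the same checks would sit in the exec() method of a FilterFunc subclass, with the field fetched through get() on Tuple.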

Example. A FilterFunc UDF to remove records with unsatisfactory temperature quality readings
package com.hadoopbook.pig;

To use the new function, we first compile it and package it in a JAR file (in the example code that accompanies this book, we can do this by typing ant pig). Then we tell Pig about the JAR file with the REGISTER operator, which is given the local path to the filename (and is not enclosed in quotes):

Pig resolves function calls by treating the function’s name as a Java classname and attempting to load a class of that name. (This, incidentally, is why function names are case-sensitive: because Java classnames are.) When searching for classes, Pig uses a classloader that includes the JAR files that have been registered. When running in distributed mode, Pig will ensure that your JAR files get shipped to the cluster.

For the UDF in this example, Pig looks for a class with the name com.hadoopbook.pig.IsGoodQuality, which it finds in the JAR file we registered.

Resolution of built-in functions proceeds in the same way, except for one difference: Pig has a set of built-in package names that it searches, so the function call does not have to be a fully qualified name. For example, the function MAX is actually implemented by a class MAX in the package org.apache.pig.builtin. This is one of the packages that Pig looks in, so we can write MAX rather than org.apache.pig.builtin.MAX in our Pig programs.
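The lookup described above can be sketched in a few lines of plain Java. This is an illustration of the idea only, not Pig’s actual resolver: the class name FunctionResolverSketch is hypothetical, JDK packages stand in for Pig’s built-in packages so that the sketch runs without Pig, and the order in which prefixes are tried is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Sketch of name-to-class lookup with a list of searched package prefixes. */
final class FunctionResolverSketch {

  // JDK packages stand in for Pig's built-in packages; "" covers fully qualified names.
  private static final List<String> SEARCHED_PACKAGES =
      Arrays.asList("", "java.lang.", "java.util.");

  /** Tries the name under each prefix in turn and returns the first class that loads. */
  static Optional<Class<?>> resolve(String functionName) {
    for (String prefix : SEARCHED_PACKAGES) {
      try {
        return Optional.<Class<?>>of(Class.forName(prefix + functionName));
      } catch (ClassNotFoundException e) {
        // Not found under this prefix; try the next one.
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(resolve("ArrayList"));          // found via the java.util. prefix
    System.out.println(resolve("java.util.HashMap"));  // fully qualified, found directly
    System.out.println(resolve("arraylist"));          // wrong case, so nothing is found
  }
}
```

Because the name is handed to the class loader unchanged, the lookup is case-sensitive, which mirrors the behavior described for Pig function names.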

We can’t register our package with Pig, but we can shorten the function name by defining an alias, using the DEFINE operator:

Defining an alias is a good idea if you want to use the function several times in the same script. It’s also necessary if you want to pass arguments to the constructor of the UDF’s implementation class.

Leveraging types

The filter works when the quality field is declared to be of type int, but if the type information is absent, then the UDF fails! This happens because the field is the default type, bytearray, represented by the DataByteArray class. Because DataByteArray is not an Integer, the cast fails.

The obvious way to fix this is to convert the field to an integer in the exec() method. However, there is a better way, which is to tell Pig the types of the fields that the function expects. The getArgToFuncMapping() method on EvalFunc is provided for precisely this reason. We can override it to tell Pig that the first field should be an integer:

This method returns a FuncSpec object corresponding to each of the fields of the tuple that are passed to the exec() method. Here there is a single field, and we construct an anonymous FieldSchema (the name is passed as null, since Pig ignores the name when doing type conversion). The type is specified using the INTEGER constant on Pig’s DataType class.

With the amended function, Pig will attempt to convert the argument passed to the function to an integer. If the field cannot be converted, then a null is passed for the field. The exec() method always returns false if the field is null. For this application, this behavior is appropriate, as we want to filter out records whose quality field is unintelligible.
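The combination of lenient conversion and a null-rejecting filter can be sketched as follows. This is a standalone illustration that does not use Pig’s own conversion code: the class and method names are hypothetical, the raw field is modeled as UTF-8 bytes, and the set of good quality codes is again an assumption.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of "convert if possible, else null", followed by a filter that rejects null. */
final class LenientConversionSketch {

  /** Parses UTF-8 bytes as an int, returning null when they are not a valid integer. */
  static Integer toIntegerOrNull(byte[] raw) {
    if (raw == null) {
      return null;
    }
    try {
      return Integer.valueOf(new String(raw, StandardCharsets.UTF_8));
    } catch (NumberFormatException e) {
      return null;  // unintelligible field: pass null on instead of failing
    }
  }

  /** Filter step: a null (unconvertible) quality code is always rejected. */
  static boolean keep(Integer quality) {
    if (quality == null) {
      return false;
    }
    // Assumed set of good quality codes; not stated in the surrounding text.
    return quality == 0 || quality == 1 || quality == 4 || quality == 5 || quality == 9;
  }

  public static void main(String[] args) {
    byte[] good = "1".getBytes(StandardCharsets.UTF_8);
    byte[] junk = "n/a".getBytes(StandardCharsets.UTF_8);
    System.out.println(keep(toIntegerOrNull(good)));  // record is kept
    System.out.println(keep(toIntegerOrNull(junk)));  // record is filtered out
  }
}
```

The design choice is the same as in the text: a record whose quality field cannot be read is dropped quietly rather than aborting the job.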

Here’s the final program using the new function:

An Eval UDF

Writing an eval function is a small step up from writing a filter function. Consider a UDF (see the example below) for trimming the leading and trailing whitespace from chararray values, just like the trim() method on java.lang.String. We will use this UDF later in the chapter.

Example. An EvalFunc UDF to trim leading and trailing whitespace from chararray values

An eval function extends the EvalFunc class, parameterized by the type of the return value (which is String for the Trim UDF).* The exec() and getArgToFuncMapping() methods are straightforward, like the ones in the IsGoodQuality UDF.
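As a rough illustration of how the return type parameter works, the following sketch mimics the arrangement with plain Java classes so that it runs without Pig. EvalSketch and TrimSketch are hypothetical stand-ins for EvalFunc and the Trim UDF, a List<Object> stands in for Pig’s Tuple, and the sample value is illustrative.

```java
import java.util.Collections;
import java.util.List;

/** Trims leading and trailing whitespace from the first field; the return type is String. */
final class TrimSketch extends EvalSketch<String> {

  @Override
  String exec(List<Object> input) {
    if (input == null || input.isEmpty() || input.get(0) == null) {
      return null;  // no value to trim
    }
    return ((String) input.get(0)).trim();
  }

  public static void main(String[] args) {
    String trimmed = new TrimSketch().exec(Collections.<Object>singletonList("  pomegranate "));
    System.out.println("[" + trimmed + "]");
  }
}

/** Stand-in for an eval function base class: T is the type of the returned value. */
abstract class EvalSketch<T> {
  abstract T exec(List<Object> input);  // a List<Object> plays the role of Pig's Tuple
}
```

Declaring the subclass as EvalSketch<String> fixes the return type of exec() at compile time, which is the same role the type parameter plays for a real eval function.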

When you write an eval function, you need to consider what the output’s schema looks like. In the following statement, the schema of B is determined by the function udf:

If udf creates tuples with scalar fields, then Pig can determine B’s schema through reflection. For complex types such as bags, tuples, or maps, Pig needs more help, and you should implement the outputSchema() method to give Pig the information about the output schema.

* Although not relevant for this example, eval functions that operate on a bag may additionally implement Pig’s Algebraic or Accumulator interfaces for more efficient processing of the bag in chunks.

The Trim UDF returns a string, which Pig translates as a chararray, as can be seen from the following session:

A has chararray fields that have leading and trailing spaces. We create B from A by applying the Trim function to the first field in A (named fruit). B’s fields are correctly inferred to be of type chararray.

A Load UDF

We’ll demonstrate a custom load function that can read plain-text column ranges as fields, very much like the Unix cut command. It is used as follows:

The string passed to CutLoadFunc is the column specification; each comma-separated range defines a field, which is assigned a name and type in the AS clause. Let’s examine the implementation of CutLoadFunc shown in the example below.

Example. A LoadFunc UDF to load tuple fields as column ranges

In Pig, as in Hadoop, data loading takes place before the mapper runs, so it is important that the input can be split into portions that are independently handled by each mapper (see “Input Splits and Records” for background).

From Pig 0.7.0 (which is the version used here), the load and store function interfaces have been overhauled to be more closely aligned with Hadoop’s InputFormat and OutputFormat classes. Functions written for previous versions of Pig will need rewriting (guidelines for doing so are provided at MigrationGuide). A LoadFunc will typically use an existing underlying InputFormat to create records, with the LoadFunc providing the logic for turning the records into Pig tuples.

CutLoadFunc is constructed with a string that specifies the column ranges to use for each field. The logic for parsing this string and creating a list of internal Range objects that encapsulates these ranges is contained in the Range class, and is not shown here (it is available in the example code that accompanies this book).
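Since the Range class is not reproduced here, the following is one plausible sketch of the parsing step, not the code that accompanies the book. The class name RangeSketch is hypothetical, and the sketch assumes that ranges are written as start-end with 1-based, inclusive column positions in the style of cut.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the Range class: a 1-based, inclusive column range. */
final class RangeSketch {
  final int start;
  final int end;

  RangeSketch(int start, int end) {
    if (start < 1 || end < start) {
      throw new IllegalArgumentException("Invalid range: " + start + "-" + end);
    }
    this.start = start;
    this.end = end;
  }

  /** Parses a specification such as "16-19,88-92,93-93" into one range per field. */
  static List<RangeSketch> parse(String spec) {
    List<RangeSketch> ranges = new ArrayList<>();
    for (String part : spec.split(",")) {
      String[] bounds = part.trim().split("-");
      if (bounds.length != 2) {
        throw new IllegalArgumentException("Expected start-end but got: " + part);
      }
      int start = Integer.parseInt(bounds[0].trim());
      int end = Integer.parseInt(bounds[1].trim());
      ranges.add(new RangeSketch(start, end));
    }
    return ranges;
  }

  /** Returns the characters this range covers; the caller checks the line is long enough. */
  String substring(String line) {
    return line.substring(start - 1, end);
  }

  public static void main(String[] args) {
    for (RangeSketch r : parse("1-4,6-8")) {
      System.out.println(r.start + ".." + r.end + " -> " + r.substring("2010 042"));
    }
  }
}
```

Parsing the specification once in the constructor keeps the per-record work in the loader down to a few substring calls.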

Pig calls setLocation() on a LoadFunc to pass the input location to the loader. Since CutLoadFunc uses a TextInputFormat to break the input into lines, we just pass the location to set the input path using a static method on FileInputFormat.

Pig uses the new MapReduce API, so we use the input and output formats and associated classes from the org.apache.hadoop.mapreduce package.

Next, Pig calls the getInputFormat() method to create a RecordReader for each split, just as in MapReduce. Pig passes each RecordReader to the prepareToRead() method of CutLoadFunc; we store a reference to it so that we can use it in the getNext() method to iterate through the records.

The Pig runtime calls getNext() repeatedly, and the load function reads tuples from the reader until the reader reaches the last record in its split. At this point, it returns null to signal that there are no more tuples to be read.
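The calling convention is easy to picture with a small stand-in that has no Pig or Hadoop dependency. In this sketch, which is an illustration rather than the real interfaces, an Iterator<String> plays the role of the stored RecordReader, strings play the role of tuples, and the class name NullTerminatedReaderSketch and the drain() helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Sketch of a loader whose getNext() returns null once its input is exhausted. */
final class NullTerminatedReaderSketch {

  private final Iterator<String> reader;  // plays the role of the stored RecordReader

  NullTerminatedReaderSketch(Iterator<String> reader) {
    this.reader = reader;
  }

  /** Returns the next record, or null when there are no more records in the split. */
  String getNext() {
    return reader.hasNext() ? reader.next() : null;
  }

  /** What the caller does: keep asking until null comes back. */
  static List<String> drain(NullTerminatedReaderSketch loader) {
    List<String> records = new ArrayList<>();
    for (String record = loader.getNext(); record != null; record = loader.getNext()) {
      records.add(record);
    }
    return records;
  }

  public static void main(String[] args) {
    Iterator<String> lines = Arrays.asList("line one", "line two").iterator();
    System.out.println(drain(new NullTerminatedReaderSketch(lines)));
  }
}
```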

It is the responsibility of the getNext() implementation to turn lines of the input file into Tuple objects. It does this by means of a TupleFactory, a Pig class for creating Tuple instances. The newTuple() method creates a new tuple with the required number of fields, which is just the number of Range objects, and the fields are populated using substrings of the line, which are determined by the Range objects.

We need to think about what to do if the line is shorter than the range asked for. One option is to throw an exception and stop further processing. This is appropriate if your application cannot tolerate incomplete or corrupt records. In many cases, it is better to return a tuple with null fields and let the Pig script handle the incomplete data as it sees fit. This is the approach we take here; by exiting the for loop if the range end is past the end of the line, we leave the current field and any subsequent fields in the tuple with their default value of null.
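The early-exit approach can be sketched without any Pig classes. In this illustration, which makes no claim to match the actual CutLoadFunc code, a String array stands in for the tuple, each range is a hypothetical {start, end} pair with 1-based inclusive positions, and the class name CutLineSketch is made up.

```java
import java.util.Arrays;

/** Sketch of cutting fields from a line while tolerating lines that are too short. */
final class CutLineSketch {

  /**
   * Cuts one field per {start, end} pair. Stops at the first range that runs past
   * the end of the line, so that field and all later ones are left as null.
   */
  static String[] cut(String line, int[][] ranges) {
    String[] fields = new String[ranges.length];  // every slot starts out null
    for (int i = 0; i < ranges.length; i++) {
      int start = ranges[i][0];
      int end = ranges[i][1];
      if (end > line.length()) {
        break;  // line too short: leave this field and the rest null
      }
      fields[i] = line.substring(start - 1, end);
    }
    return fields;
  }

  public static void main(String[] args) {
    int[][] ranges = {{1, 4}, {6, 8}, {10, 12}};
    System.out.println(Arrays.toString(cut("2010 042 abc", ranges)));  // all three fields set
    System.out.println(Arrays.toString(cut("2010 042", ranges)));      // last field is null
  }
}
```

Breaking out of the loop, rather than skipping a single field, assumes that ranges are listed left to right, so that once one range overruns the line the later ones do too.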

Using a schema

Let’s now consider the type of the fields being loaded. If the user has specified a schema, then the fields need converting to the relevant types. However, this is performed lazily by Pig, and so the loader should always construct tuples of type bytearray, using the DataByteArray type. The loader function still has the opportunity to do the conversion, however, by overriding getLoadCaster() to return a custom implementation of the LoadCaster interface, which provides a collection of conversion methods for this purpose:

CutLoadFunc doesn’t override getLoadCaster() since the default implementation returns Utf8StorageConverter, which provides standard conversions between UTF-8 encoded data and Pig data types.

In some cases, the load function itself can determine the schema. For example, if we were loading self-describing data like XML or JSON, we could create a schema for Pig by looking at the data. Alternatively, the load function may determine the schema in another way, such as an external file, or by being passed information in its constructor.

To support such cases, the load function should implement the LoadMetadata interface (in addition to the LoadFunc interface), so it can supply a schema to the Pig runtime.

Note, however, that if a user supplies a schema in the AS clause of LOAD, then it takes precedence over the schema specified by the LoadMetadata interface.

A load function may additionally implement the LoadPushDown interface as a means for finding out which columns the query is asking for. This can be a useful optimization for column-oriented storage, so that the loader only loads the columns that are needed by the query. There is no obvious way for CutLoadFunc to load only a subset of columns, since it reads the whole line for each tuple, so we don’t use this optimization.
