User-Defined Functions

Sometimes the query you want to write can’t be expressed easily (or at all) using the built-in functions that Hive provides. By writing a user-defined function (UDF), you can plug in your own processing code and invoke it from a Hive query.

UDFs have to be written in Java, the language that Hive itself is written in. For other languages, consider using a SELECT TRANSFORM query, which allows you to stream data through a user-defined script (see “MapReduce Scripts”).

There are three types of UDF in Hive: (regular) UDFs, UDAFs (user-defined aggregate functions), and UDTFs (user-defined table-generating functions). They differ in the numbers of rows that they accept as input and produce as output:

  • A UDF operates on a single row and produces a single row as its output. Most functions, such as mathematical functions and string functions, are of this type.
  • A UDAF works on multiple input rows and creates a single output row. Aggregate functions include such functions as COUNT and MAX.
  • A UDTF operates on a single row and produces multiple rows (a table) as output.

Table-generating functions are less well known than the other two types, so let’s look at an example. Consider a table with a single column, x, which contains arrays of strings.

It’s instructive to take a slight detour to see how the table is defined and populated:
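A definition along the following lines would do (the table name arrays and the delimiter choices are illustrative):

    CREATE TABLE arrays (x ARRAY<STRING>)
    ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\001'
      COLLECTION ITEMS TERMINATED BY '\002';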

Notice that the ROW FORMAT clause specifies that the entries in the array are delimited by Control-B characters. The example file that we are going to load has the following contents, where ^B is a representation of the Control-B character to make it suitable for printing:
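    a^Bb
    c^Bd^Be

(The values a through e are illustrative; each line holds one array, with two entries in the first and three in the second.)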

After running a LOAD DATA command, the following query confirms that the data was loaded correctly:
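    hive> LOAD DATA LOCAL INPATH '/tmp/arrays.txt' OVERWRITE INTO TABLE arrays;
    hive> SELECT * FROM arrays;
    ["a","b"]
    ["c","d","e"]

(The input path is a placeholder for wherever the sample file was saved.)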

Next, we can use the explode UDTF to transform this table. This function emits a row for each entry in the array, so in this case the type of the output column y is STRING.
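With the illustrative table above, the query is:

    hive> SELECT explode(x) AS y FROM arrays;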

The result is that the table is flattened into five rows:
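    a
    b
    c
    d
    e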

SELECT statements using UDTFs have some restrictions (such as not being able to retrieve additional column expressions), which make them less useful in practice. For this reason, Hive supports LATERAL VIEW queries, which are more powerful. LATERAL VIEW queries are not covered here, but you can find out more about them at http://wiki.apache.org/hadoop/Hive/LanguageManual/LateralView.

Writing a UDF

To illustrate the process of writing and using a UDF, we’ll write a simple UDF to trim characters from the ends of strings. Hive already has a built-in function called trim, so we’ll call ours strip. The code for the Strip Java class is shown in the example below.

Example: A UDF for stripping characters from the ends of strings
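A minimal implementation looks like this (the package name is illustrative):

    package com.hadoopbook.hive; // illustrative package name

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.apache.hadoop.io.Text;

    public class Strip extends UDF {

      private Text result = new Text();

      // Strips leading and trailing whitespace from the input.
      public Text evaluate(Text str) {
        if (str == null) {
          return null;
        }
        result.set(StringUtils.strip(str.toString()));
        return result;
      }

      // Strips any of the supplied characters from the ends of the string.
      public Text evaluate(Text str, String stripChars) {
        if (str == null) {
          return null;
        }
        result.set(StringUtils.strip(str.toString(), stripChars));
        return result;
      }
    }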

A UDF must satisfy the following two properties:

  1. A UDF must be a subclass of org.apache.hadoop.hive.ql.exec.UDF.
  2. A UDF must implement at least one evaluate() method.

The evaluate() method is not defined by an interface since it may take an arbitrary number of arguments, of arbitrary types, and it may return a value of arbitrary type.

Hive introspects the UDF to find the evaluate() method that matches the Hive function that was invoked. The Strip class has two evaluate() methods. The first strips leading and trailing whitespace from the input, while the second can strip any of a set of supplied characters from the ends of the string. The actual string processing is delegated to the StringUtils class from the Apache Commons project, which makes the only noteworthy part of the code the use of Text from the Hadoop Writable library. Hive actually supports Java primitives in UDFs (and a few other types, like java.util.List and java.util.Map), so a signature like:
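    public String evaluate(String str)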

would work equally well. However, by using Text, we can take advantage of object reuse, which can bring efficiency savings, and so is to be preferred in general.

To use the UDF in Hive, we need to package the compiled Java class in a JAR file (you can do this by typing ant hive with the book’s example code) and register the file with Hive:
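    hive> ADD JAR /path/to/hive-examples.jar;
    hive> CREATE TEMPORARY FUNCTION strip AS 'com.hadoopbook.hive.Strip';

(The JAR path is a placeholder, and the class name in the CREATE TEMPORARY FUNCTION statement must match the package the Strip class was compiled under.)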

The TEMPORARY keyword here highlights the fact that UDFs are only defined for the duration of the Hive session (they are not persisted in the metastore). In practice, this means you need to add the JAR file and define the function at the beginning of each script or session.
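The function can then be called like any built-in function. For instance, assuming a one-row table called dummy to select against (the table name and values are illustrative):

    hive> SELECT strip('  bee  ') FROM dummy;
    bee
    hive> SELECT strip('banana', 'ab') FROM dummy;
    nan

The first call uses the single-argument evaluate() method to trim whitespace; the second uses the two-argument form to strip the characters a and b from both ends.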

As an alternative to calling ADD JAR, you can specify at launch time a path where Hive looks for auxiliary JAR files to put on its classpath (including the MapReduce classpath). This technique is useful for automatically adding your own library of UDFs every time you run Hive.

There are two ways of specifying the path: either pass the --auxpath option to the hive command, or set the HIVE_AUX_JARS_PATH environment variable before invoking Hive.
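For example (the path is a placeholder):

    % hive --auxpath /path/to/hive-examples.jar

or, equivalently:

    % export HIVE_AUX_JARS_PATH=/path/to/hive-examples.jar
    % hive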

Writing a UDAF

An aggregate function is more difficult to write than a regular UDF, since values are aggregated in chunks (potentially across many map or reduce tasks), so the implementation has to be capable of combining partial aggregations into a final result. The code to achieve this is best explained by example, so let’s look at the implementation of a simple UDAF for calculating the maximum of a collection of integers, shown in the example below.

Example: A UDAF for calculating the maximum of a collection of integers
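A sketch of the class, with the package name again illustrative:

    package com.hadoopbook.hive; // illustrative package name

    import org.apache.hadoop.hive.ql.exec.UDAF;
    import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
    import org.apache.hadoop.io.IntWritable;

    public class Maximum extends UDAF {

      public static class MaximumIntUDAFEvaluator implements UDAFEvaluator {

        // Holds the maximum seen so far; null means no values yet.
        private IntWritable result;

        public void init() {
          result = null;
        }

        // Called once per input value.
        public boolean iterate(IntWritable value) {
          if (value == null) {
            return true; // ignore NULL inputs
          }
          if (result == null) {
            result = new IntWritable(value.get());
          } else {
            result.set(Math.max(result.get(), value.get()));
          }
          return true;
        }

        // Returns the state of the partial aggregation.
        public IntWritable terminatePartial() {
          return result;
        }

        // Combines a partial aggregation with this evaluator's state.
        // Here a partial result has the same form as an input value.
        public boolean merge(IntWritable other) {
          return iterate(other);
        }

        // Returns the final result.
        public IntWritable terminate() {
          return result;
        }
      }
    }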

The class structure is slightly different to the one for UDFs. A UDAF must be a subclass of org.apache.hadoop.hive.ql.exec.UDAF (note the “A” in UDAF) and contain one or more nested static classes implementing org.apache.hadoop.hive.ql.exec.UDAFEvaluator. In this example, there is a single nested class, MaximumIntUDAFEvaluator, but we could add more evaluators, such as MaximumLongUDAFEvaluator, MaximumFloatUDAFEvaluator, and so on, to provide overloaded forms of the UDAF for finding the maximum of a collection of longs, floats, and so on.

An evaluator must implement five methods, described in turn below (the flow is illustrated in the figure below):

init()

The init() method initializes the evaluator and resets its internal state. In MaximumIntUDAFEvaluator, we set the IntWritable object holding the final result to null. We use null to indicate that no values have been aggregated yet, which has the desirable effect of making the maximum value of an empty set NULL.

iterate()

The iterate() method is called every time there is a new value to be aggregated. The evaluator should update its internal state with the result of performing the aggregation. The arguments that iterate() takes correspond to those in the Hive function from which it was called. In this example, there is only one argument.

The value is first checked to see if it is null, and if it is, it is ignored. Otherwise, the result instance variable is set to value’s integer value (if this is the first value that has been seen), or set to the larger of the current result and value (if one or more values have already been seen). We return true to indicate that the input value was valid.

terminatePartial()

The terminatePartial() method is called when Hive wants a result for the partial aggregation. The method must return an object that encapsulates the state of the aggregation. In this case, an IntWritable suffices, since it encapsulates either the maximum value seen or null if no values have been processed.

merge()

The merge() method is called when Hive decides to combine one partial aggregation with another. The method takes a single object whose type must correspond to the return type of the terminatePartial() method. In this example, the merge() method can simply delegate to the iterate() method, because the partial aggregation is represented in the same way as a value being aggregated. This is not generally the case (we’ll see a more general example later); in general, the method should implement the logic to combine the evaluator’s state with the state of the partial aggregation.

Figure: Data flow with partial results for a UDAF

terminate()

The terminate() method is called when the final result of the aggregation is needed. The evaluator should return its state as a value. In this case, we return the result instance variable.

Let’s exercise our new function:
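Assuming the UDAF is packaged in a JAR that has been added as before, and given a table records with an integer temperature column (both names are illustrative):

    hive> CREATE TEMPORARY FUNCTION maximum AS 'com.hadoopbook.hive.Maximum';
    hive> SELECT maximum(temperature) FROM records;
    110

(The output value depends, of course, on the data in the table.)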

A more complex UDAF

The previous example is unusual in that a partial aggregation can be represented using the same type (IntWritable) as the final result. This is not generally the case for more complex aggregate functions, as can be seen by considering a UDAF for calculating the mean (average) of a collection of double values. It’s not mathematically possible to combine partial means into a final mean value (see “Combiner Functions”). Instead, we can represent the partial aggregation as a pair of numbers: the cumulative sum of the double values processed so far, and the number of values.
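For example, partial aggregations of (sum 6.0, count 3) and (sum 9.0, count 2) combine to (15.0, 5), giving a final mean of 3.0; simply averaging the two partial means, 2.0 and 4.5, would give 3.25, which is wrong.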

This idea is implemented in the UDAF shown in the example below. Notice that the partial aggregation is implemented as a “struct” nested static class, called PartialResult, which Hive is intelligent enough to serialize and deserialize, since we are using field types that Hive can handle (Java primitives in this case).

In this example, the merge() method is different to iterate(), since it combines the partial sums and partial counts by pairwise addition. Also, the return type of terminatePartial() is PartialResult (which, of course, is never seen by the user calling the function), while the return type of terminate() is DoubleWritable, the final result seen by the user.

Example: A UDAF for calculating the mean of a collection of doubles
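A sketch along the same lines as the maximum UDAF (package name illustrative):

    package com.hadoopbook.hive; // illustrative package name

    import org.apache.hadoop.hive.ql.exec.UDAF;
    import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
    import org.apache.hadoop.hive.serde2.io.DoubleWritable;

    public class Mean extends UDAF {

      public static class MeanDoubleUDAFEvaluator implements UDAFEvaluator {

        // The "struct" holding the state of the partial aggregation.
        // Hive can serialize it because its fields are Java primitives.
        public static class PartialResult {
          public double sum;
          public long count;
        }

        private PartialResult partial;

        public void init() {
          partial = null;
        }

        // Called once per input value: accumulate sum and count.
        public boolean iterate(DoubleWritable value) {
          if (value == null) {
            return true; // ignore NULL inputs
          }
          if (partial == null) {
            partial = new PartialResult();
          }
          partial.sum += value.get();
          partial.count++;
          return true;
        }

        // The partial state, as handed between tasks.
        public PartialResult terminatePartial() {
          return partial;
        }

        // Combine partial sums and counts by pairwise addition.
        public boolean merge(PartialResult other) {
          if (other == null) {
            return true;
          }
          if (partial == null) {
            partial = new PartialResult();
          }
          partial.sum += other.sum;
          partial.count += other.count;
          return true;
        }

        // The final mean, or NULL if no values were seen.
        public DoubleWritable terminate() {
          if (partial == null) {
            return null;
          }
          return new DoubleWritable(partial.sum / partial.count);
        }
      }
    }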

