Example of Hadoop Tools: HBase

Although HDFS and MapReduce are powerful tools for processing batch operations over large datasets, they do not provide ways to read or write individual records efficiently. In this example, we'll explore using HBase to fill this gap.

The existing weather dataset described in previous chapters contains observations for tens of thousands of stations over 100 years, and this data is growing without bound. In this example, we will build a simple web interface that allows a user to navigate the different stations and page through their historical temperature observations in time order. For the sake of this example, let us allow that the dataset is massive, that the observations run to the billions, and that the rate at which temperature updates arrive is significant: say, hundreds to thousands of updates a second, from around the world and across the whole range of weather stations.

Let us also require that the web application display the most up-to-date observation within a second or so of receipt.

The first requirement, size, should preclude the use of a simple RDBMS instance and makes HBase a candidate store. The second requirement, latency, rules out plain HDFS.

A MapReduce job could build initial indices that allow random access over all of the observation data, but keeping such an index up to date as updates arrive is not what HDFS and MapReduce are good at.


In our example, there will be two tables:


Stations

This table holds station data. Let the row key be the stationid. Let this table have a column family info that acts as a key/value dictionary for station information. Let the dictionary keys be the column names info:name, info:location, and info:description. This table is static, and the info family, in this case, closely mirrors a typical RDBMS table design.


Observations

This table holds temperature observations. Let the row key be a composite key of stationid plus a reverse-order timestamp. Give this table a column family data that will contain one column, airtemp, with the observed temperature as the column value.

Our choice of schema is derived from how we want to read from HBase most efficiently. Rows and columns are stored in increasing lexicographic order. Though there are facilities for secondary indexing and regular-expression matching, they come at a performance penalty. It is vital to understand how you want to query your data so that you can store and access it most effectively.

For the stations table, the choice of stationid as key is obvious because we will always access information for a particular station by its id. The observations table, however, uses a composite key that adds the observation timestamp at the end. This will group all observations for a particular station together, and by using a reverse order timestamp (Long.MAX_VALUE - epoch) and storing it as binary, observations for each station will be ordered with most recent observation first.

In the shell, you would define your tables as follows:
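A minimal sketch of those definitions in the HBase shell (the table and family names follow the schema described above):

```shell
hbase> create 'stations', {NAME => 'info', VERSIONS => 1}
hbase> create 'observations', {NAME => 'data', VERSIONS => 1}
```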

In both cases, we are interested only in the latest version of a table cell, so we set VERSIONS to 1 (the default is 3).

Loading Data

There are a relatively small number of stations, so their static data is easily inserted using any of the available interfaces.

However, let’s assume that there are billions of individual observations to be loaded. This kind of import is normally an extremely complex and long-running database operation, but MapReduce and HBase’s distribution model allow us to make full use of the cluster. Copy the raw input data onto HDFS and then run a MapReduce job that can read the input and write to HBase.

The example below shows a MapReduce job that imports observations into HBase from the same input file used in the previous chapters' examples.

Example: A MapReduce application to import temperature data from HDFS into an HBase table
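A sketch of the importer, reconstructed from the description that follows. It assumes the old (JobConf-era) MapReduce API and the 0.20/0.90-era HBase client API, along with the NcdcRecordParser, RowKeyConverter, and HBaseTemperatureCli helpers from elsewhere in the book, so it is illustrative rather than a drop-in implementation; it will not run without a Hadoop/HBase cluster:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseTemperatureImporter extends Configured implements Tool {

  // Inner mapper: same input and parse as MaxTemperatureMapper, but writes
  // valid temperatures into the observations table instead of the collector.
  static class HBaseTemperatureMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, LongWritable, Text> {
    private NcdcRecordParser parser = new NcdcRecordParser();
    private HTable table;

    public void map(LongWritable key, Text value,
        OutputCollector<LongWritable, Text> output, Reporter reporter)
        throws IOException {
      parser.parse(value.toString());
      if (parser.isValidTemperature()) {
        byte[] rowKey = RowKeyConverter.makeObservationRowKey(
            parser.getStationId(), parser.getObservationDate().getTime());
        Put p = new Put(rowKey);
        // DATA_COLUMNFAMILY and AIRTEMP_QUALIFIER stand for data:airtemp
        p.add(HBaseTemperatureCli.DATA_COLUMNFAMILY,
            HBaseTemperatureCli.AIRTEMP_QUALIFIER,
            Bytes.toBytes(parser.getAirTemperature()));
        table.put(p);
      }
    }

    @Override
    public void configure(JobConf jc) {
      super.configure(jc);
      try {
        // Create the HTable once per task, not once per insert.
        this.table = new HTable(HBaseConfiguration.create(jc), "observations");
      } catch (IOException e) {
        throw new RuntimeException("Failed HTable construction", e);
      }
    }

    @Override
    public void close() throws IOException {
      super.close();
      table.close(); // flushes any buffered writes
    }
  }

  public int run(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("Usage: HBaseTemperatureImporter <input>");
      return -1;
    }
    JobConf jc = new JobConf(getConf(), getClass());
    FileInputFormat.addInputPath(jc, new Path(args[0]));
    jc.setJobName("HBaseTemperatureImporter");
    jc.setMapperClass(HBaseTemperatureMapper.class);
    jc.setNumReduceTasks(0); // map-only job; the mapper writes to HBase itself
    jc.setOutputFormat(NullOutputFormat.class);
    JobClient.runJob(jc);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(HBaseConfiguration.create(),
        new HBaseTemperatureImporter(), args);
    System.exit(exitCode);
  }
}
```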

HBaseTemperatureImporter has an inner class named HBaseTemperatureMapper that is like the MaxTemperatureMapper class from the Developing a MapReduce Application chapter. The outer class implements Tool and handles the setup to launch the HBaseTemperatureMapper inner class. HBaseTemperatureMapper takes the same input as MaxTemperatureMapper and does the same parsing, using the NcdcRecordParser introduced in that chapter to check for valid temperatures. But rather than adding valid temperatures to the output collector as MaxTemperatureMapper does, it adds them to the observations HBase table in the data:airtemp column. (We are using static constants for data and airtemp, imported from the HBaseTemperatureCli class described later.) In the configure() method, we create an HTable instance once against the observations table and use it in subsequent map() invocations to talk to HBase. Finally, we call close() on our HTable instance to flush out any write buffers not yet cleared.

The row key is created in the makeObservationRowKey() method on RowKeyConverter from the station ID and observation time:

The conversion takes advantage of the fact that the station ID is a fixed-length string. The Bytes class used in makeObservationRowKey() is from the HBase utility package. It includes methods for converting between byte arrays and common Java and Hadoop types. In makeObservationRowKey(), the Bytes.putLong() method is used to fill the key byte array. The Bytes.SIZEOF_LONG constant is used for sizing and positioning in the row key array.
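A self-contained sketch of this key construction, using java.nio.ByteBuffer as a stand-in for HBase's Bytes utility (the class and the 12-byte station-ID length are illustrative assumptions; the book's real RowKeyConverter uses Bytes.putLong() and Bytes.SIZEOF_LONG):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the book's RowKeyConverter; ByteBuffer replaces
// the HBase Bytes utility, but the key layout is the same.
class RowKeyConverter {
  private static final int STATION_ID_LENGTH = 12; // fixed-length station ID

  /** Row key = stationid bytes + (Long.MAX_VALUE - epoch) as a big-endian long. */
  static byte[] makeObservationRowKey(String stationId, long observationTime) {
    ByteBuffer buf = ByteBuffer.allocate(STATION_ID_LENGTH + Long.BYTES);
    buf.put(stationId.getBytes(StandardCharsets.US_ASCII)); // assumes 12 ASCII chars
    buf.putLong(Long.MAX_VALUE - observationTime);          // reverse-order timestamp
    return buf.array();
  }

  /** Unsigned lexicographic comparison, the order in which HBase sorts row keys. */
  static int compareRowKeys(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) return cmp;
    }
    return Integer.compare(a.length, b.length);
  }
}
```

Because the timestamp is stored as Long.MAX_VALUE - epoch, a newer observation produces a smaller key and therefore sorts first within a station's group of rows.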

We can run the program with the following:
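A command along these lines (the jar name and input path are placeholders for your own build and data location):

```shell
% hadoop jar hadoop-examples.jar HBaseTemperatureImporter input/ncdc/all
```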

Optimization notes

  • Watch for the phenomenon where an import walks in lock-step through the table with all clients in concert pounding one of the table’s regions (and thus, a single node), then moving on to the next, and so on, rather than evenly distributing the load over all regions. This is usually brought on by some interaction between sorted input and how the splitter works. Randomizing the ordering of your row keys prior to insertion may help. In our example, given the distribution of stationid values and how TextInputFormat makes splits, the upload should be sufficiently distributed.

  • Obtain only one HTable instance per task. There is a cost to instantiating an HTable, so if you do this for each insert you may have a negative impact on performance; hence our setup of HTable in the configure() step. If a table is new, it will have only one region, and initially all updates will go to this single region until it splits. This happens even if row keys are randomly distributed. This startup phenomenon means uploads run slowly at first, until there are sufficient regions distributed so that all cluster members are able to participate in the upload. Do not confuse this phenomenon with the one described in the previous point.

  • By default, each HTable.put(put) performs the insert without any buffering. You can disable the HTable auto-flush feature using HTable.setAutoFlush(false) and then set the size of the configurable write buffer. When the committed inserts fill the write buffer, it is flushed. Remember, though, that you must call HTable.flushCommits() manually (or HTable.close(), which calls through to HTable.flushCommits()) at the end of each task to ensure that nothing is left unflushed in the buffer. You could do this in an override of the mapper's close() method.

  • HBase includes TableInputFormat and TableOutputFormat to help with MapReduce jobs that source from and sink to HBase. One way to write the previous example would have been to use MaxTemperatureMapper from the Developing a MapReduce Application chapter as is, but add a reducer task that takes the output of MaxTemperatureMapper and feeds it to HBase via TableOutputFormat.
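The write-buffer setup described in the bullets above can be sketched as follows (a sketch against the old HTable API; the 4 MB buffer size is an arbitrary illustration):

```java
// Sketch: client-side write buffering with the old HTable API.
HTable table = new HTable(conf, "observations");
table.setAutoFlush(false);                 // buffer puts client-side
table.setWriteBufferSize(4 * 1024 * 1024); // hypothetical 4 MB buffer
// ... many table.put(put) calls ...
table.flushCommits();                      // or table.close(), at end of task
```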

Web Queries

To implement the web application, we will use the HBase Java API directly. Here it becomes clear how important your choice of schema and storage format is. The simplest query is to get the static station information. This type of query is simple in a traditional database, but HBase gives you additional control and flexibility. Using the info family as a key/value dictionary (column names as keys, column values as values), the code would look like this:
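A sketch of what that code might look like, against the old HBase client API. The constants INFO_COLUMNFAMILY, NAME_QUALIFIER, LOCATION_QUALIFIER, and DESCRIPTION_QUALIFIER are assumed to be byte-array constants defined elsewhere (e.g., Bytes.toBytes("info"), Bytes.toBytes("name"), and so on):

```java
// Sketch of getStationInfo(); depends on a running HBase cluster.
public Map<String, String> getStationInfo(HTable table, String stationId)
    throws IOException {
  Get get = new Get(Bytes.toBytes(stationId));
  get.addFamily(INFO_COLUMNFAMILY); // fetch every column in the info family
  Result res = table.get(get);
  if (res == null) {
    return null;
  }
  Map<String, String> resultMap = new HashMap<String, String>();
  resultMap.put("name", getValue(res, INFO_COLUMNFAMILY, NAME_QUALIFIER));
  resultMap.put("location", getValue(res, INFO_COLUMNFAMILY, LOCATION_QUALIFIER));
  resultMap.put("description", getValue(res, INFO_COLUMNFAMILY, DESCRIPTION_QUALIFIER));
  return resultMap;
}

private static String getValue(Result res, byte[] cf, byte[] qualifier) {
  byte[] value = res.getValue(cf, qualifier);
  return value == null ? "" : Bytes.toString(value);
}
```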

In this example, getStationInfo() takes an HTable instance and a station ID. To get the station info, we use HTable.get(), passing a Get instance configured to retrieve all columns in the defined column family, INFO_COLUMNFAMILY.

The get() results are returned in a Result. It contains the row, and you can fetch cell values by stipulating the column cells wanted. The getStationInfo() method converts the Result into a more friendly Map of String keys and values.

We can already see how there is a need for utility functions when using HBase. There are an increasing number of abstractions being built atop HBase to deal with this low-level interaction, but it's important to understand how this works and how storage choices make a difference.

One of the strengths of HBase over a relational database is that you don't have to specify the columns up front. So if, in the future, each station has at least these three attributes but also hundreds of optional ones, we can just insert them without modifying the schema. Your application's reading and writing code would, of course, need to change. The example code might then loop through the Result rather than grabbing each value explicitly.

We will make use of HBase scanners for retrieval of observations in our web application.

Here we are after a Map<ObservationTime, ObservedTemp> result. We will use a NavigableMap<Long, Integer> because it is sorted and has a descendingMap() method, so we can access observations in either ascending or descending order. The code is in the example below.

Example: Methods for retrieving a range of rows of weather station observations from an HBase table
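A sketch of the retrieval method, reconstructed from the description that follows. DATA_COLUMNFAMILY and AIRTEMP_QUALIFIER are assumed byte-array constants for the data:airtemp column, and RowKeyConverter is the row-key helper described earlier; this depends on a running cluster, so treat it as illustrative:

```java
// Sketch of getStationObservations() against the old HBase client API.
public NavigableMap<Long, Integer> getStationObservations(HTable table,
    String stationId, long maxStamp, int maxCount) throws IOException {
  // Start at stationId + reversed maxStamp; later rows are older observations.
  byte[] startRow = RowKeyConverter.makeObservationRowKey(stationId, maxStamp);
  NavigableMap<Long, Integer> resultMap = new TreeMap<Long, Integer>();
  Scan scan = new Scan(startRow);
  scan.addColumn(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);
  ResultScanner scanner = table.getScanner(scan);
  try {
    Result res;
    int count = 0;
    while ((res = scanner.next()) != null && count++ < maxCount) {
      byte[] row = res.getRow();
      byte[] value = res.getValue(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);
      // Recover the actual timestamp from the reverse-order key bytes.
      Long stamp = Long.MAX_VALUE
          - Bytes.toLong(row, row.length - Bytes.SIZEOF_LONG);
      resultMap.put(stamp, Integer.parseInt(Bytes.toString(value)));
    }
  } finally {
    scanner.close(); // scanners must always be closed
  }
  // TreeMap sorts keys ascending; descendingMap() yields newest-first order.
  return resultMap.descendingMap();
}
```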

The getStationObservations() method takes a station ID, a range defined by maxStamp, and a maximum number of rows (maxCount). Note that the NavigableMap that is returned is in descending time order. If you want to read through it in ascending order, use NavigableMap.descendingMap().


HBase scanners are like cursors in a traditional database or Java iterators, except that, unlike the latter, they have to be closed after use. Scanners return rows in order. Users obtain a scanner on an HBase table by calling HTable.getScanner(scan), where the scan parameter is a configured instance of a Scan object. In the Scan instance, you can pass the row at which to start and stop the scan, which columns in a row to return in the row result, and, optionally, a filter to run on the server side. The ResultScanner interface, which is returned when you call HTable.getScanner(), is as follows (Javadoc omitted):
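Paraphrased from the HBase API of that era, the interface looks roughly like this:

```java
public interface ResultScanner extends Closeable, Iterable<Result> {
  // Grab the next row's values, or null when the scan is exhausted.
  public Result next() throws IOException;
  // Grab up to nbRows rows' values; may return fewer at the end of the scan.
  public Result[] next(int nbRows) throws IOException;
  // Release the resources held by the scanner on the regionserver.
  public void close();
}
```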

You can ask for the next row's results or a number of rows at a time. Each invocation of next() involves a trip back to the regionserver, so grabbing a bunch of rows at once can make for significant performance savings. To learn more about the server-side filtering mechanism in HBase, see http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/filter/package-summary.html.

The hbase.client.scanner.caching configuration option is set to 1 by default. You can also set how much to cache/prefetch on the Scan instance itself. Scanners will, under the covers, fetch this many results at a time, bringing them client-side, and returning to the server to fetch the next batch only after the current batch has been exhausted. Higher caching values enable faster scanning but eat up more memory in the client. Also, avoid setting the caching so high that the time spent processing the batch client-side exceeds the scanner lease period. If a client fails to check back with the server before the scanner lease expires, the server will garbage-collect the resources consumed by the scanner on the server side. The default scanner lease is 60 seconds, the default value for hbase.regionserver.lease.period. Clients will see an UnknownScannerException if the scanner lease has expired.
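Raising the prefetch, either per scan or via the client configuration, might be sketched as:

```java
// Sketch: raising scanner caching above the default of 1.
Scan scan = new Scan();
scan.setCaching(100); // fetch 100 rows per trip to the regionserver
// or cluster-wide, via hbase-site.xml / the client Configuration:
conf.setInt("hbase.client.scanner.caching", 100);
```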

The advantage of storing keys as Long.MAX_VALUE - stamp may not be clear from the previous example. It has more use when you want to get the newest observations for a given offset and limit, which is often the case in web applications. If the observations were stored with the actual timestamps, we would only be able to get the oldest observations for a given offset and limit efficiently; getting the newest would mean getting all of them and then grabbing them off the end. One of the prime reasons for moving from an RDBMS to HBase is to allow for these types of "early-out" scenarios.

