Tables

A Hive table is logically made up of the data being stored and the associated metadata describing the layout of the data in the table. The data typically resides in HDFS, although it may reside in any Hadoop filesystem, including the local filesystem or S3. Hive stores the metadata in a relational database, rather than, say, in HDFS (see “The Metastore”).

In this section, we shall look in more detail at how to create tables, the different physical storage formats that Hive offers, and how to import data into them.

Multiple Database/Schema Support

Many relational databases have a facility for multiple namespaces, which allow users and applications to be segregated into different databases or schemas. At the time of writing, all tables in Hive live in a single default namespace; however, Hive 0.6.0 plans to support multiple databases, providing commands such as CREATE DATABASE dbname, USE dbname, and DROP DATABASE dbname.

Managed Tables and External Tables

When you create a table in Hive, by default Hive will manage the data, which means that Hive moves the data into its warehouse directory. Alternatively, you may create an external table, which tells Hive to refer to the data that is at an existing location outside the warehouse directory.

The difference between the two types of table is seen in the LOAD and DROP semantics. Let’s consider a managed table first.

When you load data into a managed table, it is moved into Hive’s warehouse directory. For example:
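(A sketch; the column definition is illustrative, and the file path follows the description below.)

CREATE TABLE managed_table (dummy STRING);
LOAD DATA INPATH '/user/tom/data.txt' INTO TABLE managed_table;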

will move the file hdfs://user/tom/data.txt into Hive’s warehouse directory for the managed_table table, which is hdfs://user/hive/warehouse/managed_table. The move will only succeed if the source and target filesystems are the same. Also, there is a special case if the LOCAL keyword is used, where Hive will copy the data from the local filesystem into Hive’s warehouse directory (even if it, too, is on the same local filesystem). In all other cases though, LOAD is a move operation and is best thought of as such.

The load operation is very fast, since it is just a filesystem move. However, bear in mind that Hive does not check that the files in the table directory conform to the schema declared for the table, even for managed tables. If there is a mismatch, then this will become apparent at query time, often by the query returning NULL for a missing field. You can check that the data is being parsed correctly by issuing a simple SELECT statement to retrieve a few rows directly from the table.

If the table is later dropped, using:
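(A sketch, continuing the example above.)

DROP TABLE managed_table;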

then the table, including its metadata and its data, is deleted. It bears repeating that since the initial LOAD performed a move operation, and the DROP performed a delete operation, the data no longer exists anywhere. This is what it means for Hive to manage the data.

An external table behaves differently: you control the creation and deletion of the data. The location of the external data is specified at table creation time:
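(A sketch; the table name, location, and file path are illustrative.)

CREATE EXTERNAL TABLE external_table (dummy STRING)
  LOCATION '/user/tom/external_table';
LOAD DATA INPATH '/user/tom/data.txt' INTO TABLE external_table;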

With the EXTERNAL keyword, Hive knows that it is not managing the data, so it doesn’t move it to its warehouse directory. Indeed, it doesn’t even check if the external location exists at the time it is defined. This is a useful feature, since it means you can create the data lazily after creating the table.

When you drop an external table, Hive will leave the data untouched and only delete the metadata.

So how do you choose which type of table to use? In most cases, there is not much difference between the two (except of course for the difference in DROP semantics), so it is just a matter of preference. As a rule of thumb, if you are doing all your processing with Hive, then use managed tables, but if you wish to use Hive and other tools on the same dataset, then use external tables. A common pattern is to use an external table to access an initial dataset stored in HDFS (created by another process), then use a Hive transform to move the data into a managed Hive table. This works the other way around, too: an external table (not necessarily on HDFS) can be used to export data from Hive for other applications to use.

You can also use INSERT OVERWRITE DIRECTORY to export data to a Hadoop filesystem, but unlike external tables you cannot control the output format, which is Control-A separated text files. Complex datatypes are serialized using a JSON representation.
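(A sketch of such an export; the directory, table, and column names are illustrative.)

INSERT OVERWRITE DIRECTORY '/tmp/hive_export'
SELECT col1, col2
FROM source;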

Another reason for using external tables is when you wish to associate multiple schemas with the same dataset.

Partitions and Buckets

Hive organizes tables into partitions, a way of dividing a table into coarse-grained parts based on the value of a partition column, such as date. Using partitions can make it faster to do queries on slices of the data.

Tables or partitions may further be subdivided into buckets, to give extra structure to the data that may be used for more efficient queries. For example, bucketing by user ID means we can quickly evaluate a user-based query by running it on a randomized sample of the total set of users.

Partitions

To take an example where partitions are commonly used, imagine log files where each record includes a timestamp. If we partitioned by date, then records for the same date would be stored in the same partition. The advantage to this scheme is that queries that are restricted to a particular date or set of dates can be answered much more efficiently since they only need to scan the files in the partitions that the query pertains to. Notice that partitioning doesn’t preclude more wide-ranging queries: it is still feasible to query the entire dataset across many partitions.

A table may be partitioned in multiple dimensions. For example, in addition to partitioning logs by date, we might also subpartition each date partition by country to permit efficient queries by location.

Partitions are defined at table creation time using the PARTITIONED BY clause, which takes a list of column definitions. For the hypothetical log files example, we might define a table with records comprising a timestamp and the log line itself:
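(A sketch; the column names follow the description above and the partition columns used below.)

CREATE TABLE logs (ts BIGINT, line STRING)
  PARTITIONED BY (dt STRING, country STRING);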

When we load data into a partitioned table, the partition values are specified explicitly:
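(A sketch; the local path is illustrative, and the partition values match the directory layout shown below.)

LOAD DATA LOCAL INPATH 'input/hive/partitions/file1'
INTO TABLE logs
PARTITION (dt='2010-01-01', country='GB');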

However, partitions may be added to or removed from a table after creation using an ALTER TABLE statement.

At the filesystem level, partitions are simply nested subdirectories of the table directory. After loading a few more files into the logs table, the directory structure might look like this:
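(An illustrative layout; file3, file5, and file6 are assumptions, while file1, file2, and file4 match the query discussion later in this section.)

/user/hive/warehouse/logs/dt=2010-01-01/country=GB/file1
/user/hive/warehouse/logs/dt=2010-01-01/country=GB/file2
/user/hive/warehouse/logs/dt=2010-01-01/country=US/file3
/user/hive/warehouse/logs/dt=2010-01-02/country=GB/file4
/user/hive/warehouse/logs/dt=2010-01-02/country=US/file5
/user/hive/warehouse/logs/dt=2010-01-02/country=US/file6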

The logs table has two date partitions, 2010-01-01 and 2010-01-02, corresponding to subdirectories called dt=2010-01-01 and dt=2010-01-02; and two country subpartitions, GB and US, corresponding to nested subdirectories called country=GB and country=US. The data files reside in the leaf directories.

We can ask Hive for the partitions in a table using SHOW PARTITIONS:
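(A sketch of the command and its output for the partitions just described.)

hive> SHOW PARTITIONS logs;
dt=2010-01-01/country=GB
dt=2010-01-01/country=US
dt=2010-01-02/country=GB
dt=2010-01-02/country=US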

One thing to bear in mind is that the column definitions in the PARTITIONED BY clause are full-fledged table columns, called partition columns; however, the data files do not contain values for these columns since they are derived from the directory names.

You can use partition columns in SELECT statements in the usual way. Hive performs input pruning to scan only the relevant partitions. For example:
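(A sketch; the restriction to country='GB' is inferred from the list of files scanned below.)

SELECT ts, dt, line
FROM logs
WHERE country = 'GB';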

will only scan file1, file2, and file4. Notice, too, that the query returns the values of the dt partition column, which Hive reads from the directory names since they are not in the data files.

Buckets

There are two reasons why you might want to organize your tables (or partitions) into buckets. The first is to enable more efficient queries. Bucketing imposes extra structure on the table, which Hive can take advantage of when performing certain queries. In particular, a join of two tables that are bucketed on the same columns (which include the join columns) can be efficiently implemented as a map-side join.

The second reason to bucket a table is to make sampling more efficient. When working with large datasets, it is very convenient to try out queries on a fraction of your dataset while you are in the process of developing or refining them. We shall see how to do efficient sampling at the end of this section.

First, let’s see how to tell Hive that a table should be bucketed. We use the CLUSTERED BY clause to specify the columns to bucket on and the number of buckets:
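(A sketch; the table and column names match the bucketed_users example used in the rest of this section.)

CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) INTO 4 BUCKETS;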

Here we are using the user ID to determine the bucket (which Hive does by hashing the value and reducing modulo the number of buckets), so any particular bucket will effectively have a random set of users in it.

In the map-side join case, where the two tables are bucketed in the same way, a mapper processing a bucket of the left table knows that the matching rows in the right table are in its corresponding bucket, so it need only retrieve that bucket (which is a small fraction of all the data stored in the right table) to effect the join. This optimization also works if the number of buckets in the two tables are multiples of each other; they do not have to have exactly the same number of buckets. The HiveQL for joining two bucketed tables is shown in “Map joins”.

The data within a bucket may additionally be sorted by one or more columns. This allows even more efficient map-side joins, since the join of each bucket becomes an efficient merge-sort. The syntax for declaring that a table has sorted buckets is:
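(A sketch, extending the bucketed_users definition above.)

CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 4 BUCKETS;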

How can we make sure the data in our table is bucketed? While it’s possible to load data generated outside Hive into a bucketed table, it’s often easier to get Hive to do the bucketing, usually from an existing table.

Hive does not check that the buckets in the data files on disk are consistent with the buckets in the table definition (either in number, or on the basis of bucketing columns). If there is a mismatch, then you may get an error or undefined behavior at query time. For this reason, it is advisable to get Hive to perform the bucketing.

Take an unbucketed users table:
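(Illustrative contents; the names are made up, but users 0 and 4 are the ones that land in the first bucket below.)

hive> SELECT * FROM users;
0       Nat
2       Joe
3       Kay
4       Ann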

To populate the bucketed table, we need to set the hive.enforce.bucketing property to true, so that Hive knows to create the number of buckets declared in the table definition. Then it is just a matter of using the INSERT command:
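(A sketch, using the tables defined above.)

hive> SET hive.enforce.bucketing=true;
hive> INSERT OVERWRITE TABLE bucketed_users
    > SELECT * FROM users;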

Physically, each bucket is just a file in the table (or partition) directory. The file name is not important, but bucket n is the nth file, when arranged in lexicographic order. In fact, buckets correspond to MapReduce output file partitions: a job will produce as many buckets (output files) as reduce tasks. We can see this by looking at the layout of the bucketed_users table we just created. Running this command:

hive> dfs -ls /user/hive/warehouse/bucketed_users;

shows that four files were created, with the following names (the name is generated by Hive and incorporates a timestamp, so it will change from run to run):
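(Illustrative names only; actual names incorporate the job’s timestamp and attempt IDs and will differ from run to run.)

attempt_201005221636_0016_r_000000_0
attempt_201005221636_0016_r_000001_0
attempt_201005221636_0016_r_000002_0
attempt_201005221636_0016_r_000003_0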

The first bucket contains the users with IDs 0 and 4, since for an INT the hash is the integer itself, and the value is reduced modulo the number of buckets (four, in this case):
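(A sketch of inspecting the first bucket file; the fields run together because the separator is the nonprinting Control-A character.)

hive> dfs -cat /user/hive/warehouse/bucketed_users/*0_0;
0Nat
4Ann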

We can see the same thing by sampling the table using the TABLESAMPLE clause, which restricts the query to a fraction of the buckets in the table rather than the whole table:
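(A sketch; the rows returned are the illustrative users shown for the first bucket above.)

hive> SELECT * FROM bucketed_users
    > TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);
0       Nat
4       Ann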

Bucket numbering is 1-based, so this query retrieves all the users from the first of four buckets. For a large, evenly distributed dataset, approximately one quarter of the table’s rows would be returned. It’s possible to sample a number of buckets by specifying a different proportion (which need not be an exact multiple of the number of buckets, since sampling is not intended to be a precise operation). For example, this query returns half of the buckets:
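(A sketch.)

hive> SELECT * FROM bucketed_users
    > TABLESAMPLE(BUCKET 1 OUT OF 2 ON id);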

(Note: the hive.enforce.bucketing property is available from Hive 0.6.0. In previous versions, it was instead necessary to set mapred.reduce.tasks to the number of buckets in the table being populated. If the buckets are sorted, you also need to set hive.enforce.sorting to true.)

(Note: the fields appear to run together when displaying the raw bucket file, since the separator character in the output is a nonprinting control character. The control characters used are explained in the next section.)

Sampling a bucketed table is very efficient, since the query only has to read the buckets that match the TABLESAMPLE clause. Contrast this with sampling a non-bucketed table, using the rand() function, where the whole input dataset is scanned, even if a very small sample is needed:
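(A sketch; rand() here forces a scan of the whole, non-bucketed users table.)

hive> SELECT * FROM users
    > TABLESAMPLE(BUCKET 1 OUT OF 4 ON rand());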

Storage Formats

There are two dimensions that govern table storage in Hive: the row format and the file format. The row format dictates how rows, and the fields in a particular row, are stored. In Hive parlance, the row format is defined by a SerDe, a portmanteau word for a Serializer-Deserializer.

When acting as a deserializer, which is the case when querying a table, a SerDe will deserialize a row of data from the bytes in the file to objects used internally by Hive to operate on that row of data. When used as a serializer, which is the case when performing an INSERT or CTAS (see “Importing Data” ), the table’s SerDe will serialize Hive’s internal representation of a row of data into the bytes that are written to the output file.

The file format dictates the container format for fields in a row. The simplest format is a plain text file, but there are row-oriented and column-oriented binary formats available, too.

The default storage format: Delimited text

When you create a table with no ROW FORMAT or STORED AS clauses, the default format is delimited text, with a row per line.

The default field delimiter is not a tab character, but the Control-A character from the set of ASCII control codes (it has ASCII code 1). The choice of Control-A, sometimes written as ^A in documentation, came about since it is less likely to be a part of the field text than a tab character. There is no means for escaping delimiter characters in Hive, so it is important to choose ones that don’t occur in data fields.

The default collection item delimiter is a Control-B character, used to delimit items in an ARRAY or STRUCT, or key-value pairs in a MAP. The default map key delimiter is a Control-C character, used to delimit the key and value in a MAP. Rows in a table are delimited by a newline character.

The preceding description of delimiters is correct for the usual case of flat data structures, where the complex types only contain primitive types. For nested types, however, this isn’t the whole story, and in fact the level of the nesting determines the delimiter.

For an array of arrays, for example, the delimiters for the outer array are Control-B characters, as expected, but for the inner array they are Control-C characters, the next delimiter in the list. If you are unsure which delimiters Hive uses for a particular nested structure, you can run a command like:
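(A sketch; it assumes a one-row table, here called dummy, to select the literal nested value from.)

CREATE TABLE nested
AS
SELECT array(array(1, 2), array(3, 4))
FROM dummy;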

then use hexdump, or similar, to examine the delimiters in the output file. Hive actually supports eight levels of delimiters, corresponding to ASCII codes 1, 2, ... 8, but you can only override the first three. Thus, the statement:
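(A sketch; the table name and column are illustrative, and the delimiters spell out the defaults in octal form.)

CREATE TABLE t (s STRING);

is identical to the more explicit

CREATE TABLE t (s STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\001'
  COLLECTION ITEMS TERMINATED BY '\002'
  MAP KEYS TERMINATED BY '\003'
  LINES TERMINATED BY '\n';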

Notice that the octal form of the delimiter characters can be used (\001 for Control-A, for instance).

Internally, Hive uses a SerDe called LazySimpleSerDe for this delimited format, along with the line-oriented MapReduce text input and output formats we saw in the chapter on MapReduce types and formats. The “lazy” prefix comes about since it deserializes fields lazily, only as they are accessed. However, it is not a compact format, since fields are stored in a verbose textual format, so a boolean value, for instance, is written as the literal string true or false.

The simplicity of the format has a lot going for it, such as making it easy to process with other tools, including MapReduce programs or Streaming, but there are more compact and performant binary SerDe’s that you might consider using. Some are listed in the table below.

Binary SerDe’s should not be used with the default TEXTFILE format (or explicitly using a STORED AS TEXTFILE clause). There is always the possibility that a binary row will contain a newline character, which would cause Hive to truncate the row and fail at deserialization time.

[Table: Hive SerDe’s]

Binary storage formats: Sequence files and RCFiles

Hadoop’s sequence file format (“SequenceFile” ) is a general purpose binary format for sequences of records (key-value pairs). You can use sequence files in Hive by using the declaration STORED AS SEQUENCEFILE in the CREATE TABLE statement.

One of the main benefits of using sequence files is their support for splittable compression. If you have a collection of sequence files that were created outside Hive, then Hive will read them with no extra configuration. If, on the other hand, you want tables populated from Hive to use compressed sequence files for their storage, you need to set a few properties to enable compression (see “Using Compression in MapReduce” ):
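(A sketch of the relevant settings; the GzipCodec shown is just one possible codec, and the table definition is illustrative.)

hive> SET hive.exec.compress.output=true;
hive> SET mapred.output.compress=true;
hive> SET mapred.output.compression.type=BLOCK;
hive> SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
hive> CREATE TABLE compressed_table (data STRING)
    > STORED AS SEQUENCEFILE;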

Sequence files are row-oriented. What this means is that the fields in each row are stored together, as the contents of a single sequence file record.

Hive provides another binary storage format called RCFile, short for Record Columnar File. RCFiles are similar to sequence files, except that they store data in a column-oriented fashion. RCFile breaks up the table into row splits, then within each split stores the values for each row in the first column, followed by the values for each row in the second column, and so on. This is shown diagrammatically in the figure below.

[Figure: row-oriented (sequence file) versus column-oriented (RCFile) storage of a table]

A column-oriented layout permits columns that are not accessed in a query to be skipped. Consider a query of the table in the figure that processes only column 2. With row-oriented storage, like a sequence file, the whole row (stored in a sequence file record) is loaded into memory, even though only the second column is actually read.

Lazy deserialization goes some way to saving processing cycles by deserializing only the fields of the columns that are accessed, but it can’t avoid the cost of reading each row’s bytes from disk.

With column-oriented storage, only the column 2 parts of the file (shaded in the figure) need to be read into memory.

In general, column-oriented formats work well when queries access only a small number of columns in the table. Conversely, row-oriented formats are appropriate when a large number of columns of a single row are needed for processing at the same time.

Space permitting, it is relatively straightforward to measure the performance difference between the two formats for your particular workload, since you can create a copy of a table with a different storage format for comparison, using “CREATE TABLE...AS SELECT” . Use the following CREATE TABLE clauses to enable column-oriented storage in Hive:
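(A sketch; the table definition is illustrative, while ColumnarSerDe is the SerDe Hive provides for RCFile.)

CREATE TABLE users_rc (id INT, name STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS RCFILE;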

An example: RegexSerDe

Let’s see how to use another SerDe for storage. We’ll use a contrib SerDe that uses a regular expression for reading the fixed-width station metadata from a text file:
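(A sketch; the column names follow the discussion below, and the exact regular expression for the fixed-width fields is an assumption.)

CREATE TABLE stations (usaf STRING, wban STRING, name STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "(\\d{6}) (\\d{5}) (.{29}) .*"
);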

In previous examples, we have used the DELIMITED keyword to refer to delimited text in the ROW FORMAT clause. In this example, we instead specify a SerDe with the SERDE keyword and the fully qualified classname of the Java class that implements the SerDe, org.apache.hadoop.hive.contrib.serde2.RegexSerDe. SerDe’s can be configured with extra properties using the WITH SERDEPROPERTIES clause.

Here we set the input.regex property, which is specific to RegexSerDe. input.regex is the regular expression pattern to be used during deserialization to turn the line of text forming the row into a set of columns. Java regular expression syntax is used for the matching (see http://java.sun.com/javase/6/docs/api/java/util/regex/Pattern.html), and columns are formed from capturing groups of parentheses. In this example, there are three capturing groups for usaf (a six-digit identifier), wban (a five-digit identifier), and name (a fixed-width column of 29 characters).

Sometimes you need to use parentheses for regular expression constructs that you don’t want to count as a capturing group: for example, the pattern (ab)+ for matching a string of one or more ab character pairs. The solution is to use a noncapturing group, which has a ? character after the first parenthesis. There are various noncapturing group constructs (see the Java documentation), but in this example we could use (?:ab)+ to avoid capturing the group as a Hive column.

To populate the table, we use a LOAD DATA statement as before:
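(A sketch; the local path is illustrative.)

LOAD DATA LOCAL INPATH 'input/ncdc/metadata/stations-fixed-width.txt'
INTO TABLE stations;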

Recall that LOAD DATA copies or moves the files to Hive’s warehouse directory (in this case, it’s a copy since the source is the local filesystem). The table’s SerDe is not used for the load operation.

When we retrieve data from the table, the SerDe is invoked for deserialization, as we can see from this simple query, which correctly parses the fields for each row:
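(A sketch; the rows returned would show the usaf, wban, and name fields parsed out of each line.)

hive> SELECT * FROM stations LIMIT 4;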

Importing Data

We’ve already seen how to use the LOAD DATA operation to import data into a Hive table (or partition) by copying or moving files to the table’s directory. You can also populate a table with data from another Hive table using an INSERT statement, or at creation time using the CTAS construct, which is an abbreviation used to refer to CREATE TABLE...AS SELECT.

If you want to import data from a relational database directly into Hive, have a look at Sqoop, which is covered in “Imported Data and Hive”.

INSERT OVERWRITE TABLE

Here’s an example of an INSERT statement:
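(A sketch; source and target are illustrative table names.)

INSERT OVERWRITE TABLE target
SELECT col1, col2
FROM source;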

For partitioned tables, you can specify the partition to insert into by supplying a PARTITION clause:
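(A sketch, writing into the partition discussed below.)

INSERT OVERWRITE TABLE target
PARTITION (dt='2010-01-01')
SELECT col1, col2
FROM source;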

The OVERWRITE keyword is actually mandatory in both cases, and means that the contents of the target table (for the first example) or the 2010-01-01 partition (for the second example) are replaced by the results of the SELECT statement. At the time of writing, Hive does not support adding records to an already-populated nonpartitioned table or partition using an INSERT statement. Instead, you can achieve the same effect using a LOAD DATA operation without the OVERWRITE keyword.

From Hive 0.6.0 onward, you can specify the partition dynamically, by determining the partition value from the SELECT statement:
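(A sketch; the value of the dt partition column is taken from the final column of the SELECT.)

INSERT OVERWRITE TABLE target
PARTITION (dt)
SELECT col1, col2, dt
FROM source;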

This is known as a dynamic-partition insert. This feature is off by default, so you need to enable it by setting hive.exec.dynamic.partition to true first.

Unlike other databases, Hive does not (currently) support a form of the INSERT statement for inserting a collection of records specified in the query, in literal form. That is, statements of the form INSERT INTO...VALUES... are not allowed.

Multitable insert

In HiveQL, you can turn the INSERT statement around and start with the FROM clause, for the same effect:
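(A sketch, equivalent to the first INSERT example above.)

FROM source
INSERT OVERWRITE TABLE target
  SELECT col1, col2;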

The reason for this syntax becomes clear when you see that it’s possible to have multiple INSERT clauses in the same query. This so-called multitable insert is more efficient than multiple INSERT statements, since the source table need only be scanned once to produce the multiple, disjoint outputs.

Here’s an example that computes various statistics over the weather dataset:
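(A sketch of what such a multitable insert might look like; the target table names, column names, and quality conditions are assumptions based on the weather dataset used elsewhere in the book.)

FROM records2
INSERT OVERWRITE TABLE stations_by_year
  SELECT year, COUNT(DISTINCT station)
  GROUP BY year
INSERT OVERWRITE TABLE records_by_year
  SELECT year, COUNT(1)
  GROUP BY year
INSERT OVERWRITE TABLE good_records_by_year
  SELECT year, COUNT(1)
  WHERE temperature != 9999
    AND (quality = 0 OR quality = 1 OR quality = 4 OR quality = 5 OR quality = 9)
  GROUP BY year;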

There is a single source table (records2), but three tables to hold the results from three different queries over the source.

CREATE TABLE...AS SELECT

It’s often very convenient to store the output of a Hive query in a new table, perhaps because it is too large to be dumped to the console or because there are further processing steps to carry out on the result.

The new table’s column definitions are derived from the columns retrieved by the SELECT clause. In the following query, the target table has two columns named col1 and col2 whose types are the same as the ones in the source table:
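(A sketch, using the column names from the description above.)

CREATE TABLE target
AS
SELECT col1, col2
FROM source;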

A CTAS operation is atomic, so if the SELECT query fails for some reason, then the table is not created.

Altering Tables

Since Hive uses the schema-on-read approach, it’s flexible in permitting a table’s definition to change after the table has been created. The general caveat, however, is that it is up to you, in many cases, to ensure that the data is changed to reflect the new structure.

You can rename a table using the ALTER TABLE statement:
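(A sketch; source and target are illustrative names.)

ALTER TABLE source RENAME TO target;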

In addition to updating the table metadata, ALTER TABLE moves the underlying table directory so that it reflects the new name. In the current example, /user/hive/warehouse/

