Exports: A Deeper Look

The architecture of Sqoop’s export capability is very similar to how Sqoop performs imports. Before performing the export, Sqoop picks a strategy based on the database connect string. For most systems, Sqoop uses JDBC. Sqoop then generates a Java class based on the target table definition. This generated class can parse records from text files and insert values of the appropriate types into a table (in addition to being able to read the columns from a ResultSet). A MapReduce job is then launched that reads the source data files from HDFS, parses the records using the generated class, and executes the chosen export strategy.


The JDBC-based export strategy builds up batch INSERT statements, each of which adds multiple records to the target table. Inserting many records per statement performs much better than executing many single-row INSERT statements on most database systems. Separate threads are used to read from HDFS and to communicate with the database, so that I/O operations involving different systems are overlapped as much as possible.
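On Sqoop 1.x clients, the number of rows packed into each generated INSERT statement can be tuned from the command line. The following is a minimal sketch; the connect string, table name, and export directory are illustrative.

# Pack 100 rows into each generated INSERT statement during the export.
# The connect string, table, and HDFS path are illustrative.
sqoop export \
  -Dsqoop.export.records.per.statement=100 \
  --connect jdbc:mysql://db.example.com/sales \
  --table sale_records \
  --export-dir /user/hive/warehouse/sale_records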

For MySQL, Sqoop can employ a direct-mode strategy using mysqlimport. Each map task spawns a mysqlimport process that it communicates with via a named FIFO on the local filesystem. Data is then streamed into mysqlimport via the FIFO channel, and from there into the database.
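Direct mode is requested with the --direct argument and requires mysqlimport to be available on the task nodes. A sketch with illustrative names:

# Export via mysqlimport rather than JDBC INSERT statements.
# The connect string, table, and HDFS path are illustrative.
sqoop export \
  --connect jdbc:mysql://db.example.com/sales \
  --table sale_records \
  --export-dir /user/hive/warehouse/sale_records \
  --direct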

While most MapReduce jobs reading from HDFS pick the degree of parallelism (number of map tasks) based on the number and size of the files to process, Sqoop’s export system gives users explicit control over the number of tasks. Because the performance of the export can be affected by the number of parallel writers to the database, Sqoop uses the CombineFileInputFormat class to group the input files into a smaller number of map tasks.
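The number of map tasks, and therefore the number of parallel writers, is set with -m (or --num-mappers). A sketch with illustrative names:

# Run the export with four parallel writers to the database.
# The connect string, table, and HDFS path are illustrative.
sqoop export \
  --connect jdbc:mysql://db.example.com/sales \
  --table sale_records \
  --export-dir /user/hive/warehouse/sale_records \
  -m 4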

Exports and Transactionality

Due to the parallel nature of the process, an export is often not an atomic operation. Sqoop spawns multiple tasks to export slices of the data in parallel. These tasks can complete at different times, meaning that even though transactions are used inside tasks, the results of one task may be visible before the results of another. Moreover, databases often use fixed-size buffers to store transactions. As a result, one transaction cannot necessarily contain the entire set of operations performed by a task. Sqoop commits results every few thousand rows, to ensure that it does not run out of memory.
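On Sqoop 1.x clients, the number of statements grouped into each transaction can also be adjusted; a sketch with illustrative names:

# Commit after every 100 INSERT statements of 100 rows each,
# i.e. roughly 10,000 rows per transaction. Names and paths are illustrative.
sqoop export \
  -Dsqoop.export.records.per.statement=100 \
  -Dsqoop.export.statements.per.transaction=100 \
  --connect jdbc:mysql://db.example.com/sales \
  --table sale_records \
  --export-dir /user/hive/warehouse/sale_records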

These intermediate results are visible while the export continues. Applications that will use the results of an export should not be started until the export process is complete, or they may see partial results.

More problematically, if tasks fail (due to network problems or other issues), they may attempt to restart their slice of the export operation from the beginning, inserting duplicate records. At the time of this writing, Sqoop does not guard against this potentiality.

Before launching an export job, constraints should be placed on the database table (for example, by designating a column as the primary key) to ensure uniqueness of rows. While future versions of Sqoop may use better recovery logic, this is not currently available.
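For a MySQL target table, for example, such a constraint might be added before the export runs; the database, table, and column names below are illustrative.

# Ensure row uniqueness on the export target before launching the job.
# Database, table, and column names are illustrative.
mysql --database=sales -e "ALTER TABLE sale_records ADD PRIMARY KEY (id);"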

Exports and SequenceFiles

The example export read source data from a Hive table, which is stored in HDFS as a delimited text file. Sqoop can also export delimited text files that were not Hive tables. For example, it can export text files that are the output of a MapReduce job.
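If the files use delimiters other than Sqoop’s defaults, the delimiters are declared on the export command line. A sketch, assuming comma-separated fields and illustrative names:

# Export plain comma-delimited text (for example, MapReduce output)
# rather than a Hive table. Names and paths are illustrative.
sqoop export \
  --connect jdbc:mysql://db.example.com/sales \
  --table sale_records \
  --export-dir /user/hadoop/job-output \
  --input-fields-terminated-by ',' \
  --input-lines-terminated-by '\n'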

Sqoop can also export records stored in SequenceFiles to an output table, although some restrictions apply. A SequenceFile can contain arbitrary record types. Sqoop’s export tool will read objects from SequenceFiles and send them directly to the OutputCollector, which passes the objects to the database export OutputFormat. To work with Sqoop, the record must be stored in the “value” portion of the SequenceFile’s key-value pair format and must subclass the com.cloudera.sqoop.lib.SqoopRecord abstract class (as is done by all classes generated by Sqoop).

If you use the codegen tool (sqoop-codegen) to generate a SqoopRecord implementation based on your export target table, you can write a MapReduce program that populates instances of this class and writes them to SequenceFiles. sqoop-export can then export these SequenceFiles to the table. Alternatively, the data may already be in SqoopRecord instances in SequenceFiles if it was imported from a database table to HDFS, modified in some fashion, and the results stored in SequenceFiles holding records of the same data type.
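A sketch of the code-generation step, with an illustrative connect string, table, and class name:

# Generate a SqoopRecord implementation (and its jar) from the export
# target table; a MapReduce job can then populate instances of this class
# and write them to SequenceFiles. Names are illustrative.
sqoop codegen \
  --connect jdbc:mysql://db.example.com/sales \
  --table sale_records \
  --class-name SaleRecord \
  --bindir .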

In this case, Sqoop should reuse the existing class definition to read data from the SequenceFiles, rather than generate a new (temporary) record container class to perform the export, as is done when converting text-based records to database rows. You can suppress code generation and instead use an existing record class and jar by providing the --class-name and --jar-file arguments to Sqoop. Sqoop will use the specified class, loaded from the specified jar, when exporting records.

In the following example, we will re-import the widgets table as SequenceFiles, and then export it back to the database in a different table:
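The commands below are a sketch; the connect string, column definitions, and generated class and jar names are illustrative.

# 1. Re-import the widgets table as SequenceFiles, keeping the generated
#    code and jar in the current directory (names here are illustrative).
sqoop import \
  --connect jdbc:mysql://localhost/hadoopguide \
  --table widgets -m 1 \
  --class-name WidgetHolder --as-sequencefile \
  --target-dir widget_sequence_files --bindir .

# 2. Create a destination table with a slightly different but compatible
#    schema (column names and types are illustrative).
mysql --database=hadoopguide -e "CREATE TABLE widgets2 (id INT,
  widget_name VARCHAR(100), price DOUBLE, designed DATE, version INT,
  notes VARCHAR(200));"

# 3. Export the SequenceFiles, reusing the class generated during the
#    import. The jar name is assumed; use the file the import wrote to
#    the current directory.
sqoop export \
  --connect jdbc:mysql://localhost/hadoopguide \
  --table widgets2 -m 1 \
  --class-name WidgetHolder --jar-file WidgetHolder.jar \
  --export-dir widget_sequence_files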

During the import, we specified the SequenceFile format, and that we wanted the jar file to be placed in the current directory (with --bindir), so we can reuse it. Otherwise, it would be placed in a temporary directory. We then created a destination table for the export, which had a slightly different schema, albeit one that is compatible with the original data. We then ran an export that used the existing generated code to read the records from the SequenceFile and write them to the database.

