SQOOP is a great tool for importing from an RDBMS to Hadoop and back out again. It's usually the first tool people pick up in the rapidly expanding Hadoop ecosystem. It was for me, and because I came from data warehousing it felt somewhat familiar. SQOOP is easy to use and has many options. I'm not going to go into the technical details of how SQOOP works or every option; you can look that up, read the documentation and explore the code yourself!

However, I have seen people use it incorrectly or make the wrong choice of options. For example, I have seen people compare a SQOOP run with one mapper using the generic JDBC connector against Teradata's fast export run locally, and cry foul when it's slower. Well, the native dump tool of a proprietary database is nearly always going to win here! Additionally, SQOOP has the ability to load data as Avro and Parquet. This is good: use it and avoid Text, and if you don't know why, stop, get Googling and come back. Avro and Parquet perform better, compress better and allow schema evolution. They are also compatible with each other: Avro is an object model for Parquet (Thrift is also supported). Parquet is generally preferred for Impala, as I/O is reduced and columns can be skipped.

So with this in mind I decided to write a wrapper around SQOOP to try to enforce some best practices, and I hit a couple of obstacles along the way. What I wanted to achieve was:

  1. Default the file format to Avro/Parquet.
  2. Use direct mode.
  3. Identify best split by columns.
  4. Shared jobs.
  5. Handle schema evolution in final target dataset managed by Kite SDK. For the first run no evolution takes place.


The first problem here is direct mode. Direct mode tells SQOOP to use specialised connectors for faster extraction of data, but they don't support Avro or Parquet, so we need to dump the data as Text instead. Since we have to use Text files we should set the compression codec to LZOP. This codec creates an index that allows the Text files to be split, and it's also fast, so if you have to use Text files in Hadoop use this. Obviously this isn't a hard and fast rule; other codecs are available, but I'm not debating their merits here.
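As a rough illustration, here is how the direct-mode/Text/LZOP combination might be expressed programmatically through SqoopOptions. This is a minimal sketch, not the wrapper's actual code; the connection string and table are hypothetical, and the setter names are from the SqoopOptions class as I recall them, so check them against your SQOOP version.

```scala
import org.apache.sqoop.SqoopOptions

// Minimal sketch: direct mode forces a Text dump, so pair it with LZOP
// so the resulting files remain splittable.
val opts = new SqoopOptions()
opts.setConnectString("jdbc:mysql://dbserver/sales")              // hypothetical connection
opts.setTableName("orders")                                       // hypothetical table
opts.setDirectMode(true)                                          // use the vendor-specific fast path
opts.setFileLayout(SqoopOptions.FileLayout.TextFile)              // direct connectors can't write Avro/Parquet
opts.setCompressionCodec("com.hadoop.compression.lzo.LzopCodec")  // splittable, fast codec for Text
```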

This leads to a second problem. One of the main benefits of Avro is its ability to handle schema evolution. A common scenario is handling changes in the schema of source systems. RDBMS schemas change too, so what do you do if you have changes on a daily basis, as I have had at previous employers? Do you use views that you have to maintain over time and that require custom code to manage, merge and keep HIVE/Impala in sync? You'll still have to rewrite your data, especially with Text files. This is where Avro has the edge: you can read data from yesterday using today's schema, provided the schemas abide by Avro's schema evolution rules. Since Parquet can use Avro as its object model, we win on all counts.

So how do we handle this? I see two options.

  • Forget direct mode. Use the JDBC connector for the source database and load the data directly as Avro or Parquet. When loading as Parquet, SQOOP will actually create a Kite SDK dataset for you in the target directory, so I prefer this route with Parquet. More on Kite SDK later.
  • Use direct mode and write Text files with LZOP, then convert the CSV to Parquet post-SQOOP.


You can get the full code from my GitHub.

Sqoop Wrapper

In addition to the requirements above I also wanted to automate as much as possible. Previously I auto-generated SQOOP commands from a bash script and pulled from multiple databases each night, but as time progressed and additional requirements came in, scripting only got me so far, so I opted to call SQOOP programmatically from Scala. Why?

  • I also wanted the ability to register any potential inbound schemas and datasets with a central lineage/catalog service, for example Cloudera's Navigator or Hortonworks Falcon. By creating a service where users can register an interest in a database, I can preconfigure jobs and tell the catalog service what datasets the cluster is getting, from where and how. This is a hot topic in many large financial institutions.
  • I want continuous integration and testing frameworks.
  • I want to handle schema evolution.
  • I want a UI on top to allow administration of jobs.

Extending SQOOP JobStorage

SQOOP has a built-in metastore which uses HSQLDB. While I have nothing against HSQLDB, I've never used it, and since several Hadoop components are already backed by MySQL, such as HIVE, OOZIE, Sentry, Cloudera Manager, Ambari and Ranger, why not piggyback on that! You could also use SQL Server or Oracle. Within many organisations the backup and restore of these databases is already handled. One less thing to worry about.

To use MySQL as a metastore you can either update the sqoop-site.xml to point to your MySQL server or extend the JobStorage interface.

Since I wanted more control over the schema backing the metastore I implemented my own JobStorage, the interface SQOOP uses to persist to the metastore. This still lets SQOOP handle updating the metastore after job completion for incremental imports. I split the tables into two, sqoop_jobs and sqoop_job_props. See JobMetaStorage.java for details. I also use Slick to handle database access; it removes the free-text queries and integrates better with the frontend I plan to build using Spray. You can still use the SQOOP CLI to interact with this implementation by adding the jar to your HADOOP_CLASSPATH and setting sqoop.job.storage.implementations to your class in the sqoop-site.xml.
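For illustration, a minimal Slick sketch of what the two metastore tables could look like. The column names here are my own guesses for the purpose of the example, not necessarily the exact schema used in JobMetaStorage.

```scala
import slick.driver.MySQLDriver.api._   // Slick 3.0/3.1; newer versions use slick.jdbc.MySQLProfile.api._

// Hypothetical shape of the two metastore tables: one row per job,
// plus a key/value table holding each job's SQOOP properties.
class SqoopJobs(tag: Tag) extends Table[(String, Boolean)](tag, "sqoop_jobs") {
  def jobName = column[String]("job_name", O.PrimaryKey)
  def enabled = column[Boolean]("enabled")
  def * = (jobName, enabled)
}

class SqoopJobProps(tag: Tag) extends Table[(String, String, String)](tag, "sqoop_job_props") {
  def jobName   = column[String]("job_name")
  def propName  = column[String]("prop_name")
  def propValue = column[String]("prop_value")
  def * = (jobName, propName, propValue)
}

val jobs  = TableQuery[SqoopJobs]
val props = TableQuery[SqoopJobProps]
```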

IngestSqoop Class

I created a class to set up the SqoopOptions defaults that I want. This is straightforward: I simply parse the incoming arguments and set my defaults. Nothing special, but I always set the HIVE delimiters to make sure line delimiters are handled correctly; we don't want HIVE thinking we have more lines than we actually do because of a carriage return in a source text field. Additionally I set how I want to handle nulls, i.e. what value to write when the source is null. This led me to an issue in the way SQOOP handles the Avro schema. For schema evolution you have to set a default value for each field; SQOOP doesn't currently do this, and it also generates the schema as a UNION of db_type and null. Avro requires the first type in the UNION to match the type of the default. We can't guess and put in defaults for, say, an INT, because that might imply some business logic. I patched SQOOP to set the default to null and swap the types in the UNION (JIRA). See IngestSqoop.java for details.
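A minimal sketch of the kind of defaults described above, again via SqoopOptions setters. The setter names are as I recall them from the SQOOP codebase and should be verified against your version; the values are illustrative only.

```scala
import org.apache.sqoop.SqoopOptions

// Sketch of sensible defaults: Parquet output, explicit null handling, and
// HIVE-safe delimiters so embedded newlines/carriage returns in source text
// columns don't create phantom rows.
val opts = new SqoopOptions()
opts.setFileLayout(SqoopOptions.FileLayout.ParquetFile)  // default to Parquet rather than Text
opts.setNullStringValue("\\N")                           // what to write when a string column is null
opts.setNullNonStringValue("\\N")                        // same for non-string columns
opts.setHiveDropDelims(true)                             // strip \n, \r and \01 from string fields
opts.setNumMappers(4)                                    // illustrative default degree of parallelism
```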

Ingestor Class

The Ingestor class is a simple wrapper that parses the incoming arguments, creates the SqoopOptions, creates or executes the jobs and triggers Kite SDK to handle the schema evolution. Here we could also add a call to a Schema Catalog or Inventory Service to notify it that schemas and datasets are inbound to the cluster.

The current setup accepts 4 run types:

initialise

This run type initialises jobs for each table found in the target database. It attempts to select the best column to split on and defaults the import type to incremental based on that split column. For MySQL it looks for an auto-increment column; on Netezza the distribution key is checked. If a suitable column is not found the job is tagged as disabled. There's always room for improvement here; I should probably also check primary keys and their types. It's on a best-effort basis for now.

To run the initialiser and pre-populate the jobs, run sqoop-runner.sh <run_type> <db_type> <server> <database>. For example, against a MySQL server you would run sqoop-runner.sh initialise mysql <server> <database>.

It's important to note this is a best guess. You can always set the job type to be a full import if no suitable column is found, or leave it up to SQOOP to determine the split-by column. The problem with leaving it to SQOOP is that it will look for a primary key, but it can only use one made up of a single column, and what happens if that column isn't splittable, like a GUID? Failure.

Initialiser Class

The purpose of the Initialiser class and the TargetDb helper, which is far from perfect, is to preconfigure jobs and store them in the metastore for each database that is registered. It tries to guess the best column to split by, for example by looking at the distribution columns in Netezza.
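As an illustration of the kind of guesswork involved, here is a minimal sketch of how an auto-increment column could be detected for a MySQL table via information_schema, using plain JDBC. The TargetDb helper in the repo may well do this differently; the function and its signature are mine.

```scala
import java.sql.DriverManager

// Sketch: look for an auto_increment column to use as the split-by column.
// Returns None if the table has no obvious candidate, in which case the job
// would be created as disabled.
def findSplitColumn(jdbcUrl: String, user: String, pass: String,
                    database: String, table: String): Option[String] = {
  val conn = DriverManager.getConnection(jdbcUrl, user, pass)
  try {
    val stmt = conn.prepareStatement(
      """SELECT column_name FROM information_schema.columns
        |WHERE table_schema = ? AND table_name = ? AND extra LIKE '%auto_increment%'""".stripMargin)
    stmt.setString(1, database)
    stmt.setString(2, table)
    val rs = stmt.executeQuery()
    if (rs.next()) Some(rs.getString(1)) else None
  } finally {
    conn.close()
  }
}
```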

To register a database add the connection details to the conf/application.conf or set them as environment variables.

create

This run type creates a job. It expects a “:” separated list of parameters in the form

db_type:server:database:tablename:split_by_col:num_mappers:check_col:last_val

The resulting job name will be db_type:server:database:tablename.
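For illustration, a minimal sketch of parsing that parameter string into something structured. The case class and its field names are mine, purely illustrative, and not necessarily how the repo does it.

```scala
// Illustrative container for the colon-separated create parameters.
case class JobSpec(dbType: String, server: String, database: String, table: String,
                   splitByCol: String, numMappers: Int, checkCol: String, lastVal: String)

// split with limit -1 keeps trailing empty fields (e.g. an empty last_val).
def parseCreateArgs(arg: String): JobSpec = arg.split(":", -1) match {
  case Array(dbType, server, database, table, splitBy, mappers, checkCol, lastVal) =>
    JobSpec(dbType, server, database, table, splitBy, mappers.toInt, checkCol, lastVal)
  case other =>
    sys.error(s"Expected 8 colon-separated fields, got ${other.length}: $arg")
}

// The resulting job name follows the convention described above:
def jobName(s: JobSpec) = s"${s.dbType}:${s.server}:${s.database}:${s.table}"
```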

exec:job

This run type executes the job given as a parameter.

exec:database

This run type executes all enabled jobs for a database in parallel, in batches whose size can be configured. It uses parallel collections to generate a list of batches of SQOOP jobs, e.g. List(Seq(job1, job2, …, job10), Seq(job11, job12, …, job20)), and executes each batch in parallel. A better approach here might be to use Futures that pull from a queue of SQOOP jobs, with, say, 10 Futures running in parallel; my approach can get blocked by one long SQOOP job in the first Sequence in the list.
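A minimal sketch of that batching approach with Scala's parallel collections; execute_job here stands in for the method described below that actually runs the SQOOP job.

```scala
// Sketch: run jobs in fixed-size batches, each batch's members in parallel.
// .par comes from the standard parallel collections (built in up to Scala 2.12).
def runInBatches(jobNames: Seq[String], batchSize: Int)(execute_job: String => Boolean): Unit = {
  jobNames.grouped(batchSize).foreach { batch =>
    // Each batch runs in parallel; the next batch starts only when the whole
    // batch has finished, which is why one slow job can hold everything up.
    batch.par.foreach(execute_job)
  }
}
```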

The main methods doing the work are in the repo. The execute_job method executes the actual SQOOP job; if it is successful, it merges the Avro schemas of the source and target datasets and runs a Crunch job to move the SQOOP'd data into the target dataset.

Schema Evolution

Now that we can create, store and execute SQOOP jobs to our liking we can look at how to handle schema evolution. We have to deal with two cases:

  1. Data arriving from SQOOP as Parquet.
  2. Data arriving from SQOOP as Text files.

For both cases we need to handle updating the schema of the target dataset with the source schema. When merging we are essentially adding the new fields to the schema; we never drop fields, even if they are dropped in the source. This allows us to see the data as it was, and simply return null values for the newly added fields when reading older data.

Since the datasets are managed by Kite, we can use SQOOP's built-in Avro schema generator to provide a schema for the table we are running SQOOP against. This can then be merged with the schema of the target dataset. Since SQOOP doesn't set defaults in the Avro schemas it generates, I wrote a helper class to correct this, as detailed earlier. Alternatively you can patch SQOOP with the JIRA mentioned above.
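For illustration, here is a rough sketch of the kind of correction that helper makes (not the repo's actual code): reorder each field's union so null comes first and attach a null default. It assumes Avro 1.7.x, where field defaults are Jackson JsonNodes, and it assumes SQOOP generated every field as a union of the db type and null, as described above.

```scala
import org.apache.avro.Schema
import org.codehaus.jackson.node.NullNode
import scala.collection.JavaConverters._

// Sketch: rebuild a SQOOP-generated record schema so each field is a
// ["null", <type>] union with a default of null, which is what Avro's
// schema-evolution rules require.
def withNullFirstAndDefaults(source: Schema): Schema = {
  val fixedFields = source.getFields.asScala.map { f =>
    val branches =
      if (f.schema().getType == Schema.Type.UNION) f.schema().getTypes.asScala.toSeq
      else Seq(f.schema())
    val nonNull = branches.filter(_.getType != Schema.Type.NULL)
    val union   = Schema.createUnion((Schema.create(Schema.Type.NULL) +: nonNull).asJava)
    new Schema.Field(f.name(), union, f.doc(), NullNode.getInstance())  // null default, null first
  }
  val fixed = Schema.createRecord(source.getName, source.getDoc, source.getNamespace, false)
  fixed.setFields(fixedFields.asJava)
  fixed
}
```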

DataRepo Class

Now that we have an Avro schema, we need to merge it into the target Kite dataset. This is handled in the DataRepo class. My target dataset is a HIVE external table. I create the dataset if it doesn't exist, extract its schema and update it with the merged source schema. For details on how Kite works check out their documentation. The interesting part of the update_schema method is where it uses Kite SDK's SchemaUtil class to merge the target dataset's schema with the schema coming from the source dataset.
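A rough sketch of that flow using the Kite SDK APIs. The exact classes here, Datasets, DatasetDescriptor and SchemaUtil.merge from the org.kitesdk packages, are how I understand the Kite API rather than a copy of the DataRepo code, and the dataset URI is purely illustrative.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.kitesdk.data.{Dataset, DatasetDescriptor, Datasets}
import org.kitesdk.data.spi.SchemaUtil

// Sketch: merge the SQOOP-generated source schema into the target dataset's
// schema and update the dataset descriptor so HIVE/Impala see the new fields.
def evolveSchema(datasetUri: String, sourceSchema: Schema): Unit = {
  val dataset: Dataset[GenericRecord] = Datasets.load(datasetUri, classOf[GenericRecord])
  val current    = dataset.getDescriptor
  val merged     = SchemaUtil.merge(current.getSchema, sourceSchema)   // union of old + new fields
  val descriptor = new DatasetDescriptor.Builder(current).schema(merged).build()
  Datasets.update(datasetUri, descriptor, classOf[GenericRecord])
}

// e.g. evolveSchema("dataset:hive:default/orders", sqoopGeneratedSchema)  // illustrative URI
```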

The load_dataset method uses the Kite SDK APIs to trigger a CopyTask to move the SQOOP'd Parquet files to the target dataset. SQOOP creates a dataset behind the scenes, and Kite takes care of the column mappings for us in the CopyTask!

If we are using SQOOP in direct mode we can use Kite's CSVImportCommand to load the CSVs imported by SQOOP into the target dataset. Since SQOOP doesn't insert headers into the CSV files, we need to supply them to the CSVImportCommand via the "--headers" option. There is a gotcha here: if you use the MySQL direct option, SQOOP calls mysqldump, which converts any null values to the string NULL. Kite will try to parse this, and if the data type is INT, for example, it won't fly!

A nice feature of the CSVImportCommand, which calls the TransformTask and CopyTask behind the scenes, is that you can pass it a Crunch function extending MapFn to perform data validation on inbound records.
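A validation hook of that sort might look something like this minimal sketch: a Crunch MapFn that just standardises the literal "NULL" strings mysqldump produces. The class and its record handling are illustrative only, not part of the repo.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.crunch.MapFn

// Sketch: a Crunch MapFn that could be plugged in to clean up records on the
// way in, e.g. turning mysqldump's literal "NULL" strings into real nulls
// before they hit typed columns.
class NullStringCleaner extends MapFn[GenericRecord, GenericRecord] {
  override def map(record: GenericRecord): GenericRecord = {
    val fields = record.getSchema.getFields
    for (i <- 0 until fields.size()) {
      record.get(i) match {
        case s: CharSequence if s.toString == "NULL" => record.put(i, null)
        case _                                       => // leave the value as-is
      }
    }
    record
  }
}
```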

Summary

Now if we run the exec:job action through the Ingestor class we will:

  1. Execute a SQOOP job stored in our MySQL metastore, which writes the results as Parquet into a Kite dataset. The target path is configurable in conf/application.conf.
  2. Merge the Avro schema of the table with the target dataset allowing schema evolution.
  3. Use Kite SDK to move the SQOOP’d dataset contents to the target dataset.


This means we don’t have to write complex code to manage the HIVE/Impala DDLs and rewrite files. Kite SDK simplifies the whole process of schema evolution, managing datasets and providing APIs to load and convert data into the correct formats in HDFS! The less code I have to write the better!

The schema evolution part is not just relevant to SQOOP; it also applies to flat files. Kite SDK is worth investigating, as it may already provide the tools you need to simplify your ingestion patterns.
