
Sqoop incremental import for teradata


Rising Star

How can we use incremental import in Sqoop for Teradata with --connection-manager org.apache.sqoop.teradata.TeradataConnManager? It seems the Sqoop --incremental option does not work with TeradataConnManager. Below is the statement from the Teradata Connector user guide from Hortonworks.

[Attached screenshot: 9414-sqoop.png]
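
For context, a standard Sqoop incremental import looks something like the sketch below (the check column, last value, and target directory here are just placeholders); it is combining these --incremental options with the TeradataConnManager connection manager that does not seem to work:

sqoop import --connection-manager org.apache.sqoop.teradata.TeradataConnManager --connect jdbc:teradata://**.***.***.***/DATABASE=***** --username **** --password **** --table emp2 --incremental append --check-column emp_id --last-value 12 --target-dir /user/aps/poc/emp2_incremental -m 1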

Please help.


Re: Sqoop incremental import for teradata

Rising Star

Can anyone help?


Re: Sqoop incremental import for teradata

Super Collaborator

More info is needed. If you post your command, perhaps someone can help. Also, you did not say what you mean by "not working".


Re: Sqoop incremental import for teradata

Rising Star

We can take the approach below for incremental import into Hive or HDFS, which is described in the Hortonworks documentation. In this approach we assume that each source table has a unique single- or multi-key identifier and that a “modified_date” field is maintained for each record, either defined as part of the original source table or added as part of the ingest process. The approach is based on four main phases, as described below.

1. Ingest: Complete data movement from the operational database into base_table, followed by movement of the changed or updated records only into incremental_table. (Steps 1 to 4)

2. Reconcile: Create a single view of the base table and change records (reconcile_view) to reflect the latest record set. (Step 5)

3. Compact: Create a reporting table (reporting_table) from the reconciled view. (Step 6)

4. Purge: Replace the base table with the reporting table contents and delete any previously processed change records before the next data ingestion cycle. (Steps 7 and 8)

Below are the Sqoop and Hive commands for the implementation:

Step 1: Initial full import of the source table from Teradata into HDFS (base_table directory).

sqoop import --connection-manager org.apache.sqoop.teradata.TeradataConnManager --connect jdbc:teradata://**.***.***.***/DATABASE=***** --username **** --password **** --table emp2 --target-dir /user/aps/poc/base_table -m 1

Step 2: Create a Hive table over the imported base data.

CREATE TABLE base_table (emp_id string, emp_name string, mob_no string, create_time string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION "/user/aps/poc/base_table";

Step 3: Import only the new or changed records into the incremental_table directory (here emp_id > 12 stands in for the last value already loaded).

sqoop import --connection-manager org.apache.sqoop.teradata.TeradataConnManager --connect jdbc:teradata://**.***.***.***/DATABASE=***** --username **** --password **** --target-dir /user/aps/poc/incremental_table --query "select * from emp2 where emp_id > 12 AND \$CONDITIONS" -m 1

Step 4: Create a Hive external table over the incremental data.

CREATE EXTERNAL TABLE incremental_table (emp_id string, emp_name string, mob_no string, create_time string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION "/user/aps/poc/incremental_table";

Step 5: Create the reconcile view that combines base and incremental records.

CREATE VIEW reconcile_view AS
SELECT t1.*
FROM (SELECT * FROM base_table UNION ALL SELECT * FROM incremental_table) t1
JOIN (SELECT emp_id, max(create_time) max_modified
      FROM (SELECT * FROM base_table UNION ALL SELECT * FROM incremental_table) t2
      GROUP BY emp_id) s
ON t1.emp_id = s.emp_id AND t1.create_time = s.max_modified;

Step 6: Rebuild the reporting table from the reconciled view.

DROP TABLE reporting_table;

CREATE TABLE reporting_table AS SELECT * FROM reconcile_view;

Step 7: Replace the base table with the reporting table contents.

DROP TABLE base_table;

CREATE TABLE base_table AS SELECT * FROM reporting_table;

Step 8: Clear the processed change records before the next cycle.

hadoop fs -rm -r /user/aps/poc/incremental_table/*
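
Note that the boundary used in the Step 3 query (emp_id > 12) is hard-coded; before the next cycle it has to be refreshed from the newly rebuilt base table. A minimal sketch, assuming emp_id is the check column and using a shell variable name that is purely illustrative:

LAST_VALUE=$(hive -S -e "SELECT max(emp_id) FROM base_table")
# reuse it in the next cycle's Step 3 query: "select * from emp2 where emp_id > ${LAST_VALUE} AND \$CONDITIONS"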

The tables and views that will be a part of the Incremental Update Workflow are:

base_table: A Hive managed (local) table that initially holds all records from the source system. After the initial processing cycle, it maintains a copy of the most up-to-date synchronized record set from the source. At the end of each processing cycle, it is overwritten by the reporting_table.

incremental_table: A Hive external table that holds the incremental change records (INSERTs and UPDATEs) from the source system. At the end of each processing cycle, it is cleared of content.

reconcile_view: A Hive view that combines and reduces the base_table and incremental_table content to show only the most up-to-date record for each key (see the short example after this list). It is used to populate the reporting_table.

reporting_table: A Hive managed (local) table that holds the most up-to-date records for reporting purposes. It is also used to overwrite the base_table at the end of each processing run.
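
As a concrete illustration of how the view reconciles duplicates (the values here are hypothetical): if emp_id 7 exists in base_table with create_time '2016-08-01 10:00:00' and an updated copy arrives in incremental_table with create_time '2016-08-15 09:30:00', the inner subquery computes the per-emp_id maximum create_time, and the join keeps only the 2016-08-15 row, so reconcile_view exposes just the latest version of the record.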

We have to wait for each step to complete before moving on to the next one. We can orchestrate this with a Unix shell script or Oozie. I have tested one case with the above example and it works fine. This approach is a bit more complex than the Sqoop --incremental option.
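
As a rough illustration of the shell option, the repeat cycle (after the initial load in Steps 1 and 2) could be wrapped in a script like the sketch below; the host, database, credentials, and LAST_VALUE variable are placeholders, and a real job would add proper error handling and logging:

#!/bin/bash
set -e   # stop at the first failing step so later phases never run on partial data

# Ingest: pull only the new/changed rows into the incremental directory (Step 3)
sqoop import --connection-manager org.apache.sqoop.teradata.TeradataConnManager --connect jdbc:teradata://<host>/DATABASE=<db> --username "$TD_USER" --password "$TD_PASS" --target-dir /user/aps/poc/incremental_table --query "select * from emp2 where emp_id > ${LAST_VALUE} AND \$CONDITIONS" -m 1

# Reconcile and Compact: rebuild the reporting table from the reconcile view (Step 6)
hive -e "DROP TABLE IF EXISTS reporting_table; CREATE TABLE reporting_table AS SELECT * FROM reconcile_view;"

# Purge: replace the base table and clear the processed change records (Steps 7 and 8)
hive -e "DROP TABLE IF EXISTS base_table; CREATE TABLE base_table AS SELECT * FROM reporting_table;"
hadoop fs -rm -r /user/aps/poc/incremental_table/*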