Created on 11-11-2016 09:23 AM - edited 08-18-2019 03:32 AM
How can we use incremental import in Sqoop for Teradata with --connection-manager org.apache.sqoop.teradata.TeradataConnManager? It seems the sqoop --incremental option does not work with TeradataConnManager. Below is the statement from the Teradata connector user guide from Hortonworks.
Please help.
Created 11-17-2016 04:19 AM
Can anyone help?
Created 11-17-2016 02:47 PM
More info is needed. If you post your command, perhaps someone can help. Also, you did not say what you mean by "not working".
Created 11-21-2016 01:46 PM
We can take the approach below for incremental import into Hive or HDFS, which is described in the Hortonworks documentation. In this approach we assume that each source table has a unique single- or multi-key identifier and that a "modified_date" field is maintained for each record, either defined as part of the original source table or added as part of the ingest process. The approach is based on four main phases, as described below.
1. Ingest: Complete data movement from the operational database (base_table), followed by movement of changed or new records only (incremental_table). (Steps 1 to 4)
2. Reconcile: Create a single view of the base table and change records (reconcile_view) to reflect the latest record set. (Step 5)
3. Compact: Create a reporting table (reporting_table) from the reconciled view. (Step 6)
4. Purge: Replace the base table with the reporting table contents and delete any previously processed change records before the next data ingestion cycle. (Steps 7 and 8)
Below are the Sqoop and Hive commands for the implementation:
Step 1 :
sqoop import \
  --connection-manager org.apache.sqoop.teradata.TeradataConnManager \
  --connect jdbc:teradata://**.***.***.***/DATABASE=***** \
  --username **** --password **** \
  --table emp2 \
  --target-dir /user/aps/poc/base_table -m 1
Step 2 :
CREATE TABLE base_table (emp_id string, emp_name string, mob_no string, create_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION "/user/aps/poc/base_table";
Step 3 :
sqoop import \
  --connection-manager org.apache.sqoop.teradata.TeradataConnManager \
  --connect jdbc:teradata://**.***.***.***/DATABASE=***** \
  --username **** --password **** \
  --target-dir /user/aps/poc/incremental_table \
  --query "select * from emp2 where emp_id > 12 AND \$CONDITIONS" -m 1
Step 4:
CREATE EXTERNAL TABLE incremental_table (emp_id string, emp_name string, mob_no string, create_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION "/user/aps/poc/incremental_table";
Step 5 :
CREATE VIEW reconcile_view AS
SELECT t1.*
FROM (SELECT * FROM base_table
      UNION ALL
      SELECT * FROM incremental_table) t1
JOIN (SELECT emp_id, max(create_time) max_modified
      FROM (SELECT * FROM base_table
            UNION ALL
            SELECT * FROM incremental_table) t2
      GROUP BY emp_id) s
ON t1.emp_id = s.emp_id AND t1.create_time = s.max_modified;
Step 6 :
DROP TABLE reporting_table;
CREATE TABLE reporting_table AS SELECT * FROM reconcile_view;
Step 7 :
DROP TABLE base_table;
CREATE TABLE base_table AS SELECT * FROM reporting_table;
Step 8 :
hadoop fs -rm -r /user/aps/poc/incremental_table/*
The tables and views that will be a part of the Incremental Update Workflow are:
base_table: A Hive local table that initially holds all records from the source system. After the initial processing cycle, it will maintain a copy of the most up-to-date synchronized record set from the source. At the end of each processing cycle, it is overwritten by the reporting_table.
incremental_table: A Hive external table that holds the incremental change records (INSERTS and UPDATES) from the source system. At the end of each processing cycle, it is cleared of content.
reconcile_view: A Hive view that combines and reduces the base_table and incremental_table content to show only the most up-to-date records. It is used to populate the reporting_table.
reporting_table: A Hive local table that holds the most up-to-date records for reporting purposes. It is also used to overwrite the base_table at the end of each processing run.
Each step must complete before the next one starts; this can be orchestrated with a Unix shell script or Oozie (a minimal sketch follows below). I have tested one case with the above example and it works fine. This approach is a bit more complex than the Sqoop --incremental option, but it works with TeradataConnManager.
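As an illustration only, here is a minimal shell-script sketch of how the recurring incremental cycle (Steps 3 to 8) could be chained so that each command runs only after the previous one succeeds. It assumes the initial load and the one-time DDL (Steps 1, 2, 4 and 5) have already been run, and the connection details, table name and incremental predicate are placeholders taken from the example above; adjust them for your environment.

#!/bin/bash
# Sketch: run one incremental cycle (Steps 3 to 8) end to end.
# "set -e" aborts the script if any step fails, so later steps never
# run against partial data. Placeholders: <host>, <db>, <user>, <password>.
set -euo pipefail

TD_JDBC="jdbc:teradata://<host>/DATABASE=<db>"
TD_USER="<user>"
TD_PASS="<password>"
INCR_DIR=/user/aps/poc/incremental_table

# Step 3: import only the changed or new records from Teradata
sqoop import \
  --connection-manager org.apache.sqoop.teradata.TeradataConnManager \
  --connect "$TD_JDBC" --username "$TD_USER" --password "$TD_PASS" \
  --target-dir "$INCR_DIR" \
  --query "select * from emp2 where emp_id > 12 AND \$CONDITIONS" -m 1

# Steps 5 to 7: rebuild the reporting table from the reconcile view,
# then refresh the base table from the reporting table
hive -e "
DROP TABLE IF EXISTS reporting_table;
CREATE TABLE reporting_table AS SELECT * FROM reconcile_view;
DROP TABLE IF EXISTS base_table;
CREATE TABLE base_table AS SELECT * FROM reporting_table;
"

# Step 8: purge the processed change records before the next cycle
hadoop fs -rm -r "$INCR_DIR"/*

The same sequence could instead be expressed as an Oozie workflow using sqoop and hive actions if you need scheduling and retry handling.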