Sqoop incremental import is working fine; now I want to know how to move updated values into the Hive table

Expert Contributor

MySQL table
---------------------------
no | student name | dept
1  | siva         | IT
2  | raj          | cse

Now I create a Sqoop incremental job and the data moves into the Hive table (sqoop job --exec student_info).

Hive table
-----------------
no | student name | dept
1  | siva         | IT
2  | raj          | cse

This works fine.

Now I update the MySQL table column value (dept) from IT to EEE for id 1.

MySQL table
---------------------
no | student name | dept
1  | siva         | EEE

Now I run the Sqoop incremental import job again (sqoop job --exec student_info).

It shows this message:

16/01/20 04:41:42 INFO tool.ImportTool: Incremental import based on column `id`

16/01/20 04:41:42 INFO tool.ImportTool: No new rows detected since last import.

[root@sandbox ~]

The data does not move into the Hive table.

I want to know how to move the updated value into the Hive table, or, if that is not possible, how to move it into a NoSQL (HBase) table.

1 ACCEPTED SOLUTION

Master Guru
@sivasaravanakumar k

Attached is the full example, and here are the highlights. The table in MySQL is defined below. For best results use a timestamp as your date/time field. If you use just a "date" like in your table, you end up with low time granularity, so if you run the same job more than once a day it will import all records updated that day.

create table st1(id int, name varchar(16), ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP);
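
As an illustration (not part of the original post), the 5000 test rows mentioned below could be generated from the shell, assuming the same test database and it1/hadoop credentials used in the Sqoop command:

# Hypothetical helper: generate and load 5000 test rows into st1.
# The ts column is filled automatically by its DEFAULT CURRENT_TIMESTAMP clause.
for i in $(seq 1 5000); do
  echo "INSERT INTO st1 (id, name) VALUES ($i, 'name_$i');"
done | mysql -u it1 -phadoop test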

Populate the table with 5000 entries, for example as sketched above. Then create and run a new Sqoop job writing into an HDFS directory; please adjust for HBase. I'm showing only the important output lines, see the attachment for the full output (the "driver" option is required on the sandbox, you can ignore it, and I'm using only 1 mapper because my table is small):

[it1@sandbox ~]$ sqoop job --create incjob -- import --connect jdbc:mysql://localhost:3306/test --driver com.mysql.jdbc.Driver --username it1 --password hadoop --table st1 --incremental lastmodified --check-column ts --target-dir sqin -m 1 --merge-key id
[it1@sandbox ~]$ sqoop job --exec incjob
16/01/24 00:27:59 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6.2.3.2.0-2950
16/01/24 00:28:09 INFO tool.ImportTool: Incremental import based on column ts
16/01/24 00:28:09 INFO tool.ImportTool: Upper bound value: '2016-01-24 00:28:09.0'
16/01/24 00:28:31 INFO mapreduce.ImportJobBase: Retrieved 5000 records.
16/01/24 00:28:31 INFO tool.ImportTool: Saving incremental import state to the metastore
16/01/24 00:28:31 INFO tool.ImportTool: Updated data for job: incjob

The first time, all 5000 entries are imported. Note that the import tool sets the "Upper bound value" of ts to the current time when the command is executed. Now, change 200 entries and run the same job again:
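
For example, the 200 changed entries could be produced with any UPDATE that touches them; a sketch, assuming the mysql client accepts the same it1/hadoop credentials as the Sqoop job:

# Illustrative only: modify 200 rows so ON UPDATE CURRENT_TIMESTAMP refreshes their ts.
mysql -u it1 -phadoop test -e "UPDATE st1 SET name = CONCAT(name, '_u') WHERE id <= 200;"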

[it1@sandbox ~]$ sqoop job --exec incjob
16/01/24 00:35:59 INFO tool.ImportTool: Incremental import based on column ts
16/01/24 00:35:59 INFO tool.ImportTool: Lower bound value: '2016-01-24 00:28:09.0'
16/01/24 00:35:59 INFO tool.ImportTool: Upper bound value: '2016-01-24 00:35:59.0'
16/01/24 00:36:20 INFO mapreduce.ImportJobBase: Retrieved 200 records.
16/01/24 00:36:57 INFO tool.ImportTool: Saving incremental import state to the metastore
16/01/24 00:36:58 INFO tool.ImportTool: Updated data for job: incjob

Now only 200 entries are imported. The lower bound value is the one saved after the first run, and the upper bound value is updated to the current time, so the job is ready for the next run. That's all, happy sqooping!
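
Since the original question was about Hive, one possible way (not covered in this answer) to query the merged files is an external Hive table over the import directory; a sketch that assumes Sqoop's default comma-delimited text output and that the relative target dir resolves to /user/it1/sqin:

# Hypothetical: expose the Sqoop target directory to Hive as an external table.
hive -e "CREATE EXTERNAL TABLE IF NOT EXISTS st1_ext (id INT, name STRING, ts STRING)
         ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
         LOCATION '/user/it1/sqin';"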

19 REPLIES

Master Guru

@sivasaravanakumar k Sorry, but if you want Sqoop to support the described functionality, a timestamp column is required. You can easily add it to your existing table by doing this in MySQL:

ALTER TABLE student_info ADD ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
UPDATE student_info SET ts = now();

That's all! When you update values in your table, for example with "UPDATE student_info SET ...", ts will be updated automatically, and Sqoop will use ts to import only the updated rows. Please give it a try.
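
For instance, using the change from the question (illustrative; the user, password, and database placeholders are not from this thread):

# After the ALTER, any ordinary UPDATE refreshes ts automatically.
mysql -u <USER_NAME> -p<PWD> <db_name> -e "UPDATE student_info SET dept = 'EEE' WHERE no = 1;"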

Rising Star

We will import the updated row, but we already imported that row in an earlier import, so now we will have those two rows. How can we avoid this?

Expert Contributor

Thanks @Predrag Minovic.

I also found a solution by changing the mode to append:

sqoop job --create incjob -- import --connect jdbc:mysql://localhost:3306/test --driver com.mysql.jdbc.Driver --username it1 --password hadoop --table st1 --incremental append --check-column ts --target-dir sqin -m 1 --merge-key id --last-value 0

Master Guru

Hi @sivasaravanakumar k, for incremental append the check-column should be 'id', and Sqoop keeps updating the last-value for new appends.
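
A sketch of that append-mode variant, reusing the connection settings from the accepted answer (the job name and target dir here are made up):

# Hypothetical append-mode job keyed on the numeric id column. Sqoop saves the new
# last-value in its metastore after each run, so re-running "sqoop job --exec" only
# imports rows with id greater than the last one imported.
sqoop job --create incjob_append -- import \
  --connect jdbc:mysql://localhost:3306/test --driver com.mysql.jdbc.Driver \
  --username it1 --password hadoop \
  --table st1 \
  --incremental append --check-column id --last-value 0 \
  --target-dir sqin_append -m 1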

New Contributor

@Predrag Minovic

Hi,

I was trying to do the incremental import using incremental lastmodified and a check column. The last value stored in the Sqoop metastore was the system timestamp instead of the MAX(check_column) value.

Example: the check column is a timestamp field, and the max value for that field in the table is 2016-08-08 21:08:19.813.

The initial load went fine without any issues. After the initial load the last value got updated to "2016-08-12 10:40:22.627" instead of "2016-08-08 21:08:19.813"; because of this I am missing records inserted after 2016-08-08 21:08:19.813.

Thanks,

Thiru

New Contributor

Hi @Thirupathi Rengasamy

Did you find a solution for the issue you faced? I am facing the same issue. Kindly reply.

@Predrag Minovic

Do you have any insights on this?

Master Guru

Hi guys, this is an old question, already resolved. If you have new issues, please post a new question. In your case you obviously have an issue with the system time on your DB server versus the system time on the node where you run Sqoop.
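
A quick way to check for such a clock skew (illustrative; the host and credentials are placeholders):

# Compare the clock on the node running Sqoop with the clock on the MySQL server.
date
mysql -h <db_host> -u <USER_NAME> -p<PWD> -e "SELECT NOW();"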

New Contributor

Hi @Rahul Sounder

I am using incremental append for updates as well as new inserts, and it's working fine for me.

sqoop job -Dmapreduce.job.name=<job_name> --create <job_name> --meta-connect ${metaconnect_db} -- import \
  --connect <db connection string> \
  --username <USER_NAME> \
  --password <PWD> \
  --query <Query WHERE \$CONDITIONS> \
  --as-textfile \
  --fields-terminated-by <DELIMITER_VALUE> \
  --target-dir <TGT_DR> \
  --m <MAPPER_CNT> \
  --null-string '\\N' \
  --null-non-string '\\N' \
  --check-column <check_column> \
  --incremental append \
  --hive-delims-replacement " " \
  --split-by <check_column> \
  --last-value <LAST VALUE>;

Contributor

Hi - you need to write a shell script to accomplish this requirement:

1. Find the max value of your delta/incremental field and assign this value to a variable.

2. In this scenario, use a plain Sqoop import statement instead of creating a saved job.

3. In the same script, use Sqoop import with --query, and in the WHERE clause put the condition that reads your variable.

This will solve your problem; a rough sketch is shown below.
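
A rough sketch of that script, under the assumption that the delta column is the ts timestamp and that the target table is already queryable in Hive (all names, paths, and credentials below are placeholders, not from this thread):

#!/bin/bash
# Step 1: find the max value of the incremental column already loaded in Hive.
LAST_VAL=$(hive -S -e "SELECT MAX(ts) FROM student_info")

# Steps 2-3: plain sqoop import (no saved job) with --query, pulling only newer rows.
sqoop import \
  --connect jdbc:mysql://<db_host>:3306/<db_name> \
  --username <USER_NAME> --password <PWD> \
  --query "SELECT * FROM student_info WHERE ts > '${LAST_VAL}' AND \$CONDITIONS" \
  --target-dir <TGT_DIR> \
  --split-by no -m 1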