Support Questions
Find answers, ask questions, and share your expertise
Announcements
Check out our newest addition to the community, the Cloudera Innovation Accelerator group hub.

sqoop incremental import working fine ,now i want know how to update value move into hive table

Contributor

Mysql table

---------------------------

no | student name | dept

1 | siva | IT

2 | raj | cse

now i create sqoop incremental JOB data move into hive table (sqoop job --exec student_info)

hive table

-----------------

no | student name | dept

1 | siva | IT

2 | raj | cse

working fine .

now i update mysql Table Column value ( dept ) IT -> EEE IN ID 1

Mysql Table

---------------------

no | student name | dept

1 | siva | EEE

now i again run the sqoop increment import job (sqoop job --exec student_info)

IT Show that message

16/01/20 04:41:42 INFO tool.ImportTool: Incremental import based on column `id`

16/01/20 04:41:42 INFO tool.ImportTool: No new rows detected since last import.

[root@sandbox ~]

data not move into hive table

i want know how to move update value move into hive table (or) if not possible means how to move to NOSQL (HBASE) tabe

1 ACCEPTED SOLUTION

@sivasaravanakumar k

Attached is the full example, and here are the highlights. Table in Mysql defined below. For best results use timestamp as your date/time field. If you use just "date" like in your table you are ending up with low time granularity, so if you run the same job more than once a day it will import all new records updated that day.

create table st1(id int, name varchar(16), ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP);

Populate the table with 5000 entries. Create and run a new Sqoop job writing into a hdfs directory, please adjust for hbase. I'm showing only the important output lines, see the attachment for full ouput (the "driver" option is required on the sandbox, you can ignore it, and I'm using only 1 mapper because my table is small):

[it1@sandbox ~]$ sqoop job --create incjob -- import --connect jdbc:mysql://localhost:3306/test --driver com.mysql.jdbc.Driver --username it1 --password hadoop --table st1 --incremental lastmodified -check-column ts --target-dir sqin -m 1 --merge-key id
[it1@sandbox ~]$ sqoop job --exec incjob
16/01/24 00:27:59 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6.2.3.2.0-2950
16/01/24 00:28:09 INFO tool.ImportTool: Incremental import based on column ts
16/01/24 00:28:09 INFO tool.ImportTool: Upper bound value: '2016-01-24 00:28:09.0'
16/01/24 00:28:31 INFO mapreduce.ImportJobBase: Retrieved 5000 records.
16/01/24 00:28:31 INFO tool.ImportTool: Saving incremental import state to the metastore
16/01/24 00:28:31 INFO tool.ImportTool: Updated data for job: incjob

The first time all 5000 entries are imported. Note that import tool sets the "Upper bound value" of ts to the current time when the command is executed. Now, change 200 entries, and run the same job again:

[it1@sandbox ~]$ sqoop job --exec incjob
16/01/24 00:35:59 INFO tool.ImportTool: Incremental import based on column ts
16/01/24 00:35:59 INFO tool.ImportTool: Lower bound value: '2016-01-24 00:28:09.0'
16/01/24 00:35:59 INFO tool.ImportTool: Upper bound value: '2016-01-24 00:35:59.0'
16/01/24 00:36:20 INFO mapreduce.ImportJobBase: Retrieved 200 records.
16/01/24 00:36:57 INFO tool.ImportTool: Saving incremental import state to the metastore
16/01/24 00:36:58 INFO tool.ImportTool: Updated data for job: incjob

Now only 200 entries are imported. Lower bound value is the one set the first time, and the Upper bound value is updated to the current time, and so the job is ready for the next run. That's all, happy sqooping!

View solution in original post

19 REPLIES 19

Use --incremental lastmodified, and you need to add an extra column to your MySql table with a time-stamp, and whenever you update a row in MySql you need to update the time-stamp column as well. Let's call that new column ts, then you can create a new Sqoop job like this:

$ sqoop job --create student_info2 -- import --connect ... --incremental lastmodified --check-column ts

And run student_info2. If you run from the cmd line you can also specify "--last-value last-ts" telling sqoop to import only rows where ts>last-ts. When you use saved jobs Sqoop does that for you.

Contributor

thnks ,but i want how to with sqoop incremental import metadata,update column value into hive table

The above sqoop job will do that. Just add a new column to your MySql table like below. When you update your table ts will be updated automatically to the current time, and Sqoop will use ts to update only updated rows.

ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

Contributor

i dont have time stamp in my table ,i want know how do with without DATE format

Contributor

Now i am using time lastmodified in this case , full data move into the hbase , i want only move new record ,how can i do that

Only the first time when importing with --lastmodified all records will be imported, after that try to update a few records in MySql and run your Sqoop job again, it is supposed to import only updated records. If it still doesn't work please post your MySql table schema, a few records from your table and your Sqoop job command.

Hi @sivasaravanakumar k If you run sqoop from the command line, without Sqoop job, then you have to add --last-value, try for example to add "--last-value 2016-01-01", then only a few records where Date_Item is in 2016 will be imported. You can actually see that in the output of Sqoop, it gives you exact time when you ran Sqoop. So with --last-value '2016-01-23 13:48:02' nothing will be imported (if your MySql table is unchanged). If you create a new Sqoop job like your "student_info", then Sqoop will keep that date-time for you and you can just run the job again to import updated records.

Check the "Single view" demo/workshop repo we put together here that show examples of both bulk import and incremental update from Mysql to Hive using staging/final tables:

https://community.hortonworks.com/content/repo/10062/single-view-demo.html

The one you are looking for is the Mysql version, available here:

https://github.com/abajwa-hw/single-view-demo/blob/master/singleview-mysql-advanced-23.md

New Contributor

Hi, I'm getting the error "cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory" when I use the lastmodified incremental mode, where as I'm not getting the error for the append incremental mode. Why I'm getting the error when I use the lastmodified mode. How to fix this. Please suggest.

BR

Sivakumar

@sivasaravanakumar k Sorry, but if you want Sqoop to support descrubed functionality a time-stamp column is required. You can easily add it to you existing table by doing this in MySql:

ALTER TABLE student_info ADD ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
UPDATE TABLE student_info SET ts=now();

That's all! When you update values in your table, for example by "update student_info set ..." ts will be updated automatically. And Sqoop will use ts to import only updated rows. Please give it a try.

We will import updated row and already we imported that row in earlier import.so now we will have those 2 rows,how can we avoid this ?

@sivasaravanakumar k

Attached is the full example, and here are the highlights. Table in Mysql defined below. For best results use timestamp as your date/time field. If you use just "date" like in your table you are ending up with low time granularity, so if you run the same job more than once a day it will import all new records updated that day.

create table st1(id int, name varchar(16), ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP);

Populate the table with 5000 entries. Create and run a new Sqoop job writing into a hdfs directory, please adjust for hbase. I'm showing only the important output lines, see the attachment for full ouput (the "driver" option is required on the sandbox, you can ignore it, and I'm using only 1 mapper because my table is small):

[it1@sandbox ~]$ sqoop job --create incjob -- import --connect jdbc:mysql://localhost:3306/test --driver com.mysql.jdbc.Driver --username it1 --password hadoop --table st1 --incremental lastmodified -check-column ts --target-dir sqin -m 1 --merge-key id
[it1@sandbox ~]$ sqoop job --exec incjob
16/01/24 00:27:59 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6.2.3.2.0-2950
16/01/24 00:28:09 INFO tool.ImportTool: Incremental import based on column ts
16/01/24 00:28:09 INFO tool.ImportTool: Upper bound value: '2016-01-24 00:28:09.0'
16/01/24 00:28:31 INFO mapreduce.ImportJobBase: Retrieved 5000 records.
16/01/24 00:28:31 INFO tool.ImportTool: Saving incremental import state to the metastore
16/01/24 00:28:31 INFO tool.ImportTool: Updated data for job: incjob

The first time all 5000 entries are imported. Note that import tool sets the "Upper bound value" of ts to the current time when the command is executed. Now, change 200 entries, and run the same job again:

[it1@sandbox ~]$ sqoop job --exec incjob
16/01/24 00:35:59 INFO tool.ImportTool: Incremental import based on column ts
16/01/24 00:35:59 INFO tool.ImportTool: Lower bound value: '2016-01-24 00:28:09.0'
16/01/24 00:35:59 INFO tool.ImportTool: Upper bound value: '2016-01-24 00:35:59.0'
16/01/24 00:36:20 INFO mapreduce.ImportJobBase: Retrieved 200 records.
16/01/24 00:36:57 INFO tool.ImportTool: Saving incremental import state to the metastore
16/01/24 00:36:58 INFO tool.ImportTool: Updated data for job: incjob

Now only 200 entries are imported. Lower bound value is the one set the first time, and the Upper bound value is updated to the current time, and so the job is ready for the next run. That's all, happy sqooping!

Contributor

thanks @ Predrag Minovic

and also now i find solution please change append

sqoop job --create incjob --import--connect jdbc:mysql://localhost:3306/test --driver com.mysql.jdbc.Driver --username it1 --password hadoop --table st1 --incremental append -check-column ts --target-dir sqin -m 1 --merge-key id --last-value 0

Hi @sivasaravanakumar k, for incremental append check-column will be 'id' and you keep on changing last-value for now appends.

New Contributor

@Predrag Minovic

Hi,

I was trying to do the incremental import using the incremental last modified and check column. last value stored in sqoop metastore was system timestamp instead of Max(Check column) value.

Example: Check column - timestamp field - Max value for that field in table - 2016-08-08 21:08:19.813

Initial load went fine without any issues. After initial load last value got updated to "2016-08-12 10:40:22.627" instead of "2016-08-08 21:08:19.813" due to this I am missing records inserted after 2016-08-08 21:08:19.813.

Thanks,

Thiru

,

Hi Predrag Minovic,

I was trying to do the Sqoop incremental import using the incremental last modified. last value get stored in sqoop metastore is system timestamp instead of Max(check_column).

Thanks,

Thiru

New Contributor

Hi @Thirupathi Rengasamy

Did you find solution for the issue which you faced? I am facing the same issue. Kindly reply.

@Predrag Minovic

Do you have any insights on this?

Hi guys, this is an old question, already resolved. If you have new issues, please post a new question. In your case you obviously have some issues with system time on your DB server versus system time on the node where your run sqoop.

New Contributor

Hi @Rahul Sounder

I am using incremental append for updates as well as new inserts and its working fine for me.

sqoop job -Dmapreduce.job.name=<job_name> --create <job_name> --meta-connect ${metaconnect_db} -- import --connect <db connection string> \--username <USER_NAME> \--password <PWD> \--query <Query WHERE \$CONDITIONS> \--as-textfile \--fields-terminated-by <DELIMITER_VALUE> \--target-dir <TGT_DR> \--m <MAPPER_CNT> \--null-string '\\N' \--null-non-string '\\N' \--check-column <check_column> \--incremental append \--hive-delims-replacement " " \--split-by <check_column> \--last-value <LAST VALUE>;

,

Hi @Rahul Sounder

I am using incremental append mode and its working for me.

sqoop job -Dmapreduce.job.name=<job_name> --create <job_name> --meta-connect ${metaconnect_db} -- import --connect <db connection string> \--username <USER_NAME> \--password <PWD> \--query <Query WHERE \$CONDITIONS> \--as-textfile \--fields-terminated-by <DELIMITER_VALUE> \--target-dir <TGT_DR> \--m <MAPPER_CNT> \--null-string '\\N' \--null-non-string '\\N' \--check-column <check_column> \--incremental append \--hive-delims-replacement " " \--split-by <check_column> \--last-value <LAST VALUE>;

Hi - You need to write a shell script to accomplish this requirement.

Step 1. you need to find the Max value of your Delta/Incremental field value and assign this value to a variable.

2. In this Scenario, you need to use the Sqoop Import statement instead of Creation.

3. In the same script you, need to use the Sqoop Import with --query in where clause put the condition and read your variable.

This will solves your problem...