Support Questions

Find answers, ask questions, and share your expertise

Ingest data incrementally using Nifi from DB2

Explorer

Hi,

I have 3 columns , Userid , Serial no and Timestamp column(YYYY-DD-MM HH:MM:SS) in DB2 . I would like to create a incremental load using Nifi . userid,serialno are random numbers not increased primary numbers . TS is increamental.

I have tried to use

FLOW 1

QueryFetchTable->ExecuteSQL->PutHIveStreaming --> logattribute (to avoid duplicates)-- Not working , No Error in the logs (refer attached pic for configuration ) nf.pngnf1.pngnf2.png I'm not able to load even single data to hive using this method as I have used max_column is TS . -- Hive table is Partitioned , Bucketed , ORC and transantional

FLOW 2

ExecuteSQL->PutHiveStreaming->logattribute --> I'm able to manage the data into hive , but I will not be able to incremental load the data -- Hive table is Partitioned , Bucketed , ORC and transantional

Could you please help me to setup a Simple Incremental load using any flow files . Ultimately , I would like a incremental load without duplicates

Hive table is Partitioned , Bucketed , ORC and transnational

I'm open to any processors in Nifi for an incremental load without duplicates . Can you please create a sample workflow with sample config and sample data

@Shu , Any input please . Thanks a lot in advance . Expecting a awesome answer from you 🙂

11 REPLIES 11

Super Guru

Flow 1 is the right one, what errors are you seeing?

Explorer

@Matt Burgess Is that QueryDatabasetable or GeneratetableFetch. Let's say I have 100 records in my source table(DB2) . When I add one record in my source I should be getting 101 records ,instead i get around 201 records . I get duplicated records and keep on adding duplicates for every new record @Shu or @Matt Burgess Kindly assist . Im missing a small thing . Kindly clarify.nf11.png Refer attached for the flow .

Explorer

There was no error . But I can't see data in hive . Are my configuration for flow 1 is right ? Without Queryfetchtable I'm able to put data in hive . But can't incremental load to Hadoop . I'm using NiFi 1.2 version.

Explorer

@Matt Burgess Im using Maximum-value Columns as timestamp (YYYY-MM-DD hh:mm:ss) . Do you think Nifi Is unable to take the Maximum-value as a timestamp in the format as it is(YYYY-MM-DD hh:mm:ss)

Explorer

Outputof the ExecuteSQL flow : "TIMESTAMP" : "2018-06-21T19:07:27.000Z" and Hive Datatype is Timestamp and not accepting the format . Is it because Maximum-value Columns aren't working..

Super Guru

I think it is because for the version of Hive used by PutHiveStreaming, it uses a version of Avro that does not support logical types. Try setting "Use Avro logical types" to false in QueryDatabaseTable, I believe that will cause the timestamp to be a string in the outgoing Avro file, but might be able to be converted in PutHiveStreaming under the hood.

Explorer

I'm using GenerateFetchtable instead of QueryDatabaseTable . I'm getting error

"error during database query or conversion of records to avro" . 

How to resolve the above error and to Avoid duplicated records in my table.

Explorer

@Matt Burgess . I still cannot eliminate duplicates . I have changed Avro logical types to false and it allowed me to insert data into Hive with DUPLICATES . Lets assume . I have 100 records If I insert 1 record and I should be getting 101 records instead I get 201 records.Anything I'm missing right now .Kindly assist

Explorer

nififlow.png , Attached the working flow with configurations . Everything is working except duplicates . Kindly assist

Super Guru

Your ExecuteSQL appears to have a query specified in the "SQL select query" property. This means it will ignore the contents of the flow files, which are each SQL statements themselves. Clear the "SQL select query" value, and then ExecuteSQL will use the flow file contents to execute those statements, which should have the SQL for incremental fetches.

Explorer

from Generatetablefetch I'm able to get the flow file . Upon running the flow, the select query failed to execute on DB2. On investigating we found that the query generated by GenerateTableFetch looked like this

select userid,timestamp from user11 where timestamp<='01-01-2018 12:00:00' order by timestamp limit 10000

And I have used the Nifi Expression language as per the https://community.hortonworks.com/articles/167733/using-generatetablefetch-against-db2.html and created a query like this

select ${generatetablefetch.columnnames} from ${generatetablefetch.tablename} where ${generatefetchtable.whereClause} order by ${generatetablefetch.maxColumnNames}  fetch first ${generatetablefetch.limit} rows only
select userid,timestamp from user11 where timestamp >= '01-01-2018 12:00:00' order by timestamp limit 1000


but i'm getting 


select userid,timestamp from user11 where order by timestamp limit 1000

In the above example where condition is not taking the value. please refer the screenshots for my configuration80483-nififlow.png

I think I have made halfway through this and stuck here .What is missing in this .