Created 07-11-2018 03:44 PM
Hi,
I have three columns in DB2: userid, serialno, and a timestamp column (YYYY-MM-DD HH:MM:SS). I would like to create an incremental load using NiFi. userid and serialno are random numbers, not increasing primary keys; the timestamp (TS) is incremental.
I have tried the following flows:
FLOW 1
QueryFetchTable -> ExecuteSQL -> PutHiveStreaming -> LogAttribute (to avoid duplicates). Not working, and there are no errors in the logs (refer to the attached pics for configuration: nf.png, nf1.png, nf2.png). I'm not able to load even a single record into Hive with this method, using TS as the maximum-value column. Hive table is partitioned, bucketed, ORC, and transactional.
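With TS as the Maximum-value Column, my understanding is that the fetch processor keeps the last seen TS in its state and generates an incremental query of roughly this shape on each run (table and column names just mirror my setup, and the literal value is only illustrative):
select userid, serialno, timestamp from user11 where timestamp > '2018-06-21 19:07:27'   -- lower bound comes from the processor's stored state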
FLOW 2
ExecuteSQL -> PutHiveStreaming -> LogAttribute. I'm able to get the data into Hive, but I'm not able to load it incrementally. Hive table is partitioned, bucketed, ORC, and transactional.
Could you please help me set up a simple incremental load using any flow? Ultimately, I would like an incremental load without duplicates.
The Hive table is partitioned, bucketed, ORC, and transactional.
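For reference, this is the shape of table definition I mean (a minimal sketch only; the column, partition, and bucket choices here are illustrative, not my actual DDL):
create table user_events (
  userid int,
  serialno int,
  ts timestamp
)
partitioned by (load_date string)
clustered by (userid) into 4 buckets
stored as orc
tblproperties ('transactional'='true');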
I'm open to any processors in NiFi for an incremental load without duplicates. Could you please share a sample workflow with sample configuration and sample data?
@Shu, any input please? Thanks a lot in advance. Expecting an awesome answer from you 🙂
Created 07-12-2018 02:04 AM
Flow 1 is the right one, what errors are you seeing?
Created 07-12-2018 11:50 AM
@Matt Burgess Is that QueryDatabaseTable or GenerateTableFetch? Let's say I have 100 records in my source table (DB2). When I add one record at the source I should end up with 101 records, but instead I get around 201: the records are duplicated, and duplicates keep getting added for every new record. @Shu or @Matt Burgess, kindly assist; I'm missing a small thing. Kindly clarify. nf11.png (refer to the attachment for the flow).
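To confirm that every run re-ingests the full table (rather than some partial overlap), I count repeats in Hive with something like this (table and column names are just placeholders for mine):
select userid, serialno, ts, count(*) as copies from user_events group by userid, serialno, ts having count(*) > 1;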
Created 07-12-2018 02:36 AM
There was no error, but I can't see data in Hive. Is my configuration for Flow 1 right? Without QueryFetchTable I'm able to put data into Hive, but I can't load incrementally into Hadoop. I'm using NiFi 1.2.
Created 07-12-2018 01:42 PM
@Matt Burgess I'm using the timestamp column (YYYY-MM-DD hh:mm:ss) as the Maximum-value Column. Do you think NiFi is unable to take the maximum value as a timestamp in that format (YYYY-MM-DD hh:mm:ss)?
Created 07-12-2018 01:55 PM
Output of the ExecuteSQL flow: "TIMESTAMP" : "2018-06-21T19:07:27.000Z". The Hive data type is timestamp and it is not accepting that format. Is this why the Maximum-value Columns aren't working?
Created 07-12-2018 02:34 PM
I think it is because the version of Hive used by PutHiveStreaming uses a version of Avro that does not support logical types. Try setting "Use Avro logical types" to false in QueryDatabaseTable. I believe that will cause the timestamp to be a string in the outgoing Avro file, but it might be converted by PutHiveStreaming under the hood.
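As a quick illustration of the format difference (behavior varies by Hive version, so treat this as a sketch): Hive's timestamp type expects the plain yyyy-MM-dd HH:mm:ss form rather than the ISO form with the T and Z, e.g.
select cast('2018-06-21 19:07:27' as timestamp);      -- parses as expected
select cast('2018-06-21T19:07:27.000Z' as timestamp); -- may come back NULL on older Hive versions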
Created 07-12-2018 05:38 PM
I'm using GenerateTableFetch instead of QueryDatabaseTable. I'm getting the error:
"error during database query or conversion of records to avro" .
How do I resolve the above error and avoid duplicated records in my table?
Created 07-12-2018 04:39 PM
@Matt Burgess I still cannot eliminate duplicates. I changed "Use Avro logical types" to false and it allowed me to insert data into Hive, but with DUPLICATES. Let's assume I have 100 records: if I insert 1 record I should end up with 101 records, but instead I get 201. Is there anything I'm missing right now? Kindly assist.
Created 07-12-2018 04:52 PM
nififlow.png: attached is the working flow with configurations. Everything is working except for the duplicates. Kindly assist.
Created 07-12-2018 11:36 PM
Your ExecuteSQL appears to have a query specified in the "SQL select query" property. This means it will ignore the contents of the incoming flow files, each of which is itself a SQL statement. Clear the "SQL select query" value, and then ExecuteSQL will execute the flow file contents instead, which should contain the SQL for the incremental fetches.
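For example, each flow file from GenerateTableFetch should itself contain a complete statement along these lines (values are illustrative, and the exact paging clause depends on the database/adapter):
select userid, timestamp from user11 where timestamp <= '2018-07-12 10:00:00' order by timestamp limit 10000
On subsequent runs a lower bound from the stored state is added as well. With "SQL select query" cleared, ExecuteSQL runs that statement rather than a fixed full-table query.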
Created 07-13-2018 02:16 PM
From GenerateTableFetch I'm able to get the flow file. Upon running the flow, the SELECT query failed to execute on DB2. On investigating, we found that the query generated by GenerateTableFetch looked like this:
select userid,timestamp from user11 where timestamp<='01-01-2018 12:00:00' order by timestamp limit 10000
I then used NiFi Expression Language as per https://community.hortonworks.com/articles/167733/using-generatetablefetch-against-db2.html and created a query like this:
select ${generatetablefetch.columnnames} from ${generatetablefetch.tablename} where ${generatefetchtable.whereClause} order by ${generatetablefetch.maxColumnNames} fetch first ${generatetablefetch.limit} rows only
I expected to get:
select userid,timestamp from user11 where timestamp >= '01-01-2018 12:00:00' order by timestamp limit 1000
but I'm actually getting:
select userid,timestamp from user11 where order by timestamp limit 1000
In the above example the WHERE condition is not picking up the value. Please refer to the screenshot of my configuration: 80483-nififlow.png
I think I have made it halfway through this and am stuck here. What is missing?