Apache NiFi combination of GenerateTableFetch and ExecuteSQLRecord duplicates data

I am running NiFi in cluster mode on 5 machines.
My flow starts with a GenerateTableFetch processor, executed on the Primary Node only, with Partition Size set to 4000000. The connection is a standard one for an Oracle database, and the table is a generic table with more than 50M rows.

The success relationship of GenerateTableFetch is connected to an ExecuteSQLRecord processor. That connection is configured with Load Balance Strategy = Round Robin and Selected Prioritizers = FIFO.

In ExecuteSQLRecord, I have configured the following properties: Use Avro Logical Types = true, Max Rows Per Flow File = 4000000, Output Batch Size = 5, Set Auto Commit = true. For scheduling, I use 7 Concurrent Tasks and Execution = All nodes.

Once the data is converted to Avro/Parquet (I tried both), I load the files into a GCP bucket or AWS S3 and then into the corresponding data warehouse service. The problem is that when I check the data, the row count matches the source table but the content does not: I have plenty of duplicate rows. I managed to pinpoint the processor causing the duplication, ExecuteSQLRecord, but I cannot figure out why this happens or how to solve it.

I set ExecuteSQLRecord to DEBUG logging to see exactly what happens. Even though I am using 7 tasks x 5 nodes, I only see 8-10 query log lines:

 

ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 72000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 56000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 16000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 56000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY
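
To make the repetition in these lines easier to see, here is a minimal Python sketch that extracts the OFFSET from each logged query and counts how often each one appears. The query strings are copied from the log lines above with the processor-id prefix dropped; the names `QUERIES`, `OFFSET_RE` and `logged_offsets` are only illustrative, and a query without an OFFSET clause is assumed to start at offset 0.

```python
import re
from collections import Counter

# Query text copied from the DEBUG lines above (processor-id prefix dropped).
QUERIES = [
    "SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 FETCH NEXT 4000000 ROWS ONLY",
    "SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    "SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 72000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    "SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 56000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    "SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 16000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    "SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    "SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 56000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    "SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
]

# The OFFSET clause is optional: the first page has only FETCH NEXT.
OFFSET_RE = re.compile(r"(?:OFFSET (\d+) ROWS )?FETCH NEXT (\d+) ROWS ONLY")


def logged_offsets(queries):
    """Return the row offset of every paged query, in log order."""
    offsets = []
    for query in queries:
        match = OFFSET_RE.search(query)
        if match:
            # Assumption: a missing OFFSET clause means the page starts at row 0.
            offsets.append(int(match.group(1) or 0))
    return offsets


offsets = logged_offsets(QUERIES)
counts = Counter(offsets)
repeated = {offset: n for offset, n in counts.items() if n > 1}

print("distinct offsets:", sorted(counts))
print("offsets logged more than once:", repeated)
```

On these eight lines, offset 40000000 is logged three times and offset 56000000 twice, so only five distinct pages show up. Also, none of the logged queries contains an ORDER BY clause. Without one, SQL does not guarantee the same row order across separate executions, so pages fetched by different queries may overlap. Whether that is the cause here is an assumption, not something these logs confirm.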

 

However, once ExecuteSQLRecord finishes, I see that all the files have been processed. Unfortunately, when I create an external table (on GCP or AWS, it makes no difference), I have duplicate rows. And no, there are no duplicate rows in my source table.

 

ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_76000000_a1748d5f-f2dd-490c-b37c-daf54cc52391] contains 1133992 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_0_9a513a8f-ae5a-449d-b00c-6cf8a2ee7322] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_4000000_9c0fe0b3-7c12-4756-88e7-d16b099802d3] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_8000000_8e6909e5-3f13-41d4-b8cf-0429d49bf77b] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_12000000_222954b6-ca01-4c28-aa7b-3e02ef35892b] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_16000000_a1f51ef6-d592-4eb8-bbb9-d8adf5b37434] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_20000000_2334ca8b-91a6-4dcd-8adc-80dcc7163484] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_24000000_60e6f45b-41dd-4d0d-9853-5d471180d2e5] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_28000000_f6a23cf4-043b-4f6b-9025-7016bb5b2f28] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_32000000_33b53024-126f-402c-be88-86dcdf85dfe1] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_36000000_1572e725-8eff-4890-bdca-9c3c9f6b7215] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_40000000_5100fbb5-718b-447a-b279-304ac65a3249] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_44000000_056f47bf-1cbf-4b93-a2f6-a5300711df5e] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_52000000_e7f70f72-772a-4e69-b0d2-d037da1c4127] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_48000000_4d86844c-e56a-44d8-8b8c-cc025a4e9c83] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_56000000_75eb376e-1083-44d7-a781-db1b14340f64] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_60000000_b5b8f685-6af1-4f4a-8ecb-d47ec13099c8] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_68000000_968cad60-e64b-4ace-8382-32d10b5d96d4] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_64000000_cee6c80e-1c5c-4eba-a3b0-b3d40dc3d348] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_72000000_7aeda32c-9dac-428d-bcc2-4dcd86f7c428] contains 4000000 records; transferring to 'success'
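
As a cross-check on these completion lines, the sketch below verifies that every page offset appears exactly once and sums the record counts. It assumes the filename pattern is `<partition size>_<offset>_<uuid>`, which matches the lines above but is not confirmed anywhere in this post. The (offset, record count) pairs are transcribed by hand from the log, in log order.

```python
PARTITION_SIZE = 4_000_000

# Offsets (in millions of rows) of the 19 full pages, in the order logged above.
FULL_PAGE_OFFSETS_M = [0, 4, 8, 12, 16, 20, 24, 28, 32, 36,
                       40, 44, 52, 48, 56, 60, 68, 64, 72]

# (offset, record count) per FlowFile; the first logged file is the short last page.
FLOWFILES = [(76_000_000, 1_133_992)] + [
    (m * 1_000_000, PARTITION_SIZE) for m in FULL_PAGE_OFFSETS_M
]

offsets = [offset for offset, _ in FLOWFILES]

# Every multiple of the partition size up to the highest offset should appear once.
expected = list(range(0, max(offsets) + 1, PARTITION_SIZE))
missing = sorted(set(expected) - set(offsets))
duplicated = sorted(o for o in set(offsets) if offsets.count(o) > 1)
total_rows = sum(records for _, records in FLOWFILES)

print("flow files:", len(FLOWFILES))
print("missing offsets:", missing)
print("duplicated offsets:", duplicated)
print("total rows:", total_rows)
```

For the lines above this reports 20 FlowFiles, no missing or duplicated offsets, and 77133992 rows in total. So at the FlowFile level each page was emitted once with the expected record count, which is consistent with the total row count matching the source table.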

 

Has anybody else struggled with a similar problem? How did you manage to solve it?

Thanks 🙂
