Apache NiFi combination of GenerateTableFetch and ExecuteSQLRecord duplicates data

I am running NiFi in cluster mode on 5 machines.
My flow starts with a GenerateTableFetch processor, executed on the Primary Node only. In its properties, I have set Partition Size to 4000000. The connection is the standard one for an Oracle database, and my table is a generic table with more than 50M rows.

Now, the downstream connection (success) from GenerateTableFetch is linked to an ExecuteSQLRecord. The connection is set with the Load Balance Strategy = Round Robin and Selected Prioritizers = FIFO.

Within ExecuteSQLRecord, I have configured the following properties: Use Avro Logical Type = true, Max Rows Per Flow File = 4000000, Output Batch Size = 5, Set Auto Commit = true. For scheduling, I have 7 Concurrent Tasks and Execution set to All nodes.

Once the data is converted into Avro/Parquet (I tried both), I load the files into a GCP bucket or AWS S3, and from there into the provider's data warehouse service. The problem is that when I check my data, even though the row count matches the source table, the data is not the same. I have plenty of duplicate rows, and I managed to pinpoint the processor causing the duplication: ExecuteSQLRecord. But I cannot figure out why this is happening or how to solve it.

I have tried setting the ExecuteSQLRecord on debug to see exactly what happens. Based on the logs, even though I am using 7 tasks X 5 nodes, I am only seeing 8-10 log lines:

 

ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 72000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 56000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 16000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 56000000 ROWS FETCH NEXT 4000000 ROWS ONLY
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY
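To make repeats in the "Executing query" lines easier to spot, the offsets can be tallied with a few lines of Python. This is only a minimal sketch: the helper names tally_offsets and repeated_offsets are made up for illustration, the sample lines are the ones above with the processor id prefix dropped, and it assumes a query without an OFFSET clause is the first page (offset 0).

```python
import re
from collections import Counter

# Page offset inside an "Executing query" line. Assumption: a query without
# an OFFSET clause is the first page, i.e. offset 0.
OFFSET_RE = re.compile(r"OFFSET (\d+) ROWS")

Q = "Executing query SELECT * FROM ORCL.MY_TABLE_NAME WHERE 1=1 "

# The eight debug lines from the post, without the processor id prefix.
SAMPLE = [
    Q + "FETCH NEXT 4000000 ROWS ONLY",
    Q + "OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    Q + "OFFSET 72000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    Q + "OFFSET 56000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    Q + "OFFSET 16000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    Q + "OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    Q + "OFFSET 56000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
    Q + "OFFSET 40000000 ROWS FETCH NEXT 4000000 ROWS ONLY",
]


def tally_offsets(log_lines):
    """Count how many times each page offset appears in 'Executing query' lines."""
    counts = Counter()
    for line in log_lines:
        if "Executing query" not in line:
            continue  # ignore unrelated log lines
        match = OFFSET_RE.search(line)
        counts[int(match.group(1)) if match else 0] += 1
    return counts


def repeated_offsets(log_lines):
    """Return only the offsets that were logged more than once."""
    return {off: n for off, n in tally_offsets(log_lines).items() if n > 1}


if __name__ == "__main__":
    print(repeated_offsets(SAMPLE))
```

For the eight lines above, this reports offset 40000000 three times and offset 56000000 twice. These are only the lines I could see in the debug output, so the tally says nothing about the pages that were not logged.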

 

However, once ExecuteSQLRecord finishes the job, I see that all the files have been processed. Unfortunately, when I create an external table (on GCP or AWS, it does not matter where), I have duplicate rows. And no, I do not have duplicate rows in my source table.

 

ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_76000000_a1748d5f-f2dd-490c-b37c-daf54cc52391] contains 1133992 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_0_9a513a8f-ae5a-449d-b00c-6cf8a2ee7322] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_4000000_9c0fe0b3-7c12-4756-88e7-d16b099802d3] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_8000000_8e6909e5-3f13-41d4-b8cf-0429d49bf77b] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_12000000_222954b6-ca01-4c28-aa7b-3e02ef35892b] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_16000000_a1f51ef6-d592-4eb8-bbb9-d8adf5b37434] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_20000000_2334ca8b-91a6-4dcd-8adc-80dcc7163484] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_24000000_60e6f45b-41dd-4d0d-9853-5d471180d2e5] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_28000000_f6a23cf4-043b-4f6b-9025-7016bb5b2f28] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_32000000_33b53024-126f-402c-be88-86dcdf85dfe1] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_36000000_1572e725-8eff-4890-bdca-9c3c9f6b7215] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_40000000_5100fbb5-718b-447a-b279-304ac65a3249] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_44000000_056f47bf-1cbf-4b93-a2f6-a5300711df5e] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_52000000_e7f70f72-772a-4e69-b0d2-d037da1c4127] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_48000000_4d86844c-e56a-44d8-8b8c-cc025a4e9c83] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_56000000_75eb376e-1083-44d7-a781-db1b14340f64] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_60000000_b5b8f685-6af1-4f4a-8ecb-d47ec13099c8] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_68000000_968cad60-e64b-4ace-8382-32d10b5d96d4] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_64000000_cee6c80e-1c5c-4eba-a3b0-b3d40dc3d348] contains 4000000 records; transferring to 'success'
ExecuteSQLRecord[id=2b6f35be-d65c-19cd-9b43-96f87b96fad1] FlowFile[filename=4000000_72000000_7aeda32c-9dac-428d-bcc2-4dcd86f7c428] contains 4000000 records; transferring to 'success'
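The "transferring to 'success'" lines can be checked the same way for missing or repeated pages. The sketch below assumes, judging only from the filenames in this log, that each filename has the form <partition size>_<offset>_<uuid>; the function name summarize and the keys of the returned dict are hypothetical.

```python
import re

# Assumption, inferred from the log above: filename = <partition size>_<offset>_<uuid>.
FLOWFILE_RE = re.compile(
    r"filename=(\d+)_(\d+)_[0-9a-f-]+\] contains (\d+) records"
)


def summarize(log_lines):
    """Summarize page coverage and record counts from 'transferring' log lines."""
    seen = set()          # distinct page offsets encountered
    duplicates = []       # offsets that showed up more than once
    total_records = 0
    files = 0
    partition_size = None

    for line in log_lines:
        match = FLOWFILE_RE.search(line)
        if match is None:
            continue  # not a FlowFile transfer line
        size, offset, records = (int(g) for g in match.groups())
        partition_size = size
        files += 1
        total_records += records
        if offset in seen:
            duplicates.append(offset)
        seen.add(offset)

    # Expected pages are 0, size, 2*size, ... up to the largest offset seen.
    missing = []
    if seen:
        missing = [o for o in range(0, max(seen) + 1, partition_size)
                   if o not in seen]

    return {
        "files": files,
        "total_records": total_records,
        "duplicate_offsets": duplicates,
        "missing_offsets": missing,
    }
```

Applied to the 20 lines above, this gives 20 files, offsets 0 through 76000000 with no gaps or repeats, and 19 x 4000000 + 1133992 = 77133992 records in total. So the FlowFile bookkeeping itself looks complete; the check only covers offsets and counts, not the row contents of each page.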

 

Has anybody else struggled with a similar problem? How did you manage to solve it?

Thanks 🙂
