Created 11-14-2023 03:42 AM
I have a source table with 200 million rows that needs to be transferred to another system.
The other system offers a batch insert. Is there a NiFi example I can follow that will read a source table, split the data into 20 chunks, and make 20 batch inserts concurrently?
Created 11-14-2023 09:45 AM
Hi @Sipping1n0s ,
Can you provide more information on the other system? How do you batch insert 20 records? Is this an API call or some database?
Thanks
Created on 11-14-2023 10:11 AM - edited 11-14-2023 10:12 AM
Good questions.
The other system is like Elasticsearch/Solr. It has an API that allows multiple concurrent curl commands to bulk insert CSV files.
Created 11-14-2023 11:55 AM
So I'm not sure what you are going to use for inserting the data into Elasticsearch/Solr. It's possible that you want to run the concurrent curl API commands with the ExecuteStreamCommand processor. Alternatively, you can use the InvokeHTTP processor to construct the API call; it comes equipped with a lot of options and parameters that make the API call much easier. NiFi also comes with out-of-the-box processors for interacting with Elasticsearch and Solr, such as PutElasticsearchHttp, PutElasticsearchJson, PutSolrRecord, PutSolrContentStream, etc. These can be much easier to set up than using the API directly. If you are looking to do multiple uploads concurrently, regardless of which processor you use, you can configure that on the processor itself: right-click the processor, select Configure, select the Scheduling tab, and set the proper value in the "Concurrent Tasks" box, for example 20.
Keep in mind the following when you set this value:
1- Single Node vs. Cluster: On a single node, you can set Concurrent Tasks as high as needed. On a cluster with load balancing in front of the upload to the target system, you already have some concurrency, because different nodes can upload records at the same time, and you have to take that into consideration. For example, with a 5-node cluster and 20 records per batch, each node takes 4 records, so Concurrent Tasks would be set to 4.
2- Make sure "Maximum Timer Driven Thread Count" is set to >= 20 so that NiFi can process 20 tasks concurrently; by default this value is 10. To find it, click the three-bar icon at the top right and select Controller Settings; the value is under the first tab, "General".
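The arithmetic behind the two points above can be sketched in a few lines of Python. This is only an illustration of the reasoning, not anything NiFi runs; the function names are hypothetical, and rounding down (so the cluster-wide total stays at or under the target) is my assumption.

```python
def concurrent_tasks_per_node(total_concurrent_uploads: int, node_count: int) -> int:
    """Split a cluster-wide concurrency target evenly across nodes.

    Rounds down so the combined total does not exceed the target,
    but never returns less than 1 (a processor needs at least one task).
    """
    if total_concurrent_uploads < 1 or node_count < 1:
        raise ValueError("both values must be positive")
    return max(1, total_concurrent_uploads // node_count)


def thread_pool_is_sufficient(concurrent_tasks: int, max_timer_driven_threads: int) -> bool:
    """True if the timer-driven thread pool can run all requested tasks at once."""
    return max_timer_driven_threads >= concurrent_tasks


print(concurrent_tasks_per_node(20, 5))   # 5-node cluster from the example: 4
print(concurrent_tasks_per_node(20, 1))   # single node: 20
print(thread_pool_is_sufficient(20, 10))  # default pool of 10 is too small: False
```

On a single node with the default thread count of 10, the last check fails, which is why the thread count has to be raised before Concurrent Tasks = 20 has its full effect.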
Recommendation on how to set this value:
When it comes to batching the data from the source database, you can use a processor like ExecuteSQL or ExecuteSQLRecord to fetch the data. In the configuration of those processors you need to set up a DB Connection Pool service to create the connection. For more information on this, refer to:
https://www.youtube.com/watch?v=fuEosO24fgI
Also, in the configuration you can set "Max Rows Per Flow File" to 20 so that you don't get all 200 million rows in one file, which might take a lot of time and memory to process and can result in an error depending on your heap size.
ExecuteSQL will give you the result in Avro format. You can use the ConvertAvroToJSON processor if you want to convert to JSON, or use the ExecuteSQLRecord processor and set the Record Writer to the desired target format.
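As a rough mental model of what "Max Rows Per Flow File" does, here is a small Python sketch that cuts a stream of rows into fixed-size batches without loading everything at once. It is not NiFi's implementation; `batched_rows` is a hypothetical helper and the row source is just a range of integers.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched_rows(rows: Iterable[T], max_rows: int) -> Iterator[List[T]]:
    """Yield lists of at most max_rows items, pulling rows lazily from the source."""
    if max_rows < 1:
        raise ValueError("max_rows must be positive")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, max_rows))
        if not batch:
            return
        yield batch


# 45 stand-in rows split into batches of 20, analogous to one flowfile per batch.
batch_sizes = [len(batch) for batch in batched_rows(range(45), 20)]
print(batch_sizes)  # [20, 20, 5]
```

Only one batch is held in memory at a time, which is the same reason a row limit per flowfile helps with heap usage.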
If you want to ensure that you are processing 20 records at a time, and you want to prevent any records from being processed before the previous batch is complete, you can use Batch Processing at the Group Level as described here:
https://www.youtube.com/watch?v=kvJx8vQnCNE&t=296s
So the idea is that you put the processing and uploading of the 20 records in a process group where you configure "Process Group FlowFile Concurrency" as described in the video above. If your ExecuteSQL fetches 20 rows per flowfile, then you allow one flowfile at a time to enter the group. Inside the group you need to split the records and upload them concurrently.
Hopefully that will at least get you going. If you have any questions please let me know.
If you find this helpful please accept solution.
Thanks
Created on 11-15-2023 07:18 AM - edited 11-15-2023 07:33 AM
@SAMSAL , I am impressed with the details. Thank you.
I am still learning and not yet sure how some details will work. For example, suppose I plan to have 20 concurrent Put*Record tasks that must repeat many times to cover the 200 million rows in the table. When only 19 Puts are running, what is needed to start a new Put so that the total never exceeds 20 concurrent Puts?
Secondly, I have a single NiFi server. Can I have 20 Puts running concurrently? Executing 20 curls from terminals on that single NiFi server would have been fine.
On a different topic, if I Accept, how can I follow up with questions?
Created 11-15-2023 11:06 AM
So if the goal is that at any given time the number of concurrent uploads is never more than 20, then ignore the last part of my response that talks about "Batch Processing at the Group Level". All you need is to get the records from the DB as I have explained, and on the processor that does the upload, set Concurrent Tasks to 20 with the considerations listed above. Also, since you have so many records, I would consider using GenerateTableFetch before ExecuteSQL. It generates SQL statements that partition the data into pages, provided you have a column with numeric values, such as an id, that is sequential and evenly distributed. For more info see:
This way you can set up a schedule on GenerateTableFetch to generate the partitioned queries so that the fetched data is processed downstream without causing backpressure or out-of-memory exceptions.
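To make the idea concrete, here is a minimal Python sketch of the two pieces: splitting an id range into page queries, and running the pages with at most 20 in flight. It is an analogy only. The SQL shape is illustrative and not the exact statements GenerateTableFetch emits, and `page_queries`, `upload_page`, `run_with_limit`, and the table and column names are all hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def page_queries(table: str, id_column: str, min_id: int, max_id: int, page_size: int) -> List[str]:
    """Build one range query per page over a sequential numeric id column."""
    if page_size < 1:
        raise ValueError("page_size must be positive")
    queries = []
    start = min_id
    while start <= max_id:
        end = min(start + page_size, max_id + 1)  # exclusive upper bound
        queries.append(
            f"SELECT * FROM {table} WHERE {id_column} >= {start} AND {id_column} < {end}"
        )
        start = end
    return queries


def upload_page(query: str) -> str:
    """Placeholder for 'fetch this page and bulk insert it into the target system'."""
    return f"uploaded: {query}"


def run_with_limit(queries: List[str], max_concurrent: int = 20) -> List[str]:
    """Process all pages with at most max_concurrent uploads running at once.

    When one upload finishes, its worker picks up the next pending page,
    so the number in flight never exceeds the limit.
    """
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        return list(pool.map(upload_page, queries))


queries = page_queries("source_table", "id", 1, 100, 25)
results = run_with_limit(queries, max_concurrent=20)
print(len(queries))  # 4
print(queries[0])    # SELECT * FROM source_table WHERE id >= 1 AND id < 26
```

The worker pool plays the role of Concurrent Tasks here: nothing extra is needed to go from 19 running uploads back to 20, because a free worker takes the next queued page on its own.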
You can Accept the solution once all your questions are answered and the issue is resolved.
Created 11-16-2023 06:26 PM
Thank you, I am thankful for all of your tips and hints. I'm going to accept this as a solution. I'll create new questions as needed.
Again, thank you.