Created on 
    
	
		
		
		12-14-2016
	
		
		03:55 PM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
 - last edited on 
    
	
		
		
		10-02-2019
	
		
		09:16 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
 by 
				
		
		
			ask_bill_brooks
		
		
		
		
		
		
		
		
	
			
		
Hello guys,
I have the below NiFi flow, based on the thread
https://community.hortonworks.com/questions/70562/executesql-dynamic-query.html#comment-71349
The ExecuteSQL processor is executing very simple select query which returns the results very fast if executed from command line..
I dunno why I keep getting huge queue before the executeSQL, I tried to increase the running threads but still the output is slow...
any suggestion how to improve this ? bearing in mind that I have another process group which execute insert query and running smoothly, fast ?
Note the queue is full before ExectueSQL, I also got heap size errors when I increased the ExecuteSQL threads to 16 ?
I attached the flow as well
Thanks forum
Created on 12-14-2016 04:52 PM - edited 08-18-2019 05:42 AM
The approach in the other thread is very inefficient for this use case. You're basically trying to do a join between rows in a file and rows in a DB table. An alternative is to populate a DistributedMapCacheServer from the DB table, then look up those values in a separate flow.
To populate the map, you could do something like this:
Here I am using QueryDatabaseTable with a Max Value Column of "id" such that the map will only be populated once. But if you are adding entries to the lookup table (as it appears you might be from your description) or if new entries will not have strictly greater values for "id", then you can remove the Max Value Column property and schedule the QueryDatabaseTable processor to run as often as you'd like to refresh the values.
Once this flow is running, you can start a different flow that is similar to the one in the other thread, but instead of querying the DB for each row in the file, it will fetch from the DistributedCacheMapServer, which is hopefully faster:
You can see the first part is the same as the flow in the other thread, but instead of using ReplaceText to generate SQL to execute, the value is simply looked up from the Map and put into an attribute, then the final ReplaceText is like the one in the other thread, specifying "${column.1},${column.2},${column.3},${column.4}, ${customer.name}" or whatever the appropriate attributes are. I have attached a template (databaselookupexample.xml) showing these two flows.
Created on 12-14-2016 04:52 PM - edited 08-18-2019 05:42 AM
The approach in the other thread is very inefficient for this use case. You're basically trying to do a join between rows in a file and rows in a DB table. An alternative is to populate a DistributedMapCacheServer from the DB table, then look up those values in a separate flow.
To populate the map, you could do something like this:
Here I am using QueryDatabaseTable with a Max Value Column of "id" such that the map will only be populated once. But if you are adding entries to the lookup table (as it appears you might be from your description) or if new entries will not have strictly greater values for "id", then you can remove the Max Value Column property and schedule the QueryDatabaseTable processor to run as often as you'd like to refresh the values.
Once this flow is running, you can start a different flow that is similar to the one in the other thread, but instead of querying the DB for each row in the file, it will fetch from the DistributedCacheMapServer, which is hopefully faster:
You can see the first part is the same as the flow in the other thread, but instead of using ReplaceText to generate SQL to execute, the value is simply looked up from the Map and put into an attribute, then the final ReplaceText is like the one in the other thread, specifying "${column.1},${column.2},${column.3},${column.4}, ${customer.name}" or whatever the appropriate attributes are. I have attached a template (databaselookupexample.xml) showing these two flows.
Created 12-15-2016 11:16 AM
FetchDistrubutedMapCash is still not fast, though the table which was fetched is only 500 rows..
any suggestion.. ?
Also..
how can I reset the DistributedMapCacheServer content ?
I'm using the FetchDistributedMapCache to compare ${column.3} with DistributedMapCacheServer content, is it possible to use the same processor to compare ${column.4}, ?
Thank you @Matt Burgess
Created 10-02-2019 02:11 AM
 
					
				
				
			
		
