Created 05-23-2018 07:52 PM
I'm creating a NiFi flow to read CSV file data and load it into a relational database. I used the QueryRecord processor to read the CSV, convert it to JSON, and filter the flowfile data using some parameters. Everything works perfectly; data is loaded to the database based on the filter criteria I added to the flow query. The flow query looks like "SELECT * FROM FLOWFILE where VBEOID in (7000,7001,7003)". In this query, I have added only 3 filter parameters in its "IN" clause. But in a real-life situation, there will be thousands of entries to add to this filter criteria. I would like to read these values from a database table, or call a REST service to get the data, and substitute those values in place of the hard-coded ones. Is there a way I can do this?
Created on 05-23-2018 10:27 PM - edited 08-17-2019 10:15 PM
The QueryRecord processor's dynamic properties allow expression language, so we can use the DistributedMapCache for this case.
Flow 1: Load ids into the DistributedMapCache:
Use the PutDistributedMapCache processor to load all your required ids into the cache.
Flow 2: Fetch ids from the DistributedMapCache:
In the original flow, use the FetchDistributedMapCache processor to fetch the cached data into an attribute, then use that attribute name in your QueryRecord processor.
The QueryRecord dynamic property query would be:
SELECT * FROM FLOWFILE where VBEOID in (${fetch_distributed_cache_attribute_name})
You may have to change the Max cache entry size property in PutDistributedMapCache and the Max Length To Put In Attribute property in FetchDistributedMapCache according to the size of the data being cached and retrieved.
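As a quick sketch of the key processor properties (the cache key name kba_oids is a hypothetical example; use any key you like, and make sure the attribute name matches what you reference in the query):

PutDistributedMapCache (Flow 1):
    Cache Entry Identifier: kba_oids
    Distributed Cache Service: your DistributedMapCacheClientService

FetchDistributedMapCache (Flow 2):
    Cache Entry Identifier: kba_oids (must match the key used in Flow 1)
    Put Cache Value In Attribute: fetch_distributed_cache_attribute_name
    Distributed Cache Service: the same DistributedMapCacheClientService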
Sample Flow:
Refer to this link for configuring and using the Put/FetchDistributedMapCache processors.
With this method we are not hard-coding the values in the QueryRecord processor; based on the attribute value from FetchDistributedMapCache, the QueryRecord query runs dynamically.
Created 06-02-2018 01:54 AM
Thank you, Shu, for your reply. I tried the method, but I'm getting an error and I'm not sure how to solve it.
I used QueryDatabaseTable to query a database table column and loaded the data with PutDistributedMapCache. Then I fetched the cached data using FetchDistributedMapCache and used the resulting attribute in QueryRecord. QueryRecord is showing an error; the error is attached.
Created 06-02-2018 02:12 AM
Could you please share the sample CSV data and the data that you have loaded using PutDistributedMapCache?
Created 06-02-2018 03:14 AM
Created 06-02-2018 04:00 AM
After the QueryDatabaseTable processor, use a ConvertRecord processor to read the incoming Avro data with the Schema Access Strategy set to Use Embedded Avro Schema, and use CSVRecordSetWriter as the Record Writer with only the field that you want to keep in the DistributedMapCache, setting the Record Separator property value to a comma (,).
Now the flowfile content will be in CSV format with the field(s) you want to cache.
Feed the success relationship to the PutDistributedMapCache processor.
Flow:
1. QueryDatabaseTable
2. ConvertRecord // read incoming Avro data and write in CSV format (output needs to be on one line)
3. PutDistributedMapCache
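As an illustration (assuming the table returns VBEOID values 7000, 7001, and 7003), the ConvertRecord output with the comma Record Separator is a single-line flowfile whose content is:

7000,7001,7003,

Note the trailing comma added by the separator; this is the value that PutDistributedMapCache stores under your cache key.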
Created 06-02-2018 03:20 AM
Also, the flow works with "SELECT * FROM FLOWFILE where VBEOID in (7000,7001,7003)"
The flow shows an error with "SELECT * FROM FLOWFILE where VBEOID in (${KBA_OID_cache})".
I think I have some incorrect configuration in PutDistributedMapCache and/or FetchDistributedMapCache.
Created 06-02-2018 03:30 AM
Yep, the issue is with the Avro-format data going to the PutDistributedMapCache processor.
You have connected the QueryDatabaseTable processor's success relationship to PutDistributedMapCache, but the QDT processor outputs flowfile content in Avro format.
So in the FetchDistributedMapCache processor you are fetching that Avro-format data into an attribute and using it in the QueryRecord processor.
You have to change the format going to PutDistributedMapCache to CSV instead of Avro, and the flowfile content needs to be 7000,7001,7003.
Then the cache will keep the value as 7000,7001,7003 in CSV format, and when you use it in the QueryRecord processor you won't see any issues!
Created 06-04-2018 09:03 PM
Thanks, Shu, for your timely reply. I got it to work!
I set the CSVRecordSetWriter property "Record Separator" to ",". With this change, the cached data looked like "7000,7001,7003,".
Then, after fetching the data from the cache, I had to use an expression to remove the trailing ",", because it was causing an error in the filter query.
The final working filter query is:
SELECT * FROM FLOWFILE where VBEOID in (${KBA_OID_cache:substring(0,${KBA_OID_cache:lastIndexOf(',')})})
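A worked evaluation of that expression, given the cached value "7000,7001,7003,":

${KBA_OID_cache:lastIndexOf(',')} evaluates to 14 (the position of the trailing comma)
${KBA_OID_cache:substring(0, 14)} evaluates to 7000,7001,7003

so the query QueryRecord actually runs is SELECT * FROM FLOWFILE where VBEOID in (7000,7001,7003).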
Is there any limit on how much data I can keep in the cache this way?
Created 06-04-2018 10:12 PM
If you are persisting small datasets (tens of MBs), you should be good. If you are trying to cache a big dataset (a couple of hundred MBs), then you need to increase the Max cache entry size in the PutDistributedMapCache processor.
In the DistributedMapCacheServer service, configure the Persistence Directory property: if a value is specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only. By specifying a directory we avoid using memory to cache the dataset.
Also, you may need to increase the Maximum Cache Entries property in DistributedMapCacheServer according to the number of entries you are trying to keep in the cache.
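As a rough sketch of the sizing-related properties (example values only; the directory path is hypothetical):

DistributedMapCacheServer (controller service):
    Maximum Cache Entries: 10000 (the default; raise it if you cache more keys)
    Persistence Directory: /data/nifi/cache (hypothetical path; leave empty for in-memory only)

PutDistributedMapCache:
    Max cache entry size: 10 MB (default is 1 MB; raise it for large id lists)

FetchDistributedMapCache:
    Max Length To Put In Attribute: 100000 (default is 256; it must cover the full id list)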