Support Questions

Find answers, ask questions, and share your expertise

Can NIFI QueryRecord Processor configure FlowFile Query parameters using data retrieved from external sources.

avatar
Contributor

I'm creating a NIFI flow to read CSV file data and load it an Relational database. I used QueryRecord Processor to read CSV/Convert-toJSON/ Filter Flow file data using some parameters. Everything works perfectly, data loaded to database based on the filter criteria I added to the flow Query. The flow Query looks like "SELECT * FROM FLOWFILE where VBEOID in (7000,7001,7003)". In this query, I have added only 3 filter parameters on its "IN" clause. But in real life situation, there will be 1000's of entries to be added to this filter criteria. I would like to read these values from a database table or call a REST service to get data, and substitute values in place of hard-coded the value. Is there a way I can do it?


queryrecord-processor.png
1 ACCEPTED SOLUTION

avatar
Master Guru
@Winnie Philip

Query Record processor dynamic properties allows expression language, so we can use Distributed Cache for this case.

Flow1:Load id's into DistributedCache:

Use PutDistributedMapCache server and load all your required id's into Cache.

Flow2: Fetch id's from DistributeCache:
In the original flow use FetchDistributedMapCache processor to fetch the cached data and keep them as attribute, use the same attribute name in your QueryRecord processor.

QueryRecord dynamic property query would be

SELECT * FROM FLOWFILE where VBEOID in (${fetch_distributed_cache_attribute_name})

You may have to change MaxCacheEntrysize property in PutDistributedMapCache and Max Length To Put In Attribute property in FetchDistributedMapCache processors as per the size data going to cache and retrieve from cache.

Sample Flow:

74550-flow.png

Refer to this link for configuring and usage of Put/FetchDistributedCache processors.

By using this method we are not hard coding the values in QueryRecord processor based on the attribute value from the FetchDistributedCache we are running query in QueryRecord processor dynamically.

View solution in original post

9 REPLIES 9

avatar
Master Guru
@Winnie Philip

Query Record processor dynamic properties allows expression language, so we can use Distributed Cache for this case.

Flow1:Load id's into DistributedCache:

Use PutDistributedMapCache server and load all your required id's into Cache.

Flow2: Fetch id's from DistributeCache:
In the original flow use FetchDistributedMapCache processor to fetch the cached data and keep them as attribute, use the same attribute name in your QueryRecord processor.

QueryRecord dynamic property query would be

SELECT * FROM FLOWFILE where VBEOID in (${fetch_distributed_cache_attribute_name})

You may have to change MaxCacheEntrysize property in PutDistributedMapCache and Max Length To Put In Attribute property in FetchDistributedMapCache processors as per the size data going to cache and retrieve from cache.

Sample Flow:

74550-flow.png

Refer to this link for configuring and usage of Put/FetchDistributedCache processors.

By using this method we are not hard coding the values in QueryRecord processor based on the attribute value from the FetchDistributedCache we are running query in QueryRecord processor dynamically.

avatar
Contributor

Thank you, Shu for your reply. I tried the method, but I'm finding an error. I'm not sure how to solve it.

I used QueryDatabaseTable to query a database table column and loaded data to PutDistributedMapCache. Then fetched cached data using "FetchDistributedMapCache" and then used loaded attribute in QueryRecord. QueryRecord is showing an error. The error is attached.


fetch-cache-database-update-flow.pngqueryrecord-processor-express.pngqueryrecord-processor-express-prop.pngfetch-distributed-map-cache.pngput-distributed-map-cache.png

avatar
Master Guru
@Winnie Philip

Could you please share the sample csv data and the data that you have loaded using PutDistributedMapCache?

avatar
Contributor

The input csv file used to for loading data to "PutdatabseRecord" is attached.(sample.txt). I executed a "QueryDatabaseTable" to get data for loading to "PutDistributedMapCache".


put-cache.pngquerydatabasetable.png

avatar
Master Guru
@Winnie Philip

After QueryDatabaseTable processor use Convert Record processor to read the incoming avro data with

Schema Access Strategy

Use Embedded Avro Schema

and Record writer as CsvSetWriter with the field that you want to keep in DistributedCacheMap processor and

**Record Separator** property value as

,

Now you are going to have flowfile content in CSV format with the field/s that you want to cache.

Feed the success relationship to PutDistributedMapCache processor.

Flow:

1.QueryDatabaseTable

2.ConvertRecord //read incoming avro data and write in Csv format(output needs to be in one line)

3.PutDistributedMapCache


replacetext.png

avatar
Contributor

Also, the flow works with "SELECT * FROM FLOWFILE where VBEOID in (7000,7001,7003)"

The flow shows error with "SELECT * FROM FLOWFILE where VBEOID in (${KBA_OID_cache})"

I think I'm doing some incorrect configurations in PutDistributeMapCache and / or FetchDistributedMapCache .

avatar
Master Guru

@Winnie Philip

Yep,the issue is with the avro format data that is going to PutDistributeMapCache processor.

As you have connected QueryDatabaseTable processor success to PutDistributeMapCache but QDT processor outputs flowfile contents in avro format.

In FetchDistributeCacheMap processor you are fetching the data into an attribute(avro format data) and using it in QueryRecord processor.

You have to change the format to CSV instead of avro that is going to PutDistributeCache and the flowfile content needs to be 7000,7001,7003.

Then PutDistributeCache processor will keep the attribute value as 7000,7001,7003 in csv format when you use QueryRecord processor you are not going to show any issues..!!

avatar
Contributor

Thanks Shu for your timely reply. I made it to work!

I used CSVRecordSetWriter property "Record Separator" as ",". With this change, cache data looked like "7000,7001,7003,".

Then after fetching data from cache, I had to use an expression to remove the trailing "," from filter query. It was giving me an error.

Finally working filter Flow query is:

SELECT * FROM FLOWFILE where VBEOID in (${KBA_OID_cache:substring(0,${KBA_OID_cache:lastIndexOf(',')})})

Is there any size limit of how many data I can keep this way in cache?

avatar
Master Guru
@Winnie Philip

if you are persisting some small datasets like tens of MB's should be good.If you are trying to cache some big dataset(couple of hundred MB's) then you need to increase the Max cache entry size in PutDistributedCacheMap processor.

In DistributedMapCacheServer service configure Persistence Directory property value, If the value specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only. By specifying directory we are not going use memory to cache the dataset.

Also you need to increase Maximum Cache Entries property in DistributedMapCacheServer according to number of cache entries you are trying to keep in cache.