Created 06-17-2020 11:43 PM
Hi @SirV ,
I see there are 2 possible options :
1. Merge two flow files based on common key ('FALLA_ID') using MergeContent processor :
- Use EvaluateJsonPath first to get 'FALLA_ID' value to flow file attribute.
- Use MergeContent processor to merge master-detail flow files, you need to use above step extracted FALLA_ID value in 'Correlation Attribute Name' filed of MergeContent processor, so that it always merge flow files based on common FALL_ID value, so that you can get single merged file for each FALL_ID.
- Use JOLTTransformJson to transform your merged json to desired format of output json.
2. Cache the first flow file content in to cache with key as 'FALLA_ID' value and merge when second flow file arrives :
- Use NiFi DistributedMap Cache (or any other external cache like Ignite) to cache the first flow.
(It will be key-value pair in cache, so use key as FALL_ID and value as whole flow file content.)
Before caching the FF just check if that key is already present in cache, if already present means first (master/details) file has already arrived, so you can read that file and dont need to cache the current flow file.
- Now you have 1 file in FF Content and a file in FF Attribute (which is read from cache), now you can use ExceuteScript and write simple script (of your choice - python/groovy) to club FF content and attributes to form a desired output json
Note : This cache approach has to be picked carefully based on your file volumes and content size etc else it may fill up your memory.
Also if you are in multi node cluster mode, NiFi DistributedCache will be independent for each node and does not interact with other nodes so if master file and details files get picked by different nodes then logic will fail !
Please ACCEPT if it helps/resolves your problem.
Thanks
Mahendra