
NiFi: how to SQL-join two flowfiles

New Contributor

Hi everyone,

I have spent several days trying to solve the following case.

I need to join two flowfiles (master-detail) in a NiFi flow. I tried QueryRecord and MergeRecord, but I couldn't get it to work.

I have some JSON-based files in HDFS (Apache Spark); I need to join them and put the result into a database table (using NiFi v1.11.4). I can read the HDFS files, but I can't perform the join.

 

Example:

master_file content:

{
  "FALLA_ID" : 40217,
  "FALLA_FECHA" : "1998-01-01 00:00:00",
  "FALLA_RPT_NRO" : 25,
  "FALLA_FRECUENCIA" : "60.0000"
}

 

detail_file content:

{
  "FALLA_ID" : 40217,
  "PRINCIPAL" : 1,
  "FALLA_DTL_ID" : 1,
  "FALLA_CLASE" : 1,
  "TPF_FALLA_ID" : 1
},
{
  "FALLA_ID" : 40217,
  "PRINCIPAL" : 1,
  "FALLA_DTL_ID" : 2,
  "FALLA_CLASE" : 2,
  "TPF_FALLA_ID" : 5
}

 

Expected_File content:

{
  "FALLA_ID" : 40217,
  "FALLA_FECHA" : "1998-01-01 00:00:00",
  "FALLA_RPT_NRO" : 25,
  "FALLA_FRECUENCIA" : "60.0000",
  "PRINCIPAL" : 1,
  "FALLA_CLASE" : 2
}

 

As you can see, FALLA_ID is the common field for the join.

 

Do you have any ideas?

Thank you!!

1 ACCEPTED SOLUTION

Master Collaborator

Hi @SirV ,

 

I see two possible options:

 

1. Merge the two flow files on the common key ('FALLA_ID') using the MergeContent processor:

  - Use EvaluateJsonPath first to extract the 'FALLA_ID' value into a flow file attribute.

  - Use the MergeContent processor to merge the master and detail flow files. Put the attribute extracted in the previous step in the 'Correlation Attribute Name' field of MergeContent, so that flow files are always merged on a common FALLA_ID value and you get a single merged file per FALLA_ID.

  - Use JoltTransformJSON to transform the merged JSON into the desired output format (a rough configuration sketch follows below).
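For illustration, here is a rough configuration sketch for option 1. The attribute name falla_id and the exact property values are assumptions to adapt to your own flow: EvaluateJsonPath extracts FALLA_ID into an attribute, MergeContent wraps the correlated flow files into a single JSON array, and the JOLT spec flattens that array, keeping the first value seen for each field.

EvaluateJsonPath (Destination = flowfile-attribute):
    falla_id = $.FALLA_ID

MergeContent:
    Correlation Attribute Name = falla_id
    Merge Format               = Binary Concatenation
    Delimiter Strategy         = Text
    Header                     = [
    Demarcator                 = ,
    Footer                     = ]
    Minimum Number of Entries  = 2   (assuming the master and the detail arrive as two flow files)

JoltTransformJSON (Jolt Specification):
[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "*": "&"
      }
    }
  },
  {
    "operation": "cardinality",
    "spec": {
      "*": "ONE"
    }
  }
]

If you need a specific detail value rather than the first one seen (for example FALLA_CLASE from a particular detail row), extend the shift spec to pick exactly the fields you want.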

 

 

2. Cache the first flow file's content with its 'FALLA_ID' value as the key, and merge when the second flow file arrives:

  - Use the NiFi DistributedMapCache (or any other external cache, such as Ignite) to cache the first flow file.
    (It is stored as a key-value pair, so use FALLA_ID as the key and the whole flow file content as the value.)
    Before caching the flow file, check whether that key is already present in the cache; if it is, the other (master/detail) file has already arrived, so you can read it from the cache and don't need to cache the current flow file.

  - Now you have one file in the flow file content and one in a flow file attribute (read from the cache). Use ExecuteScript with a simple script (of your choice, e.g. Python/Groovy) to combine the flow file content and attributes into the desired output JSON (see the sketch after the note below).

Note: This cache approach has to be chosen carefully based on your file volumes, content sizes, etc., or it may fill up your memory.
Also, if you are running a multi-node cluster, the NiFi DistributedMapCache is independent on each node and does not interact with the other nodes, so if the master and detail files get picked up by different nodes, the logic will fail!
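To make the ExecuteScript step in option 2 concrete, here is a minimal Jython sketch (Script Engine = python). It assumes a FetchDistributedMapCache processor earlier in the flow has already written the cached JSON into a flow file attribute; the attribute name cached.content is a hypothetical placeholder, so use whatever name your flow actually sets.

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Overlay the JSON in the flow file content on top of the cached JSON taken from the attribute.
class MergeJson(StreamCallback):
    def __init__(self, cached):
        self.cached = cached

    def process(self, inputStream, outputStream):
        current = json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
        merged = dict(self.cached)   # cached record (e.g. the master)
        merged.update(current)       # current record (e.g. the detail)
        outputStream.write(bytearray(json.dumps(merged).encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    cached_json = flowFile.getAttribute('cached.content')  # hypothetical attribute set by FetchDistributedMapCache
    cached = json.loads(cached_json) if cached_json else {}
    flowFile = session.write(flowFile, MergeJson(cached))
    session.transfer(flowFile, REL_SUCCESS)

You can filter the merged dict down to only the fields you actually need before writing it out.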

 

Please ACCEPT if it helps/resolves your problem.

 

Thanks

Mahendra


5 REPLIES


New Contributor

Hi Mahendra,

Thank you for your answer.

Indeed, I'm using a NiFi cluster, so the second option isn't viable.

I will work on your first approach and report back with my results when I finish.

 

Regards.

New Contributor

Hi Mahendra,

I'm back. I tried your first approach and it helped me solve my requirement.

 

Thank you again

Regards

New Contributor

Hi @SirV, can you please send me the code? I'm also stuck in the same situation.

 

Rising Star

Hi @hegdemahendra @SirV, I have the same scenario, and your option 1 seems straightforward, but I don't get the expected result from the MergeContent processor. The EvaluateJsonPath output goes to MergeContent with the correlation attribute "FALLA_ID", and the detail_file content also goes to MergeContent, but it is not producing a result with the mapped fields. Sharing my flow below. Please suggest.

[Screenshot of the flow: samrathal_0-1661951596411.png]