Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

how to join three data flow files through JoinEnrichment

avatar
Explorer

Hi Team, I have been trying to generate a scenario like three separate modules to join through Fork Enrichment with Join Enrichment process.

For example:  There are three processor groups A , B and C

My existing code designed like join the A and B modules with Fork Enrichment with Join Enrichment process by adding strategy SQL, and getting the expected result.

Now, latest requirement to join the Module C into SQL query with existing flow( SELECT a. id, b.name, c. city FROM A a JOIN B b ON a.id = b.id JOIN C c ON c.id = a.id)


Is it possible to implement one original and two enrichment flows with Fork Enrichment with Join Enrichment, or any alternative approach

Please advice.

  

4 ACCEPTED SOLUTIONS

avatar
Super Guru

Hi @Kondaji ,

I could be wrong but I dont think you can utilize two enrichment from the same fork and expect to merge back to original. The join enrichment will take the first enrichment and merge it back to the original to do the enrichment based on the selected strategy. What you can do is utilize different fork-join for each enrichment: The first will be to enrich A with B and the second is to enrich the join of A&B with C. The below flow simulates this . I used GenerateFlowFile with single CSV record that has just the id for Group A , then I used two replacetext processors to simulate enrichment info with the same id from A.

SAMSAL_0-1720564097360.png

Here are the processes configurations:

A (GenerateFlowFile):

SAMSAL_1-1720564150688.png

B(ReplaceText):

SAMSAL_2-1720564198517.png

 

JoinEnrichment - A&B:

SAMSAL_3-1720564236512.png

C(ReplaceText):

SAMSAL_4-1720564274894.png

JoinEnrichment- AB&C:

SAMSAL_5-1720564324614.png

 

Final Result:

SAMSAL_6-1720564367010.png

Hope that helps. If it does, please accept solution.

Thanks

 

 

 

 

 

 

 

View solution in original post

avatar
Super Guru

Hi,

Sorry to hear that its not working properly yet. I understand that its project specific but can you replicate the issue on some dummy data and post the configuration. This would help us troubleshoot better and find exactly what is going on.

View solution in original post

avatar
Super Guru

Its going to be hard to pinpoint what is going on without seeing configuration or providing clear instruction to replicate. When you say "query is not getting executed" , do you mean its stays in the queue or you get an error and the original flowfile gets penalized back ? One reason that its not getting executed is because you are not getting any data from the C processor\group therefore the fork original never gets merged back with anything. Make sure C is providing data to merge with in the final join enrichment

View solution in original post

avatar
Super Guru

Not sure why do you need to use the replace text processor in this case. I provided that in my sample above as an example anda a way to simulate getting new data by replacing the original content with something else. You can think of replaceText as if Im doing InvokeHttp and getting different flowfile content in the response releationship. If you got the data from module C just link directly to the join enrichment. As long as you have the correct writer\reader configured for each fork then you should be good.

View solution in original post

8 REPLIES 8

avatar
Community Manager

@Kondaji Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @MattWho @mburgess  who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Learn more about the Cloudera Community:

avatar
Super Guru

Hi @Kondaji ,

I could be wrong but I dont think you can utilize two enrichment from the same fork and expect to merge back to original. The join enrichment will take the first enrichment and merge it back to the original to do the enrichment based on the selected strategy. What you can do is utilize different fork-join for each enrichment: The first will be to enrich A with B and the second is to enrich the join of A&B with C. The below flow simulates this . I used GenerateFlowFile with single CSV record that has just the id for Group A , then I used two replacetext processors to simulate enrichment info with the same id from A.

SAMSAL_0-1720564097360.png

Here are the processes configurations:

A (GenerateFlowFile):

SAMSAL_1-1720564150688.png

B(ReplaceText):

SAMSAL_2-1720564198517.png

 

JoinEnrichment - A&B:

SAMSAL_3-1720564236512.png

C(ReplaceText):

SAMSAL_4-1720564274894.png

JoinEnrichment- AB&C:

SAMSAL_5-1720564324614.png

 

Final Result:

SAMSAL_6-1720564367010.png

Hope that helps. If it does, please accept solution.

Thanks

 

 

 

 

 

 

 

avatar
Explorer

@SAMSAL : Thank so much for your feasible approach. 

I have applied the same way, however at AB&C JoinEnrichment processor is not getting executed.  reviewed all input data parameters as well. 
Sample query is not getting executed ( select * from original) inside AB&C JoinEnrichment. I couldn't able to share the flow details as it is project specific. 

Can you please suggest!

avatar
Super Guru

Hi,

Sorry to hear that its not working properly yet. I understand that its project specific but can you replicate the issue on some dummy data and post the configuration. This would help us troubleshoot better and find exactly what is going on.

avatar
Explorer

Thanks for your info. Can you please suggest, is there any way to pinpoint the issues in a JoinEnrichment processor.

avatar
Super Guru

Its going to be hard to pinpoint what is going on without seeing configuration or providing clear instruction to replicate. When you say "query is not getting executed" , do you mean its stays in the queue or you get an error and the original flowfile gets penalized back ? One reason that its not getting executed is because you are not getting any data from the C processor\group therefore the fork original never gets merged back with anything. Make sure C is providing data to merge with in the final join enrichment

avatar
Explorer

@SAMSAL  I have found the issue, i.e data coming from C module. When i copy the c module data to a replacetext processor, and it is executing fine. So inside the module C it is required to apply regular expression (?s)(^.*$) to queue data in replacetext processor as mentioned below.

Can you please suggest how to pass the queue data to Replacement Value in ReplaceText processor or any alternative approach.

Appreciate your support on this!

Kondaji_0-1720692514750.png

 


  

avatar
Super Guru

Not sure why do you need to use the replace text processor in this case. I provided that in my sample above as an example anda a way to simulate getting new data by replacing the original content with something else. You can think of replaceText as if Im doing InvokeHttp and getting different flowfile content in the response releationship. If you got the data from module C just link directly to the join enrichment. As long as you have the correct writer\reader configured for each fork then you should be good.