Created 07-05-2024 07:38 AM
Hi Team, I have been trying to build a scenario in which three separate modules are joined using the ForkEnrichment and JoinEnrichment processors.
For example, there are three process groups: A, B and C.
My existing flow joins modules A and B using ForkEnrichment and JoinEnrichment with the SQL join strategy, and it produces the expected result.
Now the latest requirement is to join module C into the SQL query in the existing flow (SELECT a.id, b.name, c.city FROM A a JOIN B b ON a.id = b.id JOIN C c ON c.id = a.id).
Is it possible to implement one original and two enrichment flows with ForkEnrichment and JoinEnrichment, or is there an alternative approach?
Please advise.
Created 07-05-2024 10:28 AM
@Kondaji Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our NiFi experts @MattWho @mburgess who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres
Created 07-09-2024 03:33 PM
Hi @Kondaji ,
I could be wrong, but I don't think you can use two enrichments from the same fork and expect to merge them back into the original. JoinEnrichment will take the first enrichment and merge it back with the original based on the selected strategy. What you can do is use a separate fork-join for each enrichment: the first enriches A with B, and the second enriches the joined A&B result with C. The flow below simulates this. I used GenerateFlowFile with a single CSV record that has just the id for group A, then used two ReplaceText processors to simulate enrichment data with the same id as A.
Here are the processor configurations:
A (GenerateFlowFile):
B (ReplaceText):
JoinEnrichment - A&B:
C (ReplaceText):
JoinEnrichment - AB&C:
Final Result:
Hope that helps. If it does, please accept the solution.
Thanks
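To make the chained fork-join idea concrete, here is a minimal sketch that runs outside NiFi. It uses Python's built-in sqlite3 as a stand-in for the SQL join strategy, and the sample rows are made up for illustration. In JoinEnrichment's SQL strategy the two inputs are exposed as tables named `original` and `enrichment`; here the sqlite tables `A`, `AB` and `C` play those roles, and the aliases `o` and `e` mark which side is which. The sketch only checks that two two-way joins give the same rows as the single three-way join from the question.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Toy stand-ins for the three modules (made-up data).
cur.execute("CREATE TABLE A (id INTEGER)")
cur.execute("CREATE TABLE B (id INTEGER, name TEXT)")
cur.execute("CREATE TABLE C (id INTEGER, city TEXT)")
cur.executemany("INSERT INTO A VALUES (?)", [(1,), (2,)])
cur.executemany("INSERT INTO B VALUES (?, ?)", [(1, "alice"), (2, "bob")])
cur.executemany("INSERT INTO C VALUES (?, ?)", [(1, "Paris"), (2, "Oslo")])

# Stage 1: first fork-join, A is the original and B the enrichment.
cur.execute("""
    CREATE TABLE AB AS
    SELECT o.id, e.name
    FROM A o JOIN B e ON o.id = e.id
""")

# Stage 2: second fork-join, the A&B result is the original and C the enrichment.
two_stage = cur.execute("""
    SELECT o.id, o.name, e.city
    FROM AB o JOIN C e ON o.id = e.id
    ORDER BY o.id
""").fetchall()

# The single three-way join from the question, for comparison.
three_way = cur.execute("""
    SELECT a.id, b.name, c.city
    FROM A a JOIN B b ON a.id = b.id JOIN C c ON c.id = a.id
    ORDER BY a.id
""").fetchall()

print(two_stage == three_way)
```

In the actual flow, each stage would be its own ForkEnrichment/JoinEnrichment pair, with the stage 2 query written against `original` and `enrichment` rather than `AB` and `C`.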
Created 07-10-2024 06:54 AM
@SAMSAL: Thank you so much for the workable approach.
I applied it the same way; however, the AB&C JoinEnrichment processor is not executing. I have reviewed all the input data parameters as well.
Even a sample query (select * from original) does not get executed inside the AB&C JoinEnrichment. I am not able to share the flow details as they are project specific.
Can you please suggest?
Created 07-10-2024 07:58 AM
Hi,
Sorry to hear that it's not working properly yet. I understand that it's project specific, but can you replicate the issue on some dummy data and post the configuration? This would help us troubleshoot better and find exactly what is going on.
Created 07-10-2024 11:11 AM
Thanks for the info. Can you please suggest whether there is any way to pinpoint issues in a JoinEnrichment processor?
Created 07-10-2024 12:25 PM
It's going to be hard to pinpoint what is going on without seeing the configuration or having clear instructions to replicate. When you say "query is not getting executed", do you mean the flowfile stays in the queue, or do you get an error and the original flowfile gets penalized back? One possible reason it is not executing is that you are not getting any data from the C processor/group, so the forked original never gets merged with anything. Make sure C is providing data to merge with in the final JoinEnrichment.
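The point about the original never being merged can be pictured with a toy model. This is not NiFi's implementation; it is a sketch that assumes the join simply waits until it holds both an original and an enrichment for the same fork. The dictionary keys `group_id` and `role` are hypothetical stand-ins for whatever pairing information ForkEnrichment attaches to its two outputs.

```python
from collections import defaultdict

def pair_by_group(flowfiles):
    """Return (joined, waiting): groups holding both roles, and groups still missing one."""
    groups = defaultdict(dict)
    for ff in flowfiles:
        groups[ff["group_id"]][ff["role"]] = ff["content"]

    joined, waiting = [], []
    for gid, roles in groups.items():
        if "ORIGINAL" in roles and "ENRICHMENT" in roles:
            joined.append((gid, roles["ORIGINAL"], roles["ENRICHMENT"]))
        else:
            # Nothing to join against yet, so no query would run for this group.
            waiting.append(gid)
    return joined, waiting

# Hypothetical arrivals: group g2 never receives its enrichment from module C.
arrived = [
    {"group_id": "g1", "role": "ORIGINAL", "content": "id,name\n1,alice"},
    {"group_id": "g1", "role": "ENRICHMENT", "content": "id,city\n1,Paris"},
    {"group_id": "g2", "role": "ORIGINAL", "content": "id,name\n2,bob"},
]

joined, waiting = pair_by_group(arrived)
print(joined)
print(waiting)
```

Under that assumption, even a trivial query such as select * from original would not run for `g2`, because the join has no matching enrichment to pair it with.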
Created 07-11-2024 03:12 AM
@SAMSAL I have found the issue: it is the data coming from module C. When I copy the module C data into a ReplaceText processor, it executes fine. So inside module C it seems necessary to apply the regular expression (?s)(^.*$) to the queued data in a ReplaceText processor, as mentioned below.
Can you please suggest how to pass the queued data to the Replacement Value in the ReplaceText processor, or any alternative approach?
Appreciate your support on this!
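For readers unsure what (?s)(^.*$) does, here is a small sketch using Python's `re` module. NiFi's ReplaceText uses Java regular expressions, so treat this as an approximation; the sample content is made up. The (?s) flag lets `.` match newlines, so the single capture group spans the whole multi-line content, and a back-reference to that group reproduces the content unchanged.

```python
import re

# Made-up multi-line content standing in for a flowfile from module C.
content = "id,city\n1,Paris\n2,Oslo\n"

# (?s) enables dot-all mode, so .* runs across line breaks.
pattern = re.compile(r"(?s)(^.*$)")

# Group 1 captures the entire content, newlines included.
whole = pattern.search(content).group(1)

# Replacing the match swaps out the whole content in one go.
replaced = pattern.sub("id,city\n9,Rome\n", content)

# Referring back to group 1 passes the original content through unchanged.
passthrough = pattern.sub(r"\1", content)

print(whole == content)
print(replaced)
print(passthrough == content)
```

In ReplaceText the equivalent back-reference in the Replacement Value would be written `$1` rather than `\1`.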
Created 07-11-2024 03:44 AM
Not sure why you need to use the ReplaceText processor in this case. I provided that in my sample above as an example and a way to simulate getting new data by replacing the original content with something else. You can think of ReplaceText as if I'm doing InvokeHTTP and getting different flowfile content in the response relationship. If you already have the data from module C, just link it directly to the JoinEnrichment. As long as you have the correct reader/writer configured for each fork, you should be good.