Created 09-12-2024 06:20 AM
Hi there,
I am an absolute NIFI beginner, and I would like to implement the following data integration (DI) pipeline using Apache NIFI:
Scenario: I have two independent data sources:
Goal: I want to compare the records based on their ID columns from both sources and classify them as:
The output flow of this "diff" process should produce a stream where each line is enriched with a status flag ("new", "changed", "same", "deleted") indicating the result of the comparison.
Background: I have successfully implemented this functionality within the "Pentaho DI" platform for years. However, I am struggling to replicate it in Apache NIFI. While I assume a processor for this must exist, I haven't been able to find or configure it yet.
I would greatly appreciate any guidance or advice from the community. If anyone could point me in the right direction or suggest the appropriate NIFI components, that would be very helpful.
Thanks in advance!
Cheers,
Tomas.
Created 09-12-2024 10:29 AM
@moonspa Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our NiFi experts @MattWho @SAMSAL who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres,
Created on 09-13-2024 06:34 AM - edited 09-13-2024 12:57 PM
Hi @moonspa ,
You might want to look into the ForkEnrichment/JoinEnrichment processors. Let's assume the CSV holds the latest information that needs to be merged into the database. You get the CSV file using a processor like GetFile/FetchFile, then use the ForkEnrichment processor to fork the flow in two: the original (the CSV) and the enrichment, using the enrichment relationship for the DB. You then connect the result of the DB query (using ExecuteSQLRecord, for example) and the original to the JoinEnrichment processor, where you decide on the merging strategy. For example, you can use the SQL strategy to link records by ID with a full join. This gives you the full list from both the CSV and the DB. However, since this is not real SQL, you can't use SQL functions like isNull or conditional statements to set the status at that point. From there, you can use the QueryRecord processor, where you can use Apache Calcite SQL syntax to generate the status column (new, same, deleted, changed).
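To make the intended result concrete, here is a minimal Python sketch of the logic that the full join plus status column is meant to produce. It is not NiFi code and not a processor configuration. The function name `diff_records`, the key column `id`, and the meaning of each flag (CSV is the latest state, DB is the existing state) are assumptions for illustration only.

```python
from typing import Dict, Iterable, List

Record = Dict[str, str]


def diff_records(csv_rows: Iterable[Record],
                 db_rows: Iterable[Record],
                 key: str = "id") -> List[Record]:
    """Full outer join of two record sets on `key`, adding a status flag.

    Assumed meaning of the flags (not stated in the thread):
      new     - ID exists only in the CSV
      deleted - ID exists only in the DB
      same    - ID exists in both and all fields are equal
      changed - ID exists in both and at least one field differs
    """
    csv_by_id = {row[key]: row for row in csv_rows}
    db_by_id = {row[key]: row for row in db_rows}

    result: List[Record] = []
    # The union of both key sets is what a full join iterates over.
    for record_id in sorted(csv_by_id.keys() | db_by_id.keys()):
        incoming = csv_by_id.get(record_id)
        existing = db_by_id.get(record_id)
        if existing is None:
            row, status = incoming, "new"
        elif incoming is None:
            row, status = existing, "deleted"
        elif incoming == existing:
            row, status = incoming, "same"
        else:
            row, status = incoming, "changed"
        result.append({**row, "status": status})
    return result


if __name__ == "__main__":
    csv_rows = [{"id": "1", "name": "a"}, {"id": "2", "name": "b2"}]
    db_rows = [{"id": "1", "name": "a"}, {"id": "3", "name": "c"}]
    for row in diff_records(csv_rows, db_rows):
        print(row)
```

Note that this sketch holds both inputs in memory, just like a full join does, so it only illustrates the classification rules, not a scalable implementation.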
Keep in mind that, as the documentation for the ForkEnrichment/JoinEnrichment processors indicates, using the SQL strategy with a large amount of data can consume a lot of JVM heap, which can lead to out-of-memory errors. To avoid this, look at the section "More Complex Joining Strategies" toward the end of that documentation, which takes a different approach using the Wrapper strategy alongside the ScriptedTransformRecord processor.
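As a rough illustration of why a per-record transform is lighter on memory, the Python sketch below classifies one wrapped record at a time instead of joining two full data sets. It is not a ScriptedTransformRecord script. It assumes each wrapped record has two nested fields, here called `original` (the CSV row) and `enrichment` (the DB row), that the two rows in a wrapper already belong to the same ID, and that a missing side is represented as `None`. Check the processor documentation for the actual field names and pairing behaviour.

```python
from typing import Dict, Iterable, Iterator, Optional

Row = Dict[str, str]
Wrapped = Dict[str, Optional[Row]]


def classify_wrapped(wrapped_records: Iterable[Wrapped]) -> Iterator[Row]:
    """Yield one flagged row per wrapped record, without buffering the input.

    The field names "original" and "enrichment" are assumptions made
    for this sketch.
    """
    for wrapped in wrapped_records:
        original = wrapped.get("original")
        enrichment = wrapped.get("enrichment")
        if original is None and enrichment is None:
            continue  # nothing to classify
        if enrichment is None:
            yield {**original, "status": "new"}
        elif original is None:
            yield {**enrichment, "status": "deleted"}
        elif original == enrichment:
            yield {**original, "status": "same"}
        else:
            yield {**original, "status": "changed"}
```

Because the function is a generator, only the record currently being processed is held in memory, which is the property the per-record approach relies on.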
Hope that helps. Let me know if you have more questions.
If you found this helpful, please accept the solution.
Thanks
Created 09-17-2024 11:19 AM
@moonspa Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
Regards,
Diana Torres,