Apache NIFI: Merge of two lines from two sources with diff identification

New Contributor

Hi there,

I am an absolute NIFI beginner, and I would like to implement the following data integration (DI) pipeline using Apache NIFI:

Scenario: I have two independent data sources:

  1. A CSV file.
  2. A database.
Both contain streams of sorted lines with the same column structure. Besides its other columns, each record also includes:
  • A primary ID.
  • A calculated hash, generated from the content of the line.

Goal: I want to compare the records based on their ID columns from both sources and classify them as:

  • New: The ID is present in the CSV file but not in the database.
  • Changed: The ID exists in both sources, but the hash values differ.
  • Same: The ID exists in both sources, and the hash values are the same.
  • Deleted: The ID is missing from the CSV file but exists in the database.

The output flow of this "diff" process should produce a stream where each line is enriched with a status flag ("new", "changed", "same", "deleted") indicating the result of the comparison.
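
To make the expected behavior concrete, here is a minimal Python sketch of the comparison described above. It is not NiFi configuration, only an illustration of the logic. It assumes both streams are sorted ascending by ID, that IDs are unique within each source, and that each record is reduced to a hypothetical `(id, hash)` pair; the name `diff_sorted` is made up for this example.

```python
from typing import Iterable, Iterator, Tuple

# Hypothetical record shape: (primary ID, content hash); other columns omitted.
Record = Tuple[int, str]


def diff_sorted(csv_rows: Iterable[Record],
                db_rows: Iterable[Record]) -> Iterator[Tuple[int, str]]:
    """Yield (id, status) by walking two ID-sorted streams in lockstep."""
    csv_iter, db_iter = iter(csv_rows), iter(db_rows)
    csv_rec, db_rec = next(csv_iter, None), next(db_iter, None)
    while csv_rec is not None or db_rec is not None:
        if db_rec is None or (csv_rec is not None and csv_rec[0] < db_rec[0]):
            # The ID appears only in the CSV stream.
            yield csv_rec[0], "new"
            csv_rec = next(csv_iter, None)
        elif csv_rec is None or db_rec[0] < csv_rec[0]:
            # The ID appears only in the database stream.
            yield db_rec[0], "deleted"
            db_rec = next(db_iter, None)
        else:
            # Same ID on both sides: compare the hashes.
            status = "same" if csv_rec[1] == db_rec[1] else "changed"
            yield csv_rec[0], status
            csv_rec, db_rec = next(csv_iter, None), next(db_iter, None)
```

Because both inputs are consumed one record at a time, this approach never needs to hold either source fully in memory, which is the property the sorted inputs make possible.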

Background: I have successfully implemented this functionality within the "Pentaho DI" platform for years. However, I am struggling to replicate it in Apache NIFI. While I assume a processor for this must exist, I haven't been able to find or configure it yet.

I would greatly appreciate any guidance or advice from the community. If anyone could point me in the right direction or suggest the appropriate NIFI components, that would be very helpful.

Thanks in advance!

Cheers,
Tomas.


Community Manager

@moonspa Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @MattWho @SAMSAL  who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Learn more about the Cloudera Community:

Super Guru

Hi @moonspa ,

You might want to look into the ForkEnrichment/JoinEnrichment processors. Let's assume the CSV has the latest information that needs to be merged into the database. Use a processor such as GetFile or FetchFile to read the CSV file, then use the ForkEnrichment processor to fork the flow in two: the original relationship carries the CSV, and the enrichment relationship feeds the DB query (using ExecuteSQLRecord, for example). Connect both the DB query result and the original to the JoinEnrichment processor, where you choose the join strategy. For example, you can use the SQL strategy to link records by ID with a full join, which gives you the full list from both the CSV and the DB. However, since this is not real SQL, you can't use SQL functions such as isNull or conditional statements there to set the status. From there, you can use the QueryRecord processor, which supports Apache Calcite SQL syntax, to generate the status column (new, same, deleted, changed).
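
The status logic that the final SQL step has to express can be tried outside NiFi. The sketch below uses Python's built-in sqlite3 module purely as a stand-in for the SQL engine; it is not QueryRecord configuration. The table names `csv_rows` and `db_rows`, the column names `id` and `hash`, and the function `classify` are all assumptions made for this example, and the real field names depend on your schemas and on how JoinEnrichment names the joined columns. Because SQLite builds older than 3.39 lack FULL JOIN, the query emulates it with a LEFT JOIN plus an anti-join combined with UNION ALL.

```python
import sqlite3

# CASE expression assigns new/same/changed for every CSV row;
# the second SELECT adds database rows that have no CSV match.
STATUS_SQL = """
SELECT c.id AS id,
       CASE WHEN d.id IS NULL THEN 'new'
            WHEN c.hash = d.hash THEN 'same'
            ELSE 'changed' END AS status
FROM csv_rows c LEFT JOIN db_rows d ON c.id = d.id
UNION ALL
SELECT d.id, 'deleted'
FROM db_rows d LEFT JOIN csv_rows c ON c.id = d.id
WHERE c.id IS NULL
ORDER BY 1
"""


def classify(csv_rows, db_rows):
    """Load (id, hash) pairs into in-memory tables and return (id, status) rows."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE csv_rows (id INTEGER PRIMARY KEY, hash TEXT)")
        conn.execute("CREATE TABLE db_rows (id INTEGER PRIMARY KEY, hash TEXT)")
        conn.executemany("INSERT INTO csv_rows VALUES (?, ?)", csv_rows)
        conn.executemany("INSERT INTO db_rows VALUES (?, ?)", db_rows)
        return conn.execute(STATUS_SQL).fetchall()
    finally:
        conn.close()
```

In an engine that supports FULL JOIN, the same result should be reachable with a single full join and one CASE expression that also checks whether the CSV-side ID is null.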

Keep in mind that, as the documentation for the ForkEnrichment/JoinEnrichment processors indicates, using the SQL strategy with a large amount of data can consume a lot of JVM heap, which can lead to out-of-memory errors. To avoid this, look at the section "More Complex Joining Strategies" toward the end of that documentation, which describes a different approach using the Wrapper strategy alongside the ScriptedTransformRecord processor.

Hope that helps. Let me know if you have more questions.

If you found this helpful, please accept the solution.

Thanks

Community Manager

@moonspa Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.  Thanks.


Regards,

Diana Torres,
Community Moderator

