
Apache NiFi simple flow

New Contributor

Hi guys, I'm relatively new to Apache NiFi, and there's one thing whose logic I don't understand: how FlowFiles are handled between processors.
Basically I need to extract data with an ExecuteSQL processor, then run another ExecuteSQL to delete the existing data from the destination, and finally insert the data using PutDatabaseRecord. The problem is that the output of the first ExecuteSQL gets overwritten by the second, and it needs to be done in that order to make sure the new data exists before the old data is deleted. Could anyone guide me?

4 REPLIES

Community Manager

@imvn Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @SAMSAL and @MattWho, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Master Mentor

@imvn 

A NiFi FlowFile consists of two parts:

  1. FlowFile content - The content of a FlowFile is written to a content claim within the NiFi content repository.  A content claim is immutable.
  2. FlowFile attributes/metadata - Attributes/metadata are written to the FlowFile repository and persist in NiFi heap memory unless forced to swap due to connection thresholds.  This metadata includes, among other things, information about where to find the content (see the illustrative example below).
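
For illustration only (the values are generated at runtime, and which attributes exist depends on the processors involved), a FlowFile sitting in a queue after ExecuteSQL might look something like this:

    FlowFile attributes (FlowFile repository / heap):
      uuid                 = <generated core attribute>
      filename             = <generated core attribute>
      executesql.row.count = 42        <- number of rows returned by the query
    FlowFile content (content repository, immutable content claim):
      the Avro records produced by the SELECT statement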

How NiFi processors handle inbound and outbound FlowFiles is largely processor specific.  For processors that write output to the content of a FlowFile, this may be handled in two different ways depending on the processor.  Some processors have an "original" relationship: the original FlowFile, still referencing the original inbound content claim, is routed to "original", while a new FlowFile with the same attributes, pointing to the new content output in a different claim, is routed to some other relationship like "success".   Other processors do not have an "original" relationship; instead they decrement the claimant count on the original content claim and update the existing FlowFile's metadata to point to the content created in the new content claim.

The ExecuteSQL processor follows the latter process.

 

So you have a dataflow built like this, if I understand correctly:

ExecuteSQL (writes content to the FlowFile) --> some processor/processors (extract bits from the content to use for the delete) --> ExecuteSQL (performs the delete, but the response is written as new content for the FlowFile) --> PutDatabaseRecord (has issues since the original FlowFile content it needs is no longer associated with the FlowFile).

Option 1:
Can you re-order your processors so you have ExecuteSQL --> PutDatabaseRecord --> Extract content --> ExecuteSQL (Delete)?
This makes sure the original content is persisted long enough to complete the write to the target DB.

Option 2:
ExecuteSQL --> ExtractContent --> route the "success" relationship twice (once to ExecuteSQL to perform the delete and a second time to PutDatabaseRecord to write to the DB).  Similar to the example below:

MattWho_0-1727295834787.png

You'll notice that the "matched" relationship has been routed twice.  When the same relationship is routed twice, NiFi clones the FlowFile (the original goes to one connection and the clone goes to the other).  Both FlowFiles reference the same content claim (which, remember, is immutable).  When ExecuteSQL (delete) then executes on one of them, it does not impact the content of the other one that is going to PutDatabaseRecord.

If I am not clear on your use case, let me know. I was a bit confused by the "delete data from destination" part. Destination = the DB destination configured in PutDatabaseRecord?
It's not clear why you would be deleting something there that has not yet been written.

So if there is a dependency that the ExecuteSQL (Delete) completes before the PutDatabaseRecord executes, there is a third option that utilizes the "FlowFile concurrency" and "outbound policy" settings on a Process Group.

MattWho_1-1727296616448.png

The dataflow would look something like this:

MattWho_2-1727296693358.png

Inside the Process Group configured with "FlowFile concurrency = Single FlowFile Per Node" and "outbound policy = Batch Output" set, you would have this flow:

MattWho_4-1727296833048.png

So your dataflow only allows 1 FlowFile to enter the PG at a time.  Within the PG, the FlowFile is cloned, with one FlowFile routing to the output port and the other to the ExecuteSQL (delete).  The FlowFile queued to exit the PG will not be allowed to leave until the FlowFile being processed by the ExecuteSQL (delete) is auto-terminated or routed to some other output port.   This makes sure that the PutDatabaseRecord processor does not process the FlowFile with the original content claim until your ExecuteSQL (delete) has executed.
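
For reference, both of those settings live on the Process Group's configuration (a rough sketch; the exact dialog layout can vary a bit between NiFi versions):

    Process Group > Configure:
      FlowFile Concurrency : Single FlowFile Per Node
      Outbound Policy      : Batch Output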

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

 

New Contributor

Good afternoon MattWho, I'm going to attach an image, maybe it will make things clearer. Your answer was very enlightening, but if I clear up these doubts about my question, it will be even clearer.
The process deletes the data from a local database and then inserts the full dataset again. Why do I query the value before deleting? Because I need to check whether the query returned any rows; if it didn't, I must not delete the destination data, which is why the processors are in the order I described. In short, the flow I would like to execute is: ExecuteSQL (query the data in the source database) -> RouteOnAttribute (if there are no rows, follow another path) -> ExecuteSQL (delete the data from the local, i.e., destination database) -> PutDatabaseRecord (insert the data into the destination database again).

imvn_0-1727298561887.png

 

Master Mentor

@imvn 

I am finding it hard to follow your use case here.

Your shared dataflow image shows:
1. ExecuteSQL (query data in source database) --> (produces a FlowFile that contains 0 or more rows) -->
2. RouteOnAttribute (evaluates "executesql.row.count" to see if it is "0"?  If so, does it route the "0" row FlowFiles to the "lines" relationship?  Or are you auto-terminating within RouteOnAttribute if the row count is "0", and the "lines" relationship is used only if "executesql.row.count" is not "0"?  See the example rule below.)
3. I see the "lines" relationship is routed twice.  Once to another ExecuteSQL (deletes the data from the local database, i.e., destination) and once directly to PutDatabaseRecord (since PutDatabaseRecord has two inbound connections that will each have a FlowFile, it will execute against both, which I don't think is what you want to happen).
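
As a point of reference only (a sketch, not necessarily how your flow is configured), a RouteOnAttribute rule for this kind of check is usually a single dynamic property whose value is a NiFi Expression Language expression against "executesql.row.count", something like:

    RouteOnAttribute
      Routing Strategy : Route to Property name
      lines            = ${executesql.row.count:gt(0)}

With a rule like that, FlowFiles with at least one row go to the "lines" relationship and empty results go to "unmatched".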

Just considering the above, I think option 3, which utilizes the "FlowFile concurrency" and "outbound policy" settings on a Process Group, would handle your timing needs.  Your RouteOnAttribute would go in place of the ExtractText processor, and you would feed the "lines" relationship into the child Process Group.

The question is: what is the overall goal of this use case?  Are you trying to maintain an up-to-date copy of the source database in the destination database?
Or are you just trying to copy rows added to the source DB over to the destination DB?  If so, there are better dataflow designs for that.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt