Support Questions


How to use query result in another query Apache NiFi

Shalexey (Explorer)

Hi, experts,

I need to insert data from a CSV file into a DB. The CSV file name is the same as the DB table name, but the table has an additional primary key column, so I need to add that column to the CSV. I can get the max key value from the DB table. Please advise how to use this key value in the flowfile. Here is my flow:

Shalexey_1-1727359094322.png

Thanks in advance.

1 ACCEPTED SOLUTION

MattWho (Master Mentor)

@Shalexey 

Your question does not contain a lot of detail about your use case, but I'll try to provide some pointers here.

NiFi processor components have one or more defined relationships. These relationships are where a FlowFile is routed when a processor completes its execution. When you assign a processor relationship to more than one outbound connection, NiFi clones the FlowFile once for each additional connection. Looking at the dataflow design you shared, the "success" relationship appears to be routed twice out of the UpdateAttribute processor, which means the original FlowFile is sent to one of those connections and a cloned FlowFile is sent to the other.

So you can't simply route both of these FlowFiles to your QueryRecord processor, as each would be processed independently.

If I understand your use case correctly, you ingest a CSV file that needs to be updated with an additional new column (the primary key). The value that goes into that new column is fetched from another DB via the ExecuteSQLRecord processor. The problem is that ExecuteSQLRecord would overwrite your CSV content.

So what you need to build is a flow that gets the enhancement data (the primary key) and adds it to the original CSV before the PutDatabaseRecord processor.

Others might have different solution suggestions, but here is one option that comes to mind:

MattWho_0-1727456293238.png

GetFile --> Gets the original CSV file.
UpdateAttribute --> Sets a correlation ID (corrID = ${UUID()}) so that when the FlowFile is cloned later, the original and the clone can be correlated with one another via this attribute, which will be the same on both.
ExecuteSQL --> Queries the DB for the max key (see the SQL sketch after the flow description).
QueryRecord --> Trims the output down to just the needed max key.
ExtractText --> Extracts the max key value from the content into a FlowFile attribute (maxKey).
ModifyBytes --> Set "Remove All Content" to true to clear the content from this FlowFile (this does not affect the FlowFile attributes).
MergeContent --> Minimum Number of Entries = 2, Correlation Attribute Name = corrID, Attribute Strategy = Keep All Unique Attributes. (This merges the original and cloned FlowFiles that share the same "corrID" attribute value into one FlowFile containing only the CSV content.)
UpdateRecord --> Inserts the max key value from the maxKey FlowFile attribute into the original CSV content. (The record reader can infer the schema; however, the record writer needs a defined schema that includes the new "primary key" column. You can then add a dynamic property that inserts the maxKey FlowFile attribute into the "primaryKey" CSV column; see the configuration sketch after the screenshot below.)

MattWho_1-1727457156154.png
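As a concrete sketch of that UpdateRecord configuration (the column name "primaryKey" and attribute name "maxKey" are assumptions chosen to match the steps above, not settings from the original post):

Record Reader: CSVReader (Schema Access Strategy = Infer Schema)
Record Writer: CSVRecordSetWriter (explicit schema that includes the new "primaryKey" field)
Replacement Value Strategy: Literal Value
Dynamic property: /primaryKey = ${maxKey}

With the "Literal Value" strategy, the Expression Language is evaluated against the FlowFile's attributes, so every record in the CSV receives the same maxKey value. If your CSV can contain multiple rows, you would still need a per-record increment (for example, an auto-increment column on the DB side).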

PutDatabaseRecord --> Writes the modified CSV to the destination DB.
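For the max-key steps above (ExecuteSQL, QueryRecord, ExtractText), here is a minimal sketch; the table name "my_table" and column "id" are placeholders, not from the original post:

ExecuteSQL, "SQL select query" property:
SELECT MAX(id) AS max_key FROM my_table

QueryRecord, dynamic property (e.g. "maxkey"); ExecuteSQL emits Avro, so pair an Avro reader with a CSV writer (header line disabled) so the resulting content is easy to regex afterwards:
SELECT max_key FROM FLOWFILE

ExtractText, dynamic property (the first capture group is written to an attribute named after the property):
maxKey = (\d+)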


Even if this does not match up directly, you may be able to apply the NiFi dataflow design concept above to solve your specific use case.

Please help our community grow. If any of the suggestions/solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on one or more of them.

Thank you,
Matt


2 REPLIES

Diana Torres (Community Manager)

@Shalexey Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @MattWho and @SAMSAL, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
