Member since 09-26-2024
4 Posts
3 Kudos Received
1 Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
161 | 10-25-2024 05:50 AM |
10-25-2024 05:50 AM
2 Kudos
In the end, I replaced the LookupRecord processor with a group of ForkEnrichment/JoinEnrichment processors.
09-27-2024 10:22 AM
1 Kudo
@Shalexey Your query does not contain a lot of detail about your use case, but I'll try to provide some pointers here.

NiFi processor components have one or more defined relationships. These relationships are where the NiFi FlowFile is routed when a processor completes its execution. When you assign a processor relationship to more than one outbound connection, NiFi clones the FlowFile once for each additional connection. Looking at the dataflow design you shared, I see what appears to be the "success" relationship routed twice out of the UpdateAttribute processor (this means the original FlowFile is sent to one of these connections and a new cloned FlowFile is sent to the other). So you can't simply route both of these FlowFiles back to your QueryRecord processor, as each would be executed against independently.

If I am understanding your use case correctly, you ingest a CSV file that needs to be updated with an additional new column (primary key). The value that will go into that new column is fetched from another DB via the ExecuteSQLRecord processor. The problem is that the ExecuteSQLRecord processor would overwrite your CSV content. So what you need to build is a flow that gets the enhancement data (primary key) and adds it to the original CSV before the PutDatabaseRecord processor.

Others might have different solution suggestions, but here is one option that comes to mind:

GetFile --> gets the original CSV file.
UpdateAttribute --> sets a correlation ID (corrID = ${UUID()}) so that when the FlowFile is cloned later, both copies can be correlated to one another through this correlation ID, which will be the same on both.
ExecuteSQL --> queries the max key from the DB.
QueryRecord --> trims the output to just the needed max key.
ExtractText --> extracts the max key value from the content into a FlowFile attribute (maxKey).
ModifyBytes --> set "Remove All Content" to true to clear the content from this FlowFile (this does not affect FlowFile attributes).
MergeContent --> min num entries = 2, Correlation Attribute Name = corrID, Attribute Strategy = Keep All Unique Attributes. (This merges both FlowFiles, original and clone, that have the same value in the FlowFile attribute "corrID" into one FlowFile containing only the CSV content.)
UpdateRecord --> inserts the max key value from the maxKey FlowFile attribute into the original CSV content. (The record reader can infer the schema; however, the record writer will need a defined schema that includes the new "primaryKey" column. Then you will be able to add a dynamic property that inserts the maxKey FlowFile attribute into the "primaryKey" CSV column.)
PutDatabaseRecord --> writes the modified CSV to the destination DB.

Even if this does not match up directly, maybe you will be able to apply the NiFi dataflow design concept above to solve your specific use case.

Please help our community grow. If you found that any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt
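For anyone who finds code easier to follow than a processor list, here is a rough Python sketch of the correlation-ID merge idea described above. This is not NiFi code and does not use any NiFi API: the FlowFile class, the function names, and the sample CSV are all hypothetical stand-ins, and the attribute handling is only an approximation of what "Keep All Unique Attributes" does.

```python
import csv
import io
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Hypothetical stand-in for a NiFi FlowFile: content plus attributes."""
    content: str = ""
    attributes: dict = field(default_factory=dict)


def merge_by_corr_id(flowfiles, min_entries=2):
    """Approximate MergeContent grouped on the corrID attribute.

    Content is concatenated; attributes whose values conflict within
    a group are dropped, the rest are kept.
    """
    groups = {}
    for ff in flowfiles:
        groups.setdefault(ff.attributes["corrID"], []).append(ff)

    merged = []
    for group in groups.values():
        if len(group) < min_entries:
            continue  # not enough FlowFiles yet, nothing is merged
        attrs, conflicts = {}, set()
        for ff in group:
            for key, value in ff.attributes.items():
                if key in attrs and attrs[key] != value:
                    conflicts.add(key)
                attrs.setdefault(key, value)
        for key in conflicts:
            del attrs[key]
        merged.append(FlowFile("".join(ff.content for ff in group), attrs))
    return merged


def add_primary_key(ff, column="primaryKey"):
    """Approximate UpdateRecord: copy the maxKey attribute into a new CSV column."""
    reader = csv.DictReader(io.StringIO(ff.content))
    rows = list(reader)
    out = io.StringIO()
    writer = csv.DictWriter(
        out, fieldnames=list(reader.fieldnames or []) + [column], lineterminator="\n"
    )
    writer.writeheader()
    for row in rows:
        row[column] = ff.attributes["maxKey"]
        writer.writerow(row)
    return FlowFile(out.getvalue(), dict(ff.attributes))


# Original keeps the CSV content; the clone has had its content cleared
# and carries only the extracted maxKey attribute.
original = FlowFile("name,city\nann,Oslo\n", {"corrID": "abc"})
clone = FlowFile("", {"corrID": "abc", "maxKey": "42"})
merged = merge_by_corr_id([original, clone])[0]
print(add_primary_key(merged).content)
```

The point of the sketch is that the cleared clone contributes nothing to the merged content but still delivers its maxKey attribute, which is what lets the later step write it into the new column.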