
NiFi processor status: merge attributes into a single FlowFile's content

Contributor

Hi All,

 

I am building a NiFi flow which fetches filename.zip from a shared server, unpacks the .zip file, distributes the resulting files across the NiFi cluster, and finally puts the files into HDFS. Along this flow I am collecting the success or failure status of each processor as a custom message in an

UpdateAttribute processor. Please refer to the attached screenshots (Nifi status.PNG, updateattribute_1.PNG, updateattribute_2.PNG) for the UpdateAttribute configuration.
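At a high level, the flow looks roughly like this:

Fetch filename.zip from the shared server
  -> unpack the .zip (one FlowFile per contained file)
  -> load-balanced connection (distribute across the cluster)
  -> PutHDFS
  -> UpdateAttribute (per-file success/failure status message)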

 

Next, I am planning to record the overall status (success or error) of the .zip file along with the processed status of each file, i.e. combine all of that information into one file and push that file to the shared server so the status can be checked there. I tried to use the MergeContent processor but I am unable to update the .zip file status, and also noticed that if there is a delay in processing

How can I build a FlowFile whose content looks like the below?

 

Filename.zip:status:error_message
Filename1:status:success_message
Filename2:status:success_message
Filename3:status:error_message

 

Going forward, I may also format the status message as JSON.

@Nifi  @NifiDev 


6 REPLIES

Master Mentor

@adhishankarit 

 

Your dataflow screenshot does not reflect the entire dataflow you are trying to describe, which makes this use case hard to follow.

1. Your flow starts with a single zip file?
2. You unzip that file to produce numerous output FlowFiles?
3. You use load balanced connections to distribute all the produced FlowFiles across all nodes in your cluster?
4. Then you modify the FlowFile content using an AttributesToJSON processor (Destination=flowfile-content)?  It looks like you route the "success" relationship twice from this processor, which means you have cloned your FlowFiles.  Why?
5. One of these connections looks like it uses a load-balanced connection (how is it configured?) to feed a MergeContent.  MergeContent cannot merge across multiple nodes (it can only merge FlowFiles on the same node).  How is MergeContent configured?  Your desired output does not look like JSON, but you are using an AttributesToJSON processor?
6. Where do the "failure" FlowFiles get introduced in to this dataflow?



When you unpack your original FlowFile, each produced FlowFile will have new attributes set on it, including segment.original.filename, fragment.identifier, fragment.count, and fragment.index.  These attributes can be used with the "Defragment" merge strategy in MergeContent.  So I would avoid cloning FlowFiles after the unpack; process each FlowFile in-line.  When you encounter a "failure", set an attribute on only those FlowFiles stating that a failure occurred (successfully processed FlowFiles should not have this unique attribute).  Then use MergeContent with "Keep All Unique Attributes".  This allows a unique attribute that exists on any one FlowFile to show up on the merged output FlowFile (it will not work if the same attribute exists on multiple FlowFiles with different values).  After the merge you can modify the content again using a ReplaceText processor configured with "Append" to add the first line with the overall status of this file, taken from that unique attribute you preserved through the merge.
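A minimal sketch of the relevant property settings (the attribute name and value here are just examples, matching the follow-up reply below):

UpdateAttribute (failure path only):
    overall-status = ERROR

MergeContent:
    Merge Strategy     = Defragment
    Attribute Strategy = Keep All Unique Attributes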

Also, I am not following this statement:
"also noticed that if there is a delay in processing"

Hope this helps,
Matt

Contributor
Hi,
Thanks a lot for clarifying and providing a solution for merging content. I have implemented it as per your guidelines and MergeContent is working fine, but I am unable to update the overall status (error) of the file and its message. Please find my status flow in detail below.
 
1. Your flow starts with a single zip file? --- I get a meta file which names the .zip file to be fetched, for example a whosale.meta file pointing to a .zip file containing 3 or 4 CSV files.
2. You unzip that file to produce numerous output FlowFiles? --- Yes, I unzip the file.
3. You use load balanced connections to distribute all the produced FlowFiles across all nodes in your cluster? --- Yes, I enable round-robin load balancing for the FlowFiles that were unzipped in step 2 and push the FlowFiles into HDFS.
4. Then you modify the FlowFile content using an AttributesToJSON processor (Destination=flowfile-content)? It looks like you route the "success" relationship twice from this processor, which means you have cloned your FlowFiles. Why? --- Initially I was trying to convert the attributes into a JSON FlowFile. I have to capture the success and failure status of the processor, so I have now avoided cloning the "success" relationship twice.
 
Ingestionflow.PNG
 
 
6. Where do the "failure" FlowFiles get introduced into this dataflow? --- If I encounter a "failure" while pushing files to HDFS, such as permission denied or folder not available, then I add an attribute with a custom message and send the FlowFile to MergeContent. I am also directing the success status, with a custom message about the final process success in an attribute, to ReplaceText followed by MergeContent.
 
Status flow 
 
flowstatusconstruct.PNG
 
HDFSError attribute : 
updateattribute_1.PNG
 
 
HDFS Success attribute:
updateattribute_2.PNG
 
After MergeContent I use a ReplaceText processor, as suggested, to add the overall status of the file, but somehow the status (error) and the message are not captured.
 
ReplaceText processor to construct the overall status:
 
MergeandReplace.PNG, replacetextattributes.PNG
 
 
The final file contains something like the below:
 
Ingest_flow.zip : Success :
test1.csv : Success : File artikelgroep_info.csv pushed to PreRAW Zone
test2.csv : Success : File demo.csv pushed to PreRAW Zone
test3.csv : Success : File work11.csv pushed to PreRAW Zone
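For reference, per-file lines in this format can be produced by a ReplaceText before the MergeContent, configured roughly as below; the attribute names here are placeholders for whatever the UpdateAttribute processors actually set:

ReplaceText (per file, before MergeContent):
    Replacement Strategy = Always Replace
    Replacement Value    = ${filename} : ${status} : ${message}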
 
I could not populate the overall message, and if any file fails I cannot capture the "Failed" status and error in the overall status of the .zip file. Could you please help with this?
 
 
Thanks

Master Mentor

@adhishankarit 

 

As I mentioned, you need an additional unique attribute that you add only on the failure path (the ConstructHDFSError UpdateAttribute) before MergeContent.

overall-status = ERROR


Since this attribute (overall-status) is not being set on the success path, the MergeContent "Attribute Strategy" set to "Keep All Unique Attributes" will then set this overall-status attribute on the merged FlowFile produced.

Keep All Unique Attributes --> any attribute on any FlowFile that gets bundled will be kept, unless its value conflicts with the value of the same attribute on another FlowFile.

Since you are not setting this attribute on your success-path FlowFiles, it will only be set on merged FlowFiles where one or more FlowFiles traversed the failure flow path.  This allows you to capture the overall status of the zip bundle.
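For example (the attribute values here are just made up):

FlowFile 1 (failure path): message = permission denied,  overall-status = ERROR
FlowFile 2 (success path): message = pushed to PreRAW Zone
Merged FlowFile:           overall-status = ERROR   (only one FlowFile set it, so it is unique and kept)
                           message is dropped because its value differs across the bundled FlowFiles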

Then, in your ReplaceText processor, you would use a more complex NiFi Expression Language (EL) expression as your replacement value.

Something like:

${uniquefile}:${overall-status:isNull():ifElse('success','${overall-status}')}:${message}

 

This will set "success" if the "overall-status" attribute does not exist on any FlowFile that was part of the merge; otherwise it will set it to the value of the "overall-status" attribute.
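For reference, the ReplaceText side of this might look like the following (the Evaluation Mode shown is an assumption, and uniquefile and message are attributes you would have set earlier in your flow):

ReplaceText:
    Replacement Strategy = Append
    Evaluation Mode      = Entire text
    Replacement Value    = ${uniquefile}:${overall-status:isNull():ifElse('success','${overall-status}')}:${message}

An equivalent, slightly shorter form of the middle field is ${overall-status:replaceNull('success')}.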

If you found this helpful, please take a moment to click "accept solution" on all responses that helped.

Matt

 

Contributor

@MattWho 

 

This is amazing and the solution is working well.

Contributor

@MattWho 

 

In continuation of the logic above: I implemented it and it is working fine. I have a three-node NiFi cluster with nodes a, b and c, where node a has always been the primary node. If node a runs out of space, we unzip the file on another node, distribute the FlowFiles to nodes b and c, push the files to the HDFS folder, and capture the success and failure messages (in an UpdateAttribute processor) as described in the posts above. After that, at the MergeRecord step, the queue waits for a while and the merge only happens after around 10 minutes. I have a dependency on this merging processor, as I have to share the FlowFile status with another API which is requesting it. How can we speed up the MergeRecord processor here?

If all three nodes have a good amount of memory, the FlowFiles are pushed to HDFS, the status is updated (through UpdateAttribute), and the merge happens very quickly, as expected on a single node. But when only nodes b and c are available because primary node a is out due to the space issue on the device, the processor unpacks the .zip file and distributes the files to the nodes to put into HDFS; we capture each success/failure status of PutHDFS (using an UpdateAttribute processor), and MergeRecord then takes some time to merge the statuses. Is this because the other two nodes are processing it slowly, or is there some other reason?

 

I have attached screenshots of the partial flow.

MergeRecordProcessor.PNG, AttributestoJSON.PNG, MergeContent_Properties.PNG, MergeContent_scheduling_section.PNG

Master Mentor

@adhishankarit 

When moving on to a new issue, I recommend always starting a new question for better visibility (for example, someone else in the community may have more experience with the new issue than I do).

As for your new query, your screenshots do not show any stats on the processors, so it is hard to get an idea of what we are talking about here in terms of performance.  How many fragments are getting merged?  How large is each of these fragments?

NiFi nodes are only aware of, and only have access to, the FlowFiles on that individual node.  So if node a is "out" (I am not sure what that means), any FlowFiles still on node "a" that are part of the same fragment will not yet have been transferred to node b or c to get binned for the merge.  The bin cannot be merged until all fragments are present on the same node.  Since you mention that the merge eventually happens after 10 minutes, that tells me all fragments do eventually make it onto the same node.

I suggest the first thing to address here is the space issue on your nodes.  Also keep in mind that while you have noticed that node "a" has always been your elected primary node, there is no guarantee that will always be the case.  A new Cluster Coordinator and Primary Node can be elected by ZooKeeper at any time.  If you shut down or disconnect the currently elected primary node "a", you should see another node get elected as primary node.  Adding node "a" back in will not force ZK to elect it back as primary node.  So don't build your flow around a dependency on any specific node being the primary node all the time.

Matt