Created on 12-26-2020 09:40 PM - edited 12-26-2020 10:08 PM
Hi All,
I am building a NiFi flow that fetches filename.zip from a shared server, unpacks the .zip file, distributes the resulting files across the NiFi cluster, and finally puts the files in HDFS. Along the way I am collecting the success or failure status of each processor using a custom message in an
UpdateAttribute processor. Refer to the attached sample screen-print of the UpdateAttribute processor.
Next, I am planning to report the overall status (success or error) of the .zip file along with the processed status of each file, i.e. combine all of the information into one file and push that file to the shared server so the status can be checked. I tried to use a MergeContent processor, but I am unable to update the .zip file status and also noticed that if there is a delay in processing
How can I build the FlowFile content like the example below?
Filename.zip:status:error_message
Filename1:status:success_message
Filename2:status:success_message
Filename3:status:error_message
Going forward, I may also produce the status message in JSON.
Created 12-28-2020 10:33 AM
Your dataflow screenshot does not reflect the entire dataflow you are trying to describe, which makes this use case hard to follow.
1. Your flow starts with a single zip file?
2. You unzip that file to produce numerous output FlowFiles?
3. You use load balanced connections to distribute all the produced FlowFiles across all nodes in your cluster?
4. Then you modify the content of the FlowFile using an AttributesToJSON processor (Destination=flowfile-content)? It looks like you route the "success" relationship twice from this processor, which means you have cloned your FlowFiles. Why?
5. One of these connections looks like it uses a load-balanced connection (how is it configured?) to feed a MergeContent. MergeContent cannot merge across multiple nodes (it can only merge FlowFiles on the same node). How is MergeContent configured? Your desired output does not look like JSON, but you are using an AttributesToJSON processor?
6. Where do the "failure" FlowFiles get introduced into this dataflow?
When you unpack your original FlowFile, each produced FlowFile will have new attributes set on it, including segment.original.filename, fragment.identifier, fragment.count, and fragment.index. These attributes can be used with the "Defragment" merge strategy in MergeContent. So I would avoid cloning FlowFiles post unpack; process each FlowFile in-line.
When you encounter a "failure", set an attribute on those FlowFiles only that states a failure occurred (successfully processed FlowFiles should not have this unique attribute). Then use MergeContent with the Attribute Strategy set to Keep All Unique Attributes. This allows a unique attribute that exists on any one FlowFile to show up on the merged output FlowFile (it will not work if the same attribute exists on multiple FlowFiles with different values). Now, after the merge, you can modify the content again using a ReplaceText processor configured with Append to add a line with the overall status of this file, taken from that unique attribute you preserved through the merge.
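As a rough sketch of how those pieces could fit together (the overall-status attribute name below is just an illustrative choice, not something fixed by NiFi; the other property names come from the standard processors):

UnpackContent
    (automatically writes fragment.identifier, fragment.count, fragment.index, and segment.original.filename on each output FlowFile)

UpdateAttribute (failure path only)
    overall-status = ERROR

MergeContent
    Merge Strategy     = Defragment
    Attribute Strategy = Keep All Unique Attributes

ReplaceText (after the merge)
    Replacement Value built from the preserved overall-status attribute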
Also, I am not following this statement:
"also noticed that if there is a delay in processing "
Hope this helps,
Matt
Created 01-04-2021 08:23 AM
As I mentioned, you need an additional unique attribute that you only add on the failure path (the ConstructHDFSError UpdateAttribute) leading to MergeContent.
overall-status = ERROR
Since this attribute (overall-status) is not being set on the success path, MergeContent with its "Attribute Strategy" set to "Keep All Unique Attributes" will set this overall-status attribute on the merged FlowFile it produces.
Keep All Unique Attributes --> any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile.
Since you are not setting this attribute on your success-path FlowFiles, it would only be set on merged FlowFiles where one or more FlowFiles traversed the failure flow path. This allows you to capture the overall status of the zip bundle.
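For example, if the FlowFiles being bundled looked like this (illustrative values):

FlowFile 1: uniquefile=file1.txt, message=copied to HDFS
FlowFile 2: uniquefile=file2.txt, message=PutHDFS failed, overall-status=ERROR
FlowFile 3: uniquefile=file3.txt, message=copied to HDFS

then the merged FlowFile would carry overall-status=ERROR, because that attribute exists on only one of the bundled FlowFiles and therefore never conflicts with a different value from another FlowFile.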
Then in your ReplaceText processor you would use a more complex NiFi Expression Language (EL) expression in your replacement value.
Something like:
${uniquefile}:${overall-status:isNull():ifElse('success','${overall-status}')}:${message}
This will output "success" if the "overall-status" attribute does not exist on any of the FlowFiles that were part of the merge; otherwise it will output the value of the "overall-status" attribute.
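For example, assuming a FlowFile has uniquefile=file1.txt and message=copied to HDFS (illustrative values), the expression would evaluate to one of:

file1.txt:success:copied to HDFS      (overall-status attribute not present)
file1.txt:ERROR:copied to HDFS        (overall-status=ERROR preserved through the merge)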
If you found this helpful, please take a moment to click "accept solution" on all responses that helped.
Matt
Created 02-11-2021 03:29 AM
Continuing with the logic above, I implemented it and it is working fine. I have a three-node NiFi cluster with nodes a, b, and c, where node a is always the primary node. If node a runs out of space, we unzip the file on another node, distribute the files to nodes b and c, push the files to the HDFS folder, and capture the success and failure messages (in an UpdateAttribute processor) as mentioned in the posts above. After that, the queue in front of the MergeRecord processor waits for some time and the merge only happens after around 10 minutes. I have a dependency on the merging processor because I have to share the FlowFile status with another API that requests it. How can we speed up the MergeRecord processor here?
When all three nodes have a good amount of memory, the FlowFiles are pushed to HDFS, the status is updated (through UpdateAttribute), and the merging happens very quickly, as expected on a single node. But when only nodes b and c are available because primary node a is out due to a space issue on the device, the flow unpacks the .zip file and distributes the FlowFiles to the nodes to put into HDFS. We capture each success/failure status of PutHDFS (using an UpdateAttribute processor), and MergeRecord takes some time to merge the status. Is this because the other two nodes are processing it slowly, or is there some other reason?
I have attached a screenshot of the partial flow.
Created 02-11-2021 07:57 AM
@adhishankarit
When moving on to a new issue, I recommend always starting a new question for better visibility (for example, someone else in the community may have more experience with the new issue than me).
As for your new query, your screenshots do not show any stats on the processors, so it is hard to get an idea of what we are talking about here in terms of performance. How many fragments are getting merged? How large is each of these fragments?
NiFi nodes are only aware of and have access to the FlowFiles on the individual node. So if node "a" is "out" (not sure what that means), any FlowFiles still on node "a" that are part of the same fragment set will not yet have been transferred to node b or c to get binned for the merge. The bin cannot be merged until all fragments are present on the same node. Since you mention that the merge eventually happens after about 10 minutes, that tells me all fragments do eventually make it onto the same node.
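As a sketch of one general option (I cannot confirm from your screenshots how your connections are configured, so treat this as an assumption), the load-balanced connection feeding the merge processor can be set to partition by the fragment identifier so that all fragments from the same zip are routed to the same node:

Connection feeding MergeRecord / MergeContent
    Load Balance Strategy = Partition by attribute
    Attribute Name        = fragment.identifier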
I suggest the first thing to address here is the space issue on your nodes. Also keep in mind that while you have noticed that node "a" has always been your elected primary node, there is no guarantee that will always be the case. A new Cluster Coordinator and primary node can be elected by ZooKeeper at any time. If you shut down or disconnect the currently elected primary node "a", you should see another node get elected as primary node. Adding node "a" back in will not force ZooKeeper to elect it as primary node again. So don't build your flow around a dependency on any specific node being the primary node all the time.
Matt