Member since 07-30-2019
3406 Posts
1622 Kudos Received
1008 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 185 | 12-17-2025 05:55 AM |
| | 246 | 12-15-2025 01:29 PM |
| | 181 | 12-15-2025 06:50 AM |
| | 277 | 12-05-2025 08:25 AM |
| | 460 | 12-03-2025 10:21 AM |
01-05-2021
10:49 AM
@kiranps11 Did you add and start a "DistributedMapCacheServer" controller service running on port 4557? The "DistributedMapCacheClientService" controller service only creates a client that is used to connect to a server, which you must also create.

Keep in mind that the DistributedMapCacheServer does not offer High Availability (HA). Enabling this controller service will start a DistributedMapCacheServer on each node in your NiFi cluster, but those servers do not talk to each other. This is important to understand since you have configured your DMC Client to use localhost. This means that each node in your cluster would be using its own DMC server rather than a single DMC server.

For an HA solution you should use an external map cache via one of the other client offerings like "HBase_2_ClientMapCacheService" or "RedisDistributedMapCacheClientService", but this would require you to set up that external HBase or Redis server with HA yourself.

Hope this helps, Matt
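To picture why a client pointed at localhost does not give a shared cache, here is a minimal sketch in Python. It only models the behavior described above; the `MapCacheServer` class, the node names, and the key are hypothetical stand-ins and have nothing to do with NiFi's actual cache protocol.

```python
class MapCacheServer:
    """Hypothetical stand-in for the cache server running on one node."""

    def __init__(self):
        self._store = {}

    def put(self, key, value):
        self._store[key] = value

    def get(self, key):
        # Returns None when this particular server has never seen the key.
        return self._store.get(key)


def build_cluster(node_names):
    # One independent server per node; nothing is replicated between them.
    return {name: MapCacheServer() for name in node_names}


cluster = build_cluster(["node1", "node2"])

# A client on node1 configured with "localhost" only ever talks to node1's server.
cluster["node1"].put("seen:file-a", "true")

local_hit = cluster["node1"].get("seen:file-a")    # found on node1
remote_miss = cluster["node2"].get("seen:file-a")  # node2 never saw the entry
```

Pointing every client at one server hostname (or at an external Redis/HBase cache) is what makes the lookups agree across nodes.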
01-05-2021
10:29 AM
1 Kudo
@Boenu Beginning with Apache NiFi 1.12, the default for anonymous access to static resources was changed to false. This was done as part of https://issues.apache.org/jira/browse/NIFI-7170.

That change also added the ability to set an additional property in the nifi.properties file to restore the behavior of Apache NiFi 1.11 and older versions:

nifi.security.allow.anonymous.authentication=true

The above restores the previous behavior while work is done to change how NiFi handles access to these static endpoints. The following Jiras cover that work:
https://issues.apache.org/jira/browse/NIFI-7849
https://issues.apache.org/jira/browse/NIFI-7870

Hope this helps, Matt
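If you manage nifi.properties with a script, a small helper along these lines could set the property. This is only a sketch: the `set_property` function and the sample file content are illustrative assumptions, and it works on text rather than touching a real install.

```python
def set_property(properties_text, key, value):
    """Return properties_text with key set to value, adding the key if absent."""
    lines = properties_text.splitlines()
    updated = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        # Leave comments and lines without '=' untouched.
        if stripped.startswith("#") or "=" not in stripped:
            continue
        name = stripped.split("=", 1)[0].strip()
        if name == key:
            lines[i] = f"{key}={value}"
            updated = True
    if not updated:
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


# Illustrative file content, not a complete nifi.properties.
sample = (
    "nifi.web.https.port=8443\n"
    "nifi.security.allow.anonymous.authentication=false\n"
)
result = set_property(
    sample, "nifi.security.allow.anonymous.authentication", "true"
)
```

Changes to nifi.properties are only read at startup, so NiFi needs a restart before the new value takes effect.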
01-04-2021
08:30 AM
@kalhan While it is possible to have a single ZK cluster support multiple services, the recommendation is that NiFi have its own dedicated ZK cluster. NiFi cluster stability is dependent on ZK, and many of the NiFi processors that can be used depend on cluster state, which is also stored in ZK. If ZK becomes overburdened, it can affect the overall stability and performance of NiFi.

If you found any of the answers provided on this query helped you, please select "accept solution" on each of them.

Hope this helps. Thank you, Matt
01-04-2021
08:23 AM
1 Kudo
@adhishankarit As I mentioned, you need an additional unique attribute that you only add on the failure path (ConstructHDFSError UpdateAttribute) to MergeContent:

overall-status = ERROR

Since this attribute (overall-status) is not being set on the success path, MergeContent with "Attribute Strategy" set to "Keep All Unique Attributes" will then set this overall-status attribute on the merged FlowFile produced.

Keep All Unique Attributes --> any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile.

Since you are not setting this attribute on your success path FlowFiles, it would only be set on merged FlowFiles where one or more FlowFiles traversed the failure flow path. This allows you to capture the overall-status of the zip bundle.

Then in your ReplaceText processor you would use a more complex NiFi Expression Language (EL) statement in your replacement value. Something like:

${uniquefile}:${overall-status:isNull():ifElse('success','${overall-status}')}:${message}

This will set "success" if the "overall-status" attribute does not exist on any FlowFiles that were part of the merged FlowFile; otherwise it will set it to the value set in the "overall-status" attribute.

If you found this helpful, please take a moment to click "accept solution" on all responses that helped.

Matt
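For anyone who finds the EL hard to read, here is roughly the same decision written out in Python. It is a sketch of the logic only, not how NiFi evaluates EL; the `status_line` function is a hypothetical name and the attribute values are made up.

```python
def status_line(attributes):
    """Mimic the replacement value: uniquefile:status:message."""
    status = attributes.get("overall-status")
    if status is None:
        # isNull() is true when the attribute is absent, so ifElse picks 'success'.
        status = "success"
    # A missing attribute is rendered as an empty string here.
    uniquefile = attributes.get("uniquefile", "")
    message = attributes.get("message", "")
    return f"{uniquefile}:{status}:{message}"


# Merged FlowFile where nothing went down the failure path.
ok_line = status_line({"uniquefile": "a.zip", "message": "done"})

# Merged FlowFile where at least one member carried overall-status = ERROR.
error_line = status_line(
    {"uniquefile": "a.zip", "message": "done", "overall-status": "ERROR"}
)
```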
12-28-2020
10:33 AM
@adhishankarit Your dataflow screenshot does not reflect the entire dataflow you are trying to describe, which makes this use case hard to follow.

1. Your flow starts with a single zip file?
2. You unzip that file to produce numerous output FlowFiles?
3. You use load balanced connections to distribute all the produced FlowFiles across all nodes in your cluster?
4. Then you modify the content of each FlowFile using the AttributesToJSON processor (Destination=flowfile-content)? It looks like you route the "success" relationship twice from this processor, which means you have cloned your FlowFiles. Why?
5. One of these connections looks like it uses a Load Balance connection (how is it configured?) to feed a MergeContent. MergeContent cannot merge across multiple nodes (it can only merge FlowFiles on the same node). How is MergeContent configured? Your desired output does not look like JSON, but you are using the AttributesToJSON processor?
6. Where do the "failure" FlowFiles get introduced into this dataflow?

When you unpack your original FlowFile, each produced FlowFile will have new attributes set on it, including segment.original.filename, fragment.identifier, fragment.count, and fragment.index. These attributes can be used with the "Defragment" merge strategy in MergeContent.

So I would avoid cloning FlowFiles post unpack. Process each FlowFile in-line. When you encounter a "failure", set an attribute on these FlowFiles only that states a failure occurred (successfully processed FlowFiles should not have this unique attribute). Then use MergeContent and set "Keep All Unique Attributes". This will allow the unique attribute, if it exists on any one FlowFile, to show up on the output merged FlowFile (this will not work if the same attribute exists on multiple FlowFiles with different values).

Now after the merge you can modify the content again using a ReplaceText processor configured with Append to add the first line with the overall status of this file from that unique attribute you preserved through the merge.

Also not following this statement: "also noticed that if there is a delay in processing"

Hope this helps, Matt
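To make the role of the fragment.* attributes a bit more concrete, here is a simplified sketch of how a defragment-style merge might bin FlowFiles. It is an assumption-laden model, not MergeContent's implementation: FlowFiles are plain attribute dictionaries and `complete_bundles` is a hypothetical helper.

```python
from collections import defaultdict


def complete_bundles(flowfiles):
    """Group by fragment.identifier and return only bundles with every fragment present."""
    bins = defaultdict(list)
    for attrs in flowfiles:
        bins[attrs["fragment.identifier"]].append(attrs)

    ready = {}
    for identifier, members in bins.items():
        expected = int(members[0]["fragment.count"])
        if len(members) == expected:
            # Restore the original order of the fragments.
            ready[identifier] = sorted(
                members, key=lambda attrs: int(attrs["fragment.index"])
            )
    return ready


# Made-up attributes: bundle A has both fragments, bundle B has 1 of 3.
flowfiles = [
    {"fragment.identifier": "A", "fragment.count": "2", "fragment.index": "2"},
    {"fragment.identifier": "B", "fragment.count": "3", "fragment.index": "1"},
    {"fragment.identifier": "A", "fragment.count": "2", "fragment.index": "1"},
]
ready = complete_bundles(flowfiles)
```

This also shows why all fragments of one bundle need to end up on the same node: a bin that never sees every fragment is never complete.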
12-28-2020
09:59 AM
@JelenaS You would need to share some screenshots of the policies/permissions you have set on the bucket(s) you have created in your NiFi-Registry.
- Go to "Settings" (wrench icon in the upper right corner within NiFi-Registry).
- Under "BUCKETS", click the pencil icon for the bucket you expect your user to see.
- Your NiFi user that is logged in to NiFi should have write, delete, and read on the bucket.

It would also be helpful to know what "Special Privileges" you have set for each of your NiFi nodes inside NiFi-Registry.
- Go to "Settings" (wrench icon in the upper right corner within NiFi-Registry).
- Under "USERS", click the pencil icon for each of your NiFi nodes.
- Each of your NiFi nodes (case sensitive) should have "Can proxy user requests" and read on "Can manage buckets" checked.
12-24-2020
07:35 AM
@adhishankarit There is nothing you can pull from the NiFi REST API that is going to tell you about successful outcomes from processor execution on a FlowFile. Depending on data volumes, this also sounds like a resource-expensive endeavor.

That being said, NiFi does have a Site-To-Site (S2S) Bulletin reporting task. When a processor throws an error it will produce a bulletin, and this reporting task can capture those bulletins and send them via NiFi's S2S protocol to another NiFi instance, directly into a dataflow where you can handle them via dataflow design however you like.

The only way you can get INFO level logs into bulletins is by setting the bulletin level to INFO on all your processors. This only works if you have also configured your NiFi logback.xml so that all NiFi components log at the INFO level as well. Downsides to this:
1. Every processor would display the red bulletin square in the upper right corner of the processor. This makes it difficult to use bulletins to find components that are having issues.
2. This results in a lot of INFO level logging to the nifi-app.log.

You mention edge nodes. You could set up a TailFile processor that tails the nifi-app.log and then sends that log data via FlowFiles to some monitoring NiFi cluster, where another dataflow parses those records by log_level via a PartitionRecord processor and then routes based on that log_level for additional handling/notification processing. Downside here:
1. Since you want to track success, you still need INFO level logging enabled for all components. This means even this log collection flow is producing log output. So you get large logs, and logs being written to even when no actual data is being processed in other flows.

NiFi does have a master bulletin board which you could hit via the REST API, but this does not get you past the massive logging you may be producing to monitor success.
https://nifi.apache.org/docs/nifi-docs/rest-api/index.html

Hope this gives you some ideas, Matt
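If you do pull from the bulletin board, the client side could look something like the sketch below. The response shape used here (a "bulletinBoard" object holding a "bulletins" list, each entry wrapping a "bulletin" with "level", "sourceName" and "message") is an assumption you should check against the REST API docs linked above, and the sample payload is made up. Fetching the JSON from your NiFi instance is left out.

```python
import json


def bulletins_at_level(payload, level):
    """Return (sourceName, message) pairs for bulletins at the given level."""
    board = payload.get("bulletinBoard", {})
    matches = []
    for entry in board.get("bulletins", []):
        bulletin = entry.get("bulletin") or {}
        if bulletin.get("level", "").upper() == level.upper():
            matches.append((bulletin.get("sourceName"), bulletin.get("message")))
    return matches


# Made-up response body standing in for what the API might return.
sample_body = """
{"bulletinBoard": {"bulletins": [
  {"bulletin": {"level": "ERROR", "sourceName": "PutHDFS", "message": "write failed"}},
  {"bulletin": {"level": "INFO", "sourceName": "PutHDFS", "message": "wrote file"}}
]}}
"""
payload = json.loads(sample_body)
errors = bulletins_at_level(payload, "ERROR")
infos = bulletins_at_level(payload, "info")
```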
12-24-2020
07:15 AM
@adhishankarit This only works if the intent is to replace the content of the FlowFile with the FlowFile attributes, or to append them to it. But you will still not end up with a single FlowFile with all 15 attributes. ReplaceText does not merge anything and only has access to the FlowFile attributes on the FlowFile that is being executed upon. You would still need to merge the content of those 2 FlowFiles to have a single FlowFile with all 15 attributes, which would exist in the content of the new FlowFile.
12-24-2020
07:10 AM
@wasimakram First we need to answer some questions here:
1. What about the content of these two FlowFiles? Do both have the same content? How do you want to handle that when you merge these FlowFiles?
2. Are all 15 of these attributes unique? Meaning that the same attribute name does not exist on both FlowFiles.

Scenario 1: Let's assume both FlowFiles have the same content.
1. Then you could use the ModifyBytes processor to remove all content from only one of those FlowFiles.
2. Then use the MergeContent processor to merge those two FlowFiles and set the Attribute Strategy property to Keep All Unique Attributes. This will result in one FlowFile with content from only the FlowFile that had content, and all unique attributes from both source FlowFiles set on the new output FlowFile.

Scenario 2: Each FlowFile has unique content.
1. You could still use MergeContent just as in Scenario 1, and the resulting FlowFile will have all the unique attributes and the merged content from both source FlowFiles.

Another option: You could use the PutDistributedMapCache processor to write the desired attributes to a cache server. Then use FetchDistributedMapCache to consume the needed attributes and place them on the other FlowFile. This will not perform as well, and under volume you need to make sure that you only process one set of FlowFiles at a time. It can be done, but it is not as performant and adds complexity to the dataflow design.

Hope this helps, Matt
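Here is a small Python sketch of what "Keep All Unique Attributes" means for the merged FlowFile, which is why question 2 above matters. It models the rule as I described it (an attribute is kept unless its value conflicts between FlowFiles); the function name and sample attributes are hypothetical and this is not MergeContent's code.

```python
def keep_all_unique_attributes(flowfiles):
    """Merge attribute dicts, dropping any attribute whose values conflict."""
    merged = {}
    conflicting = set()
    for attrs in flowfiles:
        for name, value in attrs.items():
            if name in conflicting:
                continue
            if name in merged and merged[name] != value:
                # Same name with different values: drop it from the result.
                del merged[name]
                conflicting.add(name)
            else:
                merged[name] = value
    return merged


first = {"customer": "acme", "source": "ftp", "batch": "7"}
second = {"region": "eu", "source": "sftp", "batch": "7"}
merged = keep_all_unique_attributes([first, second])
```

In this made-up example "source" is lost because the two FlowFiles disagree on its value, while "batch" survives because both carry the same value.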
12-23-2020
01:39 PM
@Anurag007 You did not share how your logs are getting into your NiFi. But once ingested, you could use a PartitionRecord processor with one of the following readers to handle parsing your log files:
- GrokReader
- SyslogReader
- Syslog5424Reader

You can then use your choice of Record Writer to output your individual split log outputs. You would then add one custom property that is used to group like log entries by the log_level. This custom property will become a new FlowFile attribute on the output FlowFiles. You can then use a RouteOnAttribute processor to filter out only FlowFiles where the log_level is set to ERROR.

Here is a simple flow I created that tails NiFi's app log, partitions logs by log_level, and then routes log entries for WARN or ERROR.

I use the GrokReader with the following Grok Expression:
%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{DATA:class} %{GREEDYDATA:message}

I then chose to use the JsonRecordSetWriter.

The dynamic property I added in PartitionRecord:
Property = log_level
Value = /level

In my RouteOnAttribute processor, I can route based on that new "log_level" attribute that will exist on each partitioned FlowFile, using two dynamic properties which each become a new relationship:
property = ERROR
value = ${log_level:equals('ERROR')}
property = WARN
value = ${log_level:equals('WARN')}

Hope this helps, Matt
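If it helps to see the parse, partition, and route steps outside of NiFi, here is a rough Python equivalent. The regular expression only approximates the Grok expression above (the timestamp pattern is narrower than TIMESTAMP_ISO8601, and the "class" field is named `logger` here); the sample log lines are made up.

```python
import re
from collections import defaultdict

# Approximation of the Grok expression: timestamp, level, [thread], class, message.
LOG_LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"(?P<level>[A-Z]+) "
    r"\[(?P<thread>.*?)\] "
    r"(?P<logger>\S+) "
    r"(?P<message>.*)$"
)


def partition_by_level(lines):
    """Group parsed records by level, like PartitionRecord with log_level = /level."""
    groups = defaultdict(list)
    for line in lines:
        match = LOG_LINE.match(line)
        if match:
            groups[match.group("level")].append(match.groupdict())
    return groups


def route(groups, wanted=("ERROR", "WARN")):
    """Keep only the wanted levels, like the two RouteOnAttribute properties."""
    return {level: groups[level] for level in wanted if level in groups}


sample_lines = [
    "2020-12-23 13:39:01,123 INFO [main] o.a.nifi.NiFi Started",
    "2020-12-23 13:39:02,456 WARN [Timer-Driven Process Thread-2] o.a.n.p.standard.GetFile Slow read",
    "2020-12-23 13:39:03,789 ERROR [Timer-Driven Process Thread-4] o.a.n.p.standard.InvokeHTTP Connection refused",
]
routed = route(partition_by_level(sample_lines))
```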