Member since: 07-30-2019
Posts: 3472
Kudos Received: 1642
Solutions: 1020
My Accepted Solutions
| Views | Posted |
|---|---|
| 204 | 06-03-2026 06:06 PM |
| 494 | 05-06-2026 09:16 AM |
| 947 | 05-04-2026 05:20 AM |
| 552 | 05-01-2026 10:15 AM |
| 671 | 03-23-2026 05:44 AM |
01-21-2021
07:06 AM
@Lallagreta Make sure you do not have any line returns in the values of the dynamic properties you added in the UpdateAttribute processor. When you click on the value field for each property, you should not see a line "2". A value that shows a second line would result in the value assigned to the FlowFile attribute containing a line return. If this is the case, edit the property value(s) to remove the line returns so that you only see one line (1). Hope this helps, Matt
01-21-2021
05:40 AM
2 Kudos
@Siddo The strategy you are currently using is the best option for a use case where the client is sending/pushing data to listeners across your NiFi cluster nodes. Whenever you have a client that is pushing data to NiFi, this setup avoids, as you mentioned, having a single point of failure. If a load balancer can't be used, it becomes the responsibility of the client to detect delivery problems and switch to delivering to a different node.
Load balancing within NiFi's dataflows is the best option when the dataflow is consuming from a source system. Some data consumption methods are not cluster friendly (for example, FTP). This is because every node in a NiFi cluster executes the same flow.xml.gz. If you had, for example, the ListSFTP/GetSFTP processors running on every node, you would have data duplication and potentially other issues as every node tried to consume the same data. So in this scenario you would configure the processor to execute on the primary node only and then use load-balanced (LB) connections to immediately redistribute those FlowFiles across your cluster before doing further processing.
This is why we created the List and Fetch processor pairs, which are typically used for these non-cluster-friendly sources. A ListSFTP processor produces FlowFiles with zero content and only attributes detailing where to fetch a specific FlowFile's content. Those 0-byte FlowFiles quickly load balance across the cluster, where the FetchSFTP processor fetches the actual content of the specific data file and inserts it into the FlowFile. This type of setup also avoids a single point of failure, since loss of the currently elected primary node (where the data lister/consumer is running) results in a new node being elected as the primary node. That new primary node reads state from the cluster state provider and resumes listing where the previous primary node's list processor stopped. So you can see that each approach has very specific benefits/use cases.
Another scenario may be that, even with an external F5 LB, you find one node in your cluster ends up with a larger share of the workload (maybe one node ends up with the bulk of the larger data files). That data can be redistributed on connections where such single-node bottlenecks occur, to re-balance the load at that point in the dataflow. So at times a combination may make sense as well, but I would not apply this strategy unless needed, since it adds to network usage. NiFi's internal LB connections can also be used to move all data to a single node for some use cases. Let's say there is a batch of data spread out across multiple NiFi nodes that you want to merge into a single FlowFile. NiFi nodes each work only on the FlowFiles on their own node, but using LB connections at specific spots in your flow allows you to move all like data to the same node before a merge-type processor. Hope this helps, Matt
01-15-2021
06:06 AM
1 Kudo
@Lyoung The NiFi client (the NiFi or MiNiFi instance running the Remote Process Group (RPG)) has no control over the connection with the server (the NiFi configured with Remote Input or Output ports). The RPG is provided with an http or https address of one or more target NiFi nodes in a NiFi cluster. A background thread connects to that target NiFi to fetch Site-To-Site (S2S) details. If the target is https enabled, a mutual TLS handshake will happen. This means the client must have a keystore and truststore configured in the nifi.properties (NiFi) or config.yaml (MiNiFi) that can successfully be used to mutually authenticate with the target NiFi server. The server side NiFi must have the properties you listed configured:
nifi.remote.input.host=<Must be set to the hostname of the NiFi on which you are configuring this property. This is the hostname returned to the client in the S2S details. Be careful that whatever you set here does not resolve to localhost.>
nifi.remote.input.secure=false <This tells the client if the connection is secure or unsecure. If false, the "nifi.web.http.port" property must be set and the URL used in the RPG must be "http://<target nifi>:<http port>/nifi". If set to true, the "nifi.web.https.port" property must be set and the URL used in the RPG must be "https://<target nifi>:<https port>/nifi".>
nifi.remote.input.socket.port=<This is the RAW port that will be used to actually send or receive the FlowFiles from Remote Input or Output ports on the target NiFi node(s). If this property is not set on the target NiFi node(s), the RAW transport protocol will not be supported. (S2S details are always fetched over HTTP.)>
nifi.remote.input.http.enabled=true <This property states whether the "http" transport protocol can be used for sending the FlowFiles.>
nifi.remote.input.http.transaction.ttl=30 sec
nifi.remote.contents.cache.expiration=30 secs
Based on the log output shared, it sounds like the above properties were not set on the target NiFi node(s).
Did you set them on the client NiFi (the NiFi actually running the RPG)? In addition to the S2S details above being returned to the client for each target NiFi node, the details will also include the FlowFile load on each node, the Remote Input ports that the client has been authorized to use, and the Remote Output ports that the client has been authorized to use. If the target server side NiFi node(s) are unsecured, then there will be no authorization set for ports, and all clients will have access to all Remote Input/Output ports. Also keep in mind that any changes to NiFi's/MiNiFi's configuration files require a restart of the service before they are applied. Aside from the above, I would need to see screenshots and the nifi.properties/config.yaml configs of both the client and server side of this S2S connection to help further. Hope this helps, Matt
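As a rough aid for the server-side checklist above, here is a minimal Python sketch that reads nifi.properties-style text and flags the S2S settings discussed in this answer. The function names, the sample hostname, and the port numbers are hypothetical, and the check only catches a literal "localhost" value; it does not verify what the hostname actually resolves to.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_s2s(props):
    """Return a list of problems found in the server-side S2S settings."""
    problems = []
    host = props.get("nifi.remote.input.host", "")
    if not host:
        problems.append("nifi.remote.input.host is not set")
    elif host in ("localhost", "127.0.0.1"):
        problems.append("nifi.remote.input.host must not be localhost")

    # The secure flag decides which web port the RPG URL must use.
    secure = props.get("nifi.remote.input.secure", "false").lower() == "true"
    web_port_key = "nifi.web.https.port" if secure else "nifi.web.http.port"
    if not props.get(web_port_key):
        problems.append(f"{web_port_key} is not set")

    # At least one transport (RAW socket port or HTTP) must be available.
    raw = bool(props.get("nifi.remote.input.socket.port"))
    http = props.get("nifi.remote.input.http.enabled", "").lower() == "true"
    if not raw and not http:
        problems.append("neither RAW nor HTTP transport is enabled")
    return problems


# Hypothetical sample values for one target node.
sample = """
nifi.remote.input.host=nifi-node1.example.com
nifi.remote.input.secure=false
nifi.web.http.port=8080
nifi.remote.input.socket.port=10443
nifi.remote.input.http.enabled=true
"""
sample_problems = check_s2s(parse_properties(sample))
```

Running the same check against the nifi.properties of every target node (not the client) is the point: an empty result only means these particular properties are present, not that the connection will succeed.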
01-05-2021
11:19 AM
2 Kudos
@garoosy You should look into using "ExecuteSQLRecord" instead of "ExecuteSQL" for large volumes of data. To be efficient here, you would have many records in a single FlowFile. Right now you have a single record per FlowFile, which is not going to be very efficient. The only way for "ExecuteSQL" to handle multiple FlowFile executions in a single connection is if the SQL statement used in every FlowFile is identical. In order to do that, the unique values would need to come from FlowFile attributes. You may find these posts helpful: https://community.cloudera.com/t5/Support-Questions/Nifi-ExectueSQL-how-to-force-a-parameter-to-be-a-string/td-p/240117 https://stackoverflow.com/questions/63330790/using-nifi-executesqlrecord-with-parameterized-sql-statements If you have threads that never seem to complete (you will see a small number in the upper right corner of the processor, for example (2)), it is best to get a series of thread dumps (4 - 6) to verify the thread is not progressing. Then you have to determine what the thread is waiting on. Did you try setting a "Max Wait Time" on the processor? It defaults to 0, which means it would wait forever. Hope this helps, Matt
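To make the "identical SQL statement, unique values from attributes" idea concrete, here is a small Python sketch that builds the attribute set for a parameterized statement. It assumes the sql.args.N.type / sql.args.N.value attribute convention described in the ExecuteSQL documentation, with the type given as a java.sql.Types integer; the table, column names, and helper function are hypothetical.

```python
# JDBC type codes as defined in java.sql.Types.
JDBC_TYPES = {"INTEGER": 4, "VARCHAR": 12}

# The statement is the same for every FlowFile; only the attributes differ.
SQL = "SELECT * FROM orders WHERE customer = ? AND region_id = ?"


def sql_args_attributes(params):
    """Map (type_name, value) pairs to sql.args.N.* FlowFile attributes."""
    attrs = {}
    for index, (type_name, value) in enumerate(params, start=1):
        attrs[f"sql.args.{index}.type"] = str(JDBC_TYPES[type_name])
        attrs[f"sql.args.{index}.value"] = str(value)
    return attrs


# Attributes one FlowFile would carry (hypothetical values).
example_attrs = sql_args_attributes([("VARCHAR", "acme"), ("INTEGER", 7)])
```

In a flow, these attributes would typically be set by an UpdateAttribute processor ahead of the SQL processor rather than by code.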
01-05-2021
10:49 AM
@kiranps11 Did you add and start a "DistributedMapCacheServer" controller service running on port 4557? The "DistributedMapCacheClientService" controller service only creates a client that is used to connect to a server, which you must also create. Keep in mind that the DistributedMapCacheServer does not offer High Availability (HA). Enabling this controller service will start a DistributedMapCacheServer on each node in your NiFi cluster, but those servers do not talk to each other. This is important to understand since you have configured your DMC client to use localhost. This means that each node in your cluster would be using its own DMC server rather than a single DMC server. For an HA solution you should be using an external map cache via one of the other client offerings like "HBase_2_ClientMapCacheService" or "RedisDistributedMapCacheClientService", but this would require you to set up that external HBase or Redis server with HA yourself. Hope this helps, Matt
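The effect of pointing every client at localhost can be illustrated with a toy Python model. This is not NiFi code: the NodeLocalCache class is a hypothetical stand-in for the cache server that each node starts, used only to show that an entry written on one node is invisible on another.

```python
class NodeLocalCache:
    """Toy stand-in for the map cache server started on a single node."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        # Returns None when the key was never stored on this node.
        return self._data.get(key)


# One independent server per node; they share no state.
servers = {"node1": NodeLocalCache(), "node2": NodeLocalCache()}

# A client on node1 (connected to localhost) stores an entry.
servers["node1"].put("file-123", "seen")

lookup_on_node1 = servers["node1"].get("file-123")
lookup_on_node2 = servers["node2"].get("file-123")
```

Here the lookup on node1 finds the entry while the lookup on node2 does not, which is the behavior to expect when FlowFiles for the same key land on different nodes.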
01-04-2021
08:23 AM
1 Kudo
@adhishankarit As I mentioned, you need an additional unique attribute that you add only on the failure path (the ConstructHDFSError UpdateAttribute) leading to MergeContent:
overall-status = ERROR
Since this attribute (overall-status) is not being set on the success path, the MergeContent "Attribute Strategy" set to "Keep All Unique Attributes" will then set this overall-status attribute on the merged FlowFile produced. Keep All Unique Attributes --> any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. Since you are not setting this attribute on your success path FlowFiles, it would only be set on merged FlowFiles where one or more FlowFiles traversed the failure flow path. This allows you to capture the overall status of the zip bundle. Then in your ReplaceText processor you would use a more complex NiFi Expression Language (EL) statement in your replacement value. Something like:
${uniquefile}:${overall-status:isNull():ifElse('success','${overall-status}')}:${message}
This will set "success" if the "overall-status" attribute does not exist on any of the FlowFiles that were part of the merged FlowFile; otherwise it will set it to the value of the "overall-status" attribute. If you found this helpful, please take a moment to click "accept solution" on all responses that helped. Matt
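The merge behavior and the replacement value described above can be approximated in a few lines of Python. This is only a sketch of the described semantics, not MergeContent's implementation; the function names and the sample attribute values (bundle.zip, the filenames, the error message) are hypothetical.

```python
def keep_all_unique_attributes(flowfiles):
    """Keep an attribute unless two FlowFiles disagree on its value."""
    merged = {}
    conflicting = set()
    for attrs in flowfiles:
        for key, value in attrs.items():
            if key in conflicting:
                continue
            if key in merged and merged[key] != value:
                # Conflicting values: drop the attribute from the result.
                del merged[key]
                conflicting.add(key)
            else:
                merged[key] = value
    return merged


def status_line(attrs):
    """Mimic ${uniquefile}:${overall-status:isNull():ifElse(...)}:${message}."""
    status = attrs.get("overall-status")
    if status is None:
        status = "success"
    return f"{attrs.get('uniquefile', '')}:{status}:{attrs.get('message', '')}"


ok_1 = {"uniquefile": "bundle.zip", "filename": "a.csv"}
ok_2 = {"uniquefile": "bundle.zip", "filename": "c.csv"}
failed = {
    "uniquefile": "bundle.zip",
    "filename": "b.csv",
    "overall-status": "ERROR",      # set only on the failure path
    "message": "HDFS write failed",
}

line_with_failure = status_line(keep_all_unique_attributes([ok_1, failed]))
line_all_success = status_line(keep_all_unique_attributes([ok_1, ok_2]))
```

Note how "filename" is dropped from the merged result because its values conflict, while "overall-status" survives because only the failure-path FlowFile carries it.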
12-28-2020
10:33 AM
@adhishankarit Your dataflow screenshot does not reflect the entire dataflow you are trying to describe, which makes this use case hard to follow.
1. Your flow starts with a single zip file?
2. You unzip that file to produce numerous output FlowFiles?
3. You use load balanced connections to distribute all the produced FlowFiles across all nodes in your cluster?
4. Then you modify the content of each FlowFile using the AttributesToJson processor (Destination=flowfile-content)? It looks like you route the "success" relationship twice from this processor, which means you have cloned your FlowFiles. Why?
5. One of these connections looks like it uses a load balanced connection (how is it configured?) to feed a MergeContent. MergeContent cannot merge across multiple nodes (it can only merge FlowFiles on the same node). How is MergeContent configured? Your desired output does not look like JSON, but you are using the AttributesToJson processor?
6. Where do the "failure" FlowFiles get introduced into this dataflow?
When you unpack your original FlowFile, each produced FlowFile will have new attributes set on it, including segment.original.filename, fragment.identifier, fragment.count, and fragment.index. These attributes can be used with the "Defragment" merge strategy in MergeContent. So I would avoid cloning FlowFiles post unpack. Process each FlowFile in-line. When you encounter a "failure", set an attribute on those FlowFiles only that states a failure occurred (successfully processed FlowFiles should not have this unique attribute). Then use MergeContent with the attribute strategy set to "Keep All Unique Attributes". This allows the unique attribute, if it exists on any one FlowFile, to show up on the output merged FlowFile (this will not work if the same attribute exists on multiple FlowFiles with different values). After the merge, you can modify the content again using a ReplaceText processor configured with the "Prepend" replacement strategy to add a first line with the overall status of this file, taken from that unique attribute you preserved through the merge.
I am also not following this statement: "also noticed that if there is a delay in processing" Hope this helps, Matt
12-24-2020
07:35 AM
@adhishankarit There is nothing you can pull from the NiFi REST API that is going to tell you about successful outcomes from processor execution on a FlowFile. Depending on data volumes, this also sounds like a resource-expensive endeavor.
That being said, NiFi does have a Site-To-Site (S2S) bulletin reporting task. When a processor throws an error, it produces a bulletin, and this reporting task can capture those bulletins and send them via NiFi's S2S protocol to another NiFi instance, directly into a dataflow where you can handle them however you like. The only way to get INFO level logs into bulletins is by setting the bulletin level to INFO on all your processors. This only works if you have also configured your NiFi logback.xml so that all NiFi components log at the INFO level as well. Downsides to this:
1. Every processor would display the red bulletin square in its upper right corner, which makes it difficult to use bulletins to find components that are having issues.
2. This results in a lot of INFO level logging to the nifi-app.log.
You mention edge nodes. You could set up a TailFile processor that tails the nifi-app.log and sends that log data as FlowFiles via a dataflow to some monitoring NiFi cluster, where another dataflow parses those records via a PartitionRecord processor by log_level and then routes based on that log_level for additional handling/notification processing. Downside here:
1. Since you want to track success, you still need INFO level logging enabled for all components. This means even this log collection flow is producing log output, so you end up with large logs, and with logs being written even when no actual data is being processed in other flows.
NiFi does have a master bulletin board which you could hit via the REST API, but this does not get you past the massive logging you may be producing to monitor success. https://nifi.apache.org/docs/nifi-docs/rest-api/index.html Hope this gives you some ideas, Matt
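For the bulletin board option mentioned above, here is a hedged Python sketch of polling it over the REST API and keeping only bulletins at one level. It assumes an unsecured NiFi, the /nifi-api/flow/bulletin-board endpoint, and a response shaped as bulletinBoard -> bulletins -> bulletin with a "level" field; verify these against the REST API documentation for your NiFi version. The function names and sample data are hypothetical.

```python
import json
import urllib.request


def fetch_bulletin_board(base_url, limit=100):
    """Fetch the bulletin board from an unsecured NiFi (assumed endpoint)."""
    url = f"{base_url}/nifi-api/flow/bulletin-board?limit={limit}"
    with urllib.request.urlopen(url) as response:
        return json.load(response)


def bulletins_at_level(board, level):
    """Return the bulletin objects whose level matches, e.g. 'ERROR'."""
    entries = board.get("bulletinBoard", {}).get("bulletins", [])
    return [
        entry["bulletin"]
        for entry in entries
        if entry.get("bulletin", {}).get("level") == level
    ]


# Hand-written sample in the assumed response shape (not real output).
sample_board = {
    "bulletinBoard": {
        "bulletins": [
            {"id": 1, "bulletin": {"level": "ERROR", "sourceName": "PutHDFS",
                                   "message": "write failed"}},
            {"id": 2, "bulletin": {"level": "INFO", "sourceName": "PutHDFS",
                                   "message": "transferred FlowFile"}},
        ]
    }
}
errors_only = bulletins_at_level(sample_board, "ERROR")
```

Against a live instance you would call something like fetch_bulletin_board("http://<target nifi>:<http port>") on a schedule; INFO bulletins only appear there if the bulletin level and logback.xml are configured as described above.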
12-23-2020
07:46 AM
@Nyk That is correct, dynamically added properties are all non-sensitive. You would need to build a custom processor with static configurable properties that have a PropertyDescriptor built with ".sensitive(true)". I am not a developer myself, but you may find this resource useful: https://itnext.io/deep-dive-into-a-custom-apache-nifi-processor-c2191e4f89a0 If you found that my answer addresses your question, please click on "Accept as Solution" below the answer. Hope this helps you, Matt
12-23-2020
07:27 AM
1 Kudo
@murali2425 I was not able to reproduce the missing attributes in the content of a FlowFile produced by the AttributesToJson processor. What version of NiFi are you using? Did you inspect the attributes on the FlowFile in the connection immediately feeding the AttributesToJson processor before starting that processor? Your desired output from AttributesToJson seems to be very specific and does not include all attributes (such as the core attributes) anyway. My suggestion would be to use an UpdateAttribute processor just before your AttributesToJson processor to build the specific attributes you want to have in your produced JSON output content.
You would then add two custom dynamic properties where you would use NiFi Expression Language to populate the values from other attributes/metadata on the source FlowFile:
You could then configure your AttributesToJson processor to build the JSON content using only those two new attributes you just constructed:
Keep in mind that the AttributesToJson processor adds attributes to the JSON in lexicographical order. So if you want the uuid before the filepath, you will need to adjust the property names used in the UpdateAttribute processor. For example, use "auuid" instead of "myuuid" so that it sorts before "filepath". Hope this helps,
Matt
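The naming trick above can be checked quickly in Python. This sketch only mimics the ordering described in the answer by sorting keys; it is not the AttributesToJson implementation, and the function name and sample attribute values are hypothetical.

```python
import json


def attributes_to_json(attrs, names):
    """Build JSON from the selected attributes, with keys sorted by name."""
    selected = {name: attrs[name] for name in names if name in attrs}
    return json.dumps(selected, sort_keys=True)


# "myuuid" sorts after "filepath", so the uuid comes second.
with_myuuid = attributes_to_json(
    {"myuuid": "1234", "filepath": "/data/in/"}, ["myuuid", "filepath"]
)

# Renaming the attribute to "auuid" moves it in front of "filepath".
with_auuid = attributes_to_json(
    {"auuid": "1234", "filepath": "/data/in/"}, ["auuid", "filepath"]
)
```

The first result lists filepath before myuuid, and the second lists auuid before filepath, regardless of the order in which the attribute names were supplied.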