Member since 07-30-2019 | 3471 Posts | 1642 Kudos Received | 1020 Solutions
02-24-2017
12:44 PM
2 Kudos
@Pradhuman Gupta

Backpressure has kicked in on your dataflow. By default, every new connection has a backpressure object threshold of 10,000 FlowFiles. When backpressure is reached on a connection, the connection is highlighted in red and the backpressure bar (left = object threshold, right = size threshold) shows which threshold has reached 100%. Once backpressure is applied, the component (processor) directly upstream of that connection will no longer run.

As you can see in your screenshot above, the "success" connection from your PutSplunk processor is applying backpressure. As a result, the PutSplunk processor is no longer getting scheduled to run by the NiFi controller. Since it is no longer executing, FlowFiles began to queue on the connection between your TailFile and PutSplunk processors. Once backpressure kicked in there as well, the TailFile processor also stopped running. If you clear the backpressure on the "success" connection between your PutSplunk and PutEmail processors, your dataflow will start running again.

You can adjust the backpressure threshold by right clicking on a connection and selecting "configure". (The configure option is only available if the processors on both sides of a connection are stopped.)

In addition to adjusting backpressure settings, you also have the option of setting "file expiration" on a connection. File expiration dictates how old a FlowFile in a given connection can be. If the FlowFile has existed in your NiFi (not how long it has been in that specific connection) for longer than the configured time, it is purged from your dataflow. This setting, if set aggressively enough, could help keep your "success" relationship clean enough to avoid backpressure.

Thanks,
Matt
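To make the chain reaction described above easier to follow, here is a rough Python sketch of it. This is only a toy model under stated assumptions, not how NiFi schedules processors internally: the `Connection` class, the `run_flow` function, and the batch size of 1,000 FlowFiles per run are all hypothetical, and PutEmail is assumed to be completely stalled.

```python
from dataclasses import dataclass


@dataclass
class Connection:
    """Toy model of a NiFi connection (hypothetical class, not a NiFi API)."""
    name: str
    object_threshold: int = 10_000  # default object threshold mentioned above
    queued: int = 0

    def backpressure_applied(self) -> bool:
        return self.queued >= self.object_threshold


def run_flow(ticks, batch=1_000, threshold=10_000):
    """Simulate TailFile -> PutSplunk -> PutEmail with PutEmail stalled."""
    tail_to_splunk = Connection("TailFile -> PutSplunk", threshold)
    splunk_to_email = Connection("PutSplunk success -> PutEmail", threshold)
    for _ in range(ticks):
        # PutSplunk runs only while its outbound connection has room.
        if not splunk_to_email.backpressure_applied() and tail_to_splunk.queued:
            moved = min(batch, tail_to_splunk.queued)
            tail_to_splunk.queued -= moved
            splunk_to_email.queued += moved
        # TailFile runs only while its outbound connection has room.
        if not tail_to_splunk.backpressure_applied():
            tail_to_splunk.queued += batch
        # PutEmail is stalled in this scenario, so nothing drains splunk_to_email.
    return tail_to_splunk, splunk_to_email


tail_q, splunk_q = run_flow(ticks=100)
print(tail_q.name, tail_q.queued, tail_q.backpressure_applied())
print(splunk_q.name, splunk_q.queued, splunk_q.backpressure_applied())
```

In this model the downstream connection fills first, PutSplunk stops being run, and then the upstream connection fills until TailFile stops as well.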
02-23-2017
07:06 PM
5 Kudos
There is a two-part process before any access to the NiFi UI is possible:

1. Authentication: By default NiFi will use a user/server's SSL certificate, when provided in the connection, to authenticate. When NO user/server certificate is presented, NiFi will then look for a Kerberos TGT (if Spnego has been configured in NiFi). Finally, if neither of the above were present in the connection, NiFi will use the login identity provider (if configured). Login identity providers include either ldap or kerberos. With both of these options, NiFi will present users with a login screen.

2. Authorization: Authorization is the mechanism that controls which features and components authenticated users are granted access to. The default authorizer NiFi will use is the internal file based authorizer. There is an option to configure NiFi to use Ranger as the authorizer instead.

The intent of this article is not to discuss how to set up NiFi to use any of the Authentication or Authorizer options. This article covers how to modify what identity is passed to the Authorizer after any one of the authentication mechanisms is successful.

What is actually passed to the authorizer varies depending on which Authentication method is in use.

SSL certificates: Default, always enabled, and always checked first. NiFi uses the full DN from the certificate.

Spnego (kerberos): Always on when enabled and only used if an SSL certificate was not present in the connection. NiFi uses the full user principal.

ldap-provider (option in login-identity-providers): Always on once configured and only used if both the SSL certificate and the TGT (if Spnego was enabled) are not present in the connection. The default configuration of the ldap-provider will use the full DN returned by LDAP upon successful authentication (USE_DN Identity Strategy). It can be configured to pass the username used to log in instead (USE_USERNAME Identity Strategy).

kerberos-provider (option in login-identity-providers): Always on once configured and only used if both the SSL certificate and the TGT (if Spnego was enabled) are not present in the connection. The kerberos-provider will use the user's full principal upon successful authentication. (USE_DN Identity Strategy)

Whether you choose to use the built-in file based authorizer or optionally configure your NiFi to use Ranger instead, users must be added and granted various access policies. Adding users using either a full DN or a user's principal can be both annoying and prone to errors, since the authorizer is case sensitive and white spaces are valid characters. This is where NiFi's optional identity mapping configurations come into play.

Identity mapping takes place after successful authentication and before authorization occurs. It gives you the ability to take the returned value from all four of the authentication methods and pass them through 1 or more mappings to produce a simple resulting value, which is then passed to your authorizer.

The identity mapping properties are configured in NiFi's nifi.properties file and consist of two parts for each mapping you define:

    nifi.security.identity.mapping.pattern.<user defined>=
    nifi.security.identity.mapping.value.<user defined>=

The mapping pattern takes a Java regular expression as input, with the expectation that one or more capture groups are defined in that expression. One or more of those capture groups are then used in the mapping value to create the desired final result that will be passed to your configured authorizer.

Important note: If you are implementing pattern mapping on an existing NiFi cluster that is already running securely, the newly added mappings will be run against the DNs from the certificates created for your nodes and the Initial Admin Identity value you originally configured. If any of your mappings match, a new value is going to be passed to your authorizer, which means you may lose access to your UI. Before adding any mapping, make sure you have added the new mapped value users to your NiFi and authorized them so you do not lose access.

By default NiFi includes 2 example identity mappings, commented out, in the nifi.properties file. Their unique identifiers are "dn" and "kerb".

You can add as many identity mapping pattern and value pairs as you like to accommodate all your various user/server authentication types. Each must have a unique identifier. You could add, for example, "nifi.security.identity.mapping.pattern.dn2=" and "nifi.security.identity.mapping.value.dn2=".

If you are using Ambari to install and manage your NiFi cluster (HDF 2.x version), you can find the 2 sample identity mapping properties under "Advanced nifi-properties". If you want to add additional mappings beyond those 2 via Ambari, these would be added via the "Custom nifi-properties" config section. Simply click the "Add Property..." link to add your new mappings.

The result of any successful authentication is run through all configured identity mappings until a match is found. If no match is found, the full DN or user principal is passed to the authorizer.

Let's take a look at a few examples:

| User/server DN or Principal | Identity Mapping Pattern | Identity Mapping Value | Result passed to authorizer |
|---|---|---|---|
| CN=nifi-server-01.openstacklocal, OU=NIFI | `^CN=(.*?), OU=(.*?)$` | `$1` | nifi-server-01.openstacklocal |
| CN=nifi-01, OU=SME, O=mycp, L=Fulton, ST=MD, C=US | `^CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$` | `$1@$2` | nifi-01@SME |
| nifi/instance@MY.COMPANY.COM | `^(.*?)/instance@(.*?)$` | `$1@$2` | nifi@MY.COMPANY.COM |
| cn=nifi-user1,ou=SME,dc=mycp,dc=com | `^cn=(.*?),ou=(.*?),dc=(.*?),dc=(.*?)$` | `$1` | nifi-user1 |
| JohnDoe@MY.COMPANY.COM | `^(.*?)@(.*?)$` | `$1` | JohnDoe |
| EMAILADDRESS=none@none.com, CN=nifi-user2, OU=SME, O=mycp, L=Fulton, ST=MD, C=US | `^EMAILADDRESS=(.*?), CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$` | `$2` | nifi-user2 |

As you can see from the above examples, using NiFi's pattern mapping ability will simplify authorizing new users via either NiFi's default file based authorizer or Ranger.
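If you want to try a pattern and value pair before touching nifi.properties, a small script can help. The following is a minimal Python sketch, not NiFi code: NiFi evaluates these patterns as Java regular expressions, so the sketch translates the `$1` style references into Python's syntax. The helper name `map_identity`, the "dn2" identifier, and trying the mappings in the listed order are assumptions made for illustration.

```python
import re

# Hypothetical mapping list for illustration: (identifier, pattern, value).
MAPPINGS = [
    ("dn", r"^CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$", "$1@$2"),
    ("kerb", r"^(.*?)/instance@(.*?)$", "$1@$2"),
    ("dn2", r"^CN=(.*?), OU=(.*?)$", "$1"),
]


def map_identity(identity, mappings=MAPPINGS):
    """Return the value that would be handed to the authorizer in this model."""
    for _identifier, pattern, value in mappings:
        match = re.fullmatch(pattern, identity)
        if match:
            # Translate Java-style "$1" group references into Python's "\g<1>".
            template = re.sub(r"\$(\d+)", r"\\g<\1>", value)
            return match.expand(template)
    # No mapping matched: the full DN or principal is passed through unchanged.
    return identity


for identity in (
    "CN=nifi-01, OU=SME, O=mycp, L=Fulton, ST=MD, C=US",
    "nifi/instance@MY.COMPANY.COM",
    "CN=nifi-server-01.openstacklocal, OU=NIFI",
    "JohnDoe@MY.COMPANY.COM",
):
    print(identity, "->", map_identity(identity))
```

Note that a capture group returns everything it matched, so the third identity maps to the full hostname "nifi-server-01.openstacklocal", and the last identity is passed through unchanged because none of the three mappings in this sketch match it.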
02-23-2017
02:25 PM
9 Kudos
NiFi works with FlowFiles. Every FlowFile that exists consists of two parts: FlowFile content and FlowFile Attributes. While the FlowFile's content lives on disk in the content repository, NiFi holds the "majority" of the FlowFile attribute data in the configured JVM heap memory space. I say "majority" because NiFi swaps Attributes to disk on any queue that contains over 20,000 FlowFiles (default, but can be changed in nifi.properties).

Once your NiFi is reporting OutOfMemory (OOM) errors, there is no corrective action other than restarting NiFi. If changes are not made to your NiFi or dataflow, you are surely going to encounter this issue again and again.

The default configuration for JVM heap in NiFi is only 512 MB. This value is set in the bootstrap.conf file.

    # JVM memory settings
    java.arg.2=-Xms512m
    java.arg.3=-Xmx512m

While the default may work for some dataflows, it is going to be undersized for others. Simply increasing these values until you stop seeing OOM errors should not be your immediate go-to solution. Very large heap sizes can have adverse impacts on your dataflow as well. Garbage collection takes much longer to run with very large heap sizes, and while garbage collection occurs, it is essentially a stop-the-world event. This amounts to a dataflow stoppage for the length of time it takes for the collection to complete. I am not saying that you should never set large heap sizes, because sometimes that is really necessary; however, you should evaluate all other options first.

NiFi and FlowFile attribute swapping:

NiFi already has a built-in mechanism to help reduce the overall heap footprint. The mechanism swaps FlowFile attributes to disk when a given connection's queue exceeds the configured threshold. These settings are found in the nifi.properties file:

    nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager
    nifi.queue.swap.threshold=20000
    nifi.swap.in.period=5 sec
    nifi.swap.in.threads=1
    nifi.swap.out.period=5 sec
    nifi.swap.out.threads=4

Swapping, however, will not help if your dataflow is so large that queues are high everywhere but still have not exceeded the threshold for swapping. Anytime you decrease the swap threshold, more swapping can occur, which may have some impact on throughput performance. So here are some other things to check for.

Some common reasons for running out of heap memory include:

1. High volume dataflow with lots of FlowFiles active at any given time across your dataflow. (Increase the configured NiFi heap size in bootstrap.conf to resolve.)

2. Creating a large number of Attributes on every FlowFile. More Attributes equals more heap usage per FlowFile. Avoid creating unused/unnecessary Attributes on FlowFiles. (Increase the configured NiFi heap size in bootstrap.conf to resolve and/or reduce the configured swap threshold.)

3. Writing large values to FlowFile Attributes. Extracting large amounts of content and writing it to an attribute on a FlowFile will result in high heap usage. Try to avoid creating large attributes when possible. (Increase the configured NiFi heap size in bootstrap.conf to resolve and/or reduce the configured swap threshold.)

4. Using the MergeContent processor to merge a very large number of FlowFiles. NiFi cannot merge FlowFiles that are swapped, so all of these FlowFiles' attributes must be in heap when the merge occurs. If merging a very large number of FlowFiles is needed, try using two MergeContent processors in series with one another. Have the first merge a max of 10,000 FlowFiles and the second then merge those 10,000-FlowFile bundles into even larger bundles. (Increasing the configured NiFi heap size in bootstrap.conf can also help.)

5. Using the SplitText processor to split one file into a very large number of FlowFiles. Swapping of a large connection queue will not occur until after the queue has exceeded the swapping threshold. The SplitText processor will create all the split FlowFiles before committing them to the success relationship. This is most commonly seen when SplitText is used to split a large incoming FlowFile by every line. It is possible to run out of heap memory before all the splits can be created. Try using two SplitText processors in series. Have the first split the incoming FlowFiles into large chunks and the second split them down even further. (Increasing the configured NiFi heap size in bootstrap.conf can also help.)

Note: There are additional processors that can be used for splitting and joining large numbers of FlowFiles, so the same approach as above should be followed for those as well. I only specifically commented on the above since they are more commonly seen being used to deal with very large numbers of FlowFiles.
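To put rough numbers on the "two processors in series" advice for SplitText, here is a small Python sketch of the arithmetic. It is a simplification under stated assumptions: it only counts how many split FlowFiles a single processor has to create before it can commit them, the function names are hypothetical, and the 1,000,000-line file and 10,000-line chunk size are made-up example values.

```python
import math


def peak_flowfiles_single_split(total_lines):
    """One SplitText splitting by every line creates all splits before committing."""
    return total_lines


def peak_flowfiles_two_stage_split(total_lines, first_stage_lines):
    """Two SplitText processors in series: large chunks first, then single lines."""
    # Stage 1 creates one FlowFile per chunk.
    first_stage = math.ceil(total_lines / first_stage_lines)
    # Stage 2 handles one chunk at a time, so it creates at most one chunk's lines.
    second_stage = min(first_stage_lines, total_lines)
    return max(first_stage, second_stage)


total = 1_000_000   # hypothetical number of lines in the incoming file
chunk = 10_000      # hypothetical "Line Split Count" for the first SplitText

print("single split:", peak_flowfiles_single_split(total))
print("two-stage split:", peak_flowfiles_two_stage_split(total, chunk))
```

With these example values the second stage stays below the default swap threshold of 20,000, so once the splits land in a queue the normal swapping mechanism can take over.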
02-22-2017
01:54 PM
@mayki wogno No problem... If one of the answers helps drive you to a solution to your question, please accept that answer to help drive this community forward.
02-21-2017
04:37 PM
@mayki wogno One thing you could do is set "FlowFile Expiration" on the connection containing the "merged" relationship, and select the "NewestFlowFileFirstPrioritizer" from the "Available Prioritizers". FlowFile expiration is measured against the age of the FlowFile (from creation time to now) and not how long it has been in a particular connection. If the FlowFile age exceeds this configured value, it is purged from the queue.
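The difference between FlowFile age and time spent in a connection can be shown with a tiny Python sketch. This is just an illustration of the rule described above, not NiFi code: the `is_expired` helper, the timestamps, and the 5 minute expiration are all hypothetical.

```python
from datetime import datetime, timedelta


def is_expired(created_at: datetime, now: datetime, expiration: timedelta) -> bool:
    """True when the FlowFile's total age exceeds the configured expiration."""
    return (now - created_at) > expiration


now = datetime(2017, 2, 21, 16, 37)
created_at = now - timedelta(minutes=10)       # FlowFile was created 10 minutes ago
entered_queue_at = now - timedelta(minutes=1)  # but reached this connection 1 minute ago
expiration = timedelta(minutes=5)              # hypothetical "FlowFile Expiration" value

# Expiration compares against creation time, so this FlowFile is purged...
print(is_expired(created_at, now, expiration))
# ...even though its time in this particular connection is well under 5 minutes.
print(is_expired(entered_queue_at, now, expiration))
```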
02-21-2017
03:09 PM
1 Kudo
@mayki wogno You can reduce or even eliminate the WARN messages by placing a MergeContent processor between your first and second DeleteHDFS processors that merges using "path" as the value of the "Correlation Attribute Name" property. The resulting merged FlowFile(s) would still have the same "path" attribute, which would be used by the second DeleteHDFS to remove your directory. Matt
02-21-2017
02:10 PM
That was the intent... It would only be successful after all files were deleted first. So only after the last file was removed would the directory deletion be successful.
02-21-2017
01:48 PM
@mayki wogno FlowFiles generated by the ListHDFS processor all have a "path" attribute created on them. That attribute could be used to trigger your directory deletion via the DeleteHDFS processor. What is difficult here is determining when all data has been successfully pulled from an HDFS directory before deleting the directory itself. You could try using two DeleteHDFS processors in series with one another. The first DeleteHDFS deletes the files from the target "path" of the incoming FlowFiles and the second deletes the directory (Recursive property set to false). Matt
02-21-2017
01:16 PM
@mayki wogno Make sure the user your NiFi is running as is authorized to delete files and directories in your target HDFS. The DeleteHDFS processor properties are as follows: Thanks, Matt
02-09-2017
02:07 PM
1 Kudo
@mliem
Would you mind sharing your MergeContent processor's configuration? How large is the volume of tar files coming into your flow? How many concurrent tasks do you have on your UnpackContent? The reason I ask these questions is that they may all play a factor in why you are seeing the behavior you reported.

My first thought would be that you have too few bins configured in your MergeContent processor. The MergeContent processor will start placing FlowFiles from the incoming queues into bins based on the "Correlation Attribute Name" configured (in your case it should be "fragment.identifier"). If the MergeContent processor runs out of available bins, the oldest bin is merged. In your case, since that oldest bin is incomplete (does not contain all fragments), it is routed to failure.

For example, say you have "Maximum number of bins" configured to 10 and your incoming queue contains FlowFiles that were produced from more than 10 original tar files. It is possible that the MergeContent processor may be looking to create that 11th bin before all the FlowFiles that correlate to any of the other bins are processed.

There are a few things you could try here (1 being the most recommended and the bottom of the list being the last thing I would try):

1. Increase the "Maximum number of bins" property in MergeContent.

2. Add the "OldestFlowFileFirstPrioritizer" to the "Selected Prioritizers" list in the queue feeding your MergeContent. This will have a small impact on throughput performance. When UnpackContent splits your tar files, all split files will have similar FlowFile creation timestamps. By setting the above prioritizer, FlowFiles will be placed in bins in timestamp order. If using this strategy, you would still need to have the number of bins set to the number of concurrent tasks assigned to your UnpackContent processor plus one.

3. Decrease the "BackPressure Object Threshold" configuration on the incoming queue to the MergeContent processor. This is a soft limit. So let's say you have it set to 1000 and your UnpackContent untar resulted in 2000 FlowFiles; the queue would jump to 2000. The UnpackContent processor would then stop until the queue dropped back below 1000. This would leave fewer FlowFiles for your MergeContent processor to bin (meaning fewer needed bins).

Thanks,
Matt
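To illustrate why too few bins leads to failures, here is a rough Python sketch of the binning behavior described above. It is a simplified model, not MergeContent's actual implementation: the `simulate_merge` function is hypothetical, each FlowFile is reduced to a (fragment.identifier, fragment.count) pair, and any bin forced out while incomplete is counted as a failure.

```python
from collections import OrderedDict


def simulate_merge(flowfiles, max_bins):
    """flowfiles: iterable of (fragment_identifier, fragment_count) tuples."""
    bins = OrderedDict()  # insertion order doubles as bin age, oldest first
    merged, failed = [], []
    for identifier, count in flowfiles:
        if identifier not in bins:
            if len(bins) >= max_bins:
                # No free bin: the oldest bin is forced out. Complete bins are
                # removed as soon as they fill, so this one is incomplete.
                oldest_identifier, _ = bins.popitem(last=False)
                failed.append(oldest_identifier)
            bins[identifier] = {"seen": 0, "expected": count}
        current = bins[identifier]
        current["seen"] += 1
        if current["seen"] == current["expected"]:
            merged.append(identifier)
            del bins[identifier]
    return merged, failed


# Fragments from three tar files ("a", "b", "c"), two fragments each, interleaved.
queue = [("a", 2), ("b", 2), ("c", 2), ("a", 2), ("b", 2), ("c", 2)]

print("max_bins=2:", simulate_merge(queue, max_bins=2))
print("max_bins=3:", simulate_merge(queue, max_bins=3))
```

With only two bins, every new identifier pushes out an incomplete bin and nothing merges; with three bins, all three originals are reassembled.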