Member since: 07-30-2019
Posts: 3472
Kudos Received: 1642
Solutions: 1020
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 251 | 06-03-2026 06:06 PM |
| | 529 | 05-06-2026 09:16 AM |
| | 1032 | 05-04-2026 05:20 AM |
| | 585 | 05-01-2026 10:15 AM |
| | 701 | 03-23-2026 05:44 AM |
06-14-2023
01:11 PM
1 Kudo
@noncitizen On the MergeContent processor:

A "bin" is a virtual container to which FlowFiles are assigned during execution of the MergeContent processor. FlowFiles that are allocated to bins remain in NiFi heap memory and cannot be swapped out to disk.

How FlowFiles are allocated to bins from the inbound connections during execution depends on the configured "Merge Strategy":

- Bin-Packing Algorithm: allocates FlowFiles to one bin until that bin has reached the configured mins (min number of entries and min group size). If a FlowFile cannot be allocated to a bin (for example, because doing so would exceed the configured max group size), the FlowFile is allocated to a second bin.
- Defragment: use-case specific. It depends on the source FlowFiles having specific attributes about each fragment (fragment.identifier, fragment.index, fragment.count, and segment.original.filename). A new bin is used for each unique fragment.identifier FlowFile attribute value.

For the use case you describe, you would use the "Bin-Packing Algorithm" merge strategy.

When MergeContent executes (a run schedule of 0 secs means execute as often as possible), it looks at the unallocated FlowFiles in one of the inbound connections at that exact moment in time and allocates them to an existing bin or bins as described above. After binning the FlowFiles, it checks whether any bins are eligible to be merged. MergeContent will merge a bin when any one of the following is true:

- Both mins have been met for the bin (min number of entries AND min group size). Min group size is ignored if blank.
- The bin contains all fragments of a fragmented FlowFile (merge strategy = Defragment only).
- The bin has reached the configured max bin age. Max bin age forces the merge of a bin after the configured amount of time; the age starts when the first FlowFile is allocated to the bin. This prevents a bin that never reaches the configured mins from sitting un-merged indefinitely.
- All bins have FlowFiles allocated to them and the next unallocated FlowFile cannot be allocated to any of these existing bins. The oldest bin is then forced to merge to free a bin for that new FlowFile. When merge strategy = Defragment, the oldest bin of FlowFiles is routed to the "failure" relationship instead of being force-merged.

I suspect that by having only 1 bin, a forced merge is happening in some of your tests. In others, the min(s) are set too low and the bin becomes eligible for merge before all FlowFiles have been allocated to it. (You reported this worked once, probably because you had all 63 CSVs queued in the inbound connection before you started MergeContent; the other times, when it failed, all components were running as data streamed through your dataflow.) The MergeContent processor has no idea how many FlowFiles should go into a bin (unless merge strategy = Defragment).

Also keep in mind that the nodes in a NiFi cluster execute dataflows independently of each other. Each node has its own copy of the flow.json.gz loaded in memory, each node has its own content and FlowFile repositories, and each node executes only on the FlowFiles present on that node. So if you have multiple nodes ingesting data that you want to merge into a single FlowFile (zip), then using a "single node" load balanced connection prior to the MergeContent processor is the correct approach.

So now let's look at what configuration would most likely work for you:

- Merge Strategy = Bin-Packing Algorithm
- Merge Format = zip
- Correlation Attribute = <blank>, since you are not trying to divide incoming FlowFiles into different bins.
- min number of entries = 100 (since you are trying to make sure all 63 FlowFiles make it into the bin regardless of how many processor executions it takes to accomplish that)
- max number of entries = 1000 (default)
- max bin age = 2 mins (set this high enough that you feel confident all FlowFiles will reach the inbound connection before the bin is forced to merge; the default is blank and, depending on server resources, could mean this processor executes many times per second)
- max number of bins = 5 (default). I never recommend having only 1 bin.
- All other properties are defaults.

What this does is allow 2 mins for all 63 of your FlowFiles to get placed in one bin before the max bin age kicks in and forces that bin to merge. Of course you can adjust this after testing. (You have source FlowFiles that are already CSV, but you have others that need to be unpacked, which may delay them reaching MergeContent even if only by milliseconds. Even that short delay could mean different executions of MergeContent try to bin and merge.) Also, single node is important if your FlowFiles are spread across all your cluster nodes, since MergeContent can only merge those on the same node.

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.

Thank you, Matt
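To make the timing issue concrete, here is a toy Python model of the binning behavior described above. It is only a sketch under simplifying assumptions, not NiFi's implementation: it counts entries only (no group sizes, correlation attributes, or Defragment), runs one execution per simulated second, and the names `Bin`, `run_once`, and `simulate` as well as the two-wave arrival timing are made up for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Bin:
    created_at: int                      # time the first FlowFile was allocated
    entries: list = field(default_factory=list)


def run_once(bins, queued, now, min_entries, max_entries, max_bins, max_bin_age):
    """One simplified execution: bin the queued FlowFiles, then merge eligible bins."""
    merged = []
    for flowfile in queued:
        target = next((b for b in bins if len(b.entries) < max_entries), None)
        if target is None:
            if len(bins) >= max_bins:
                # Every bin is full: force-merge the oldest bin to free one.
                oldest = min(bins, key=lambda b: b.created_at)
                bins.remove(oldest)
                merged.append(oldest.entries)
            target = Bin(created_at=now)
            bins.append(target)
        target.entries.append(flowfile)
    for b in list(bins):
        # A bin merges once the min is met or it has reached the max bin age.
        if len(b.entries) >= min_entries or now - b.created_at >= max_bin_age:
            bins.remove(b)
            merged.append(b.entries)
    return merged


def simulate(min_entries, max_bin_age, arrivals, ticks=121):
    """Run one execution per second and return the size of each merged output."""
    bins, sizes = [], []
    for now in range(ticks):
        queued = [f"csv-{now}-{i}" for i in range(arrivals.get(now, 0))]
        merged = run_once(bins, queued, now, min_entries, 1000, 5, max_bin_age)
        sizes.extend(len(m) for m in merged)
    return sizes


arrivals = {0: 40, 1: 23}               # 63 CSVs arriving in two waves (hypothetical timing)
print(simulate(1, 120, arrivals))       # [40, 23]
print(simulate(100, 120, arrivals))     # [63]
```

With a low min, each wave is merged as soon as it is binned, giving two outputs. With min number of entries = 100 and a 2 minute max bin age, the bin simply waits and all 63 entries leave together when the age limit is reached.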
06-09-2023
10:24 AM
@naveenb Your query will get better visibility if you start a new question in the community rather than asking on an already solved question.

NiFi's ListSFTP and GetSFTP (deprecated in favor of ListSFTP and FetchSFTP) processors only list/get files. When one of them generates a NiFi FlowFile from a file it finds recursively within the configured base directory on the source SFTP server, it adds a "path" attribute to that FlowFile. That "path" attribute holds the absolute path to the file.

So based on your configuration, the results you are seeing are expected, since you configured your PutSFTP with "/home/ubuntu/samplenifi/${path}", where the "path" attribute on your FlowFiles resolves to "/home/nifiuser/nifitest/sample" for files found in that source subdirectory.

You can use NiFi Expression Language (NEL) to modify that "path" attribute string to get rid of the "/home/nifiuser" portion:

/home/ubuntu/samplenifi/${path:substringAfter('/home/nifiuser')}

Thank you, Matt
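As a rough illustration of what that expression evaluates to, here is a small Python sketch that mimics the documented behavior of substringAfter (the text after the first occurrence of the argument, or the whole subject if the argument is absent). The helper name `substring_after` is made up for this example; it is not part of NiFi.

```python
def substring_after(subject: str, marker: str) -> str:
    """Mimic NEL substringAfter: text after the first occurrence of marker,
    or the whole subject when marker does not occur."""
    index = subject.find(marker)
    if index == -1:
        return subject
    return subject[index + len(marker):]


path = "/home/nifiuser/nifitest/sample"          # value of the "path" attribute
remote = "/home/ubuntu/samplenifi/" + substring_after(path, "/home/nifiuser")
print(remote)                                    # /home/ubuntu/samplenifi//nifitest/sample
```

The literal concatenation leaves a doubled slash where the two pieces meet. That is usually harmless, but if your target server objects, drop the trailing slash from the prefix.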
06-07-2023
12:49 PM
@EddyChan The exception:

Caused by: org.apache.nifi.authorization.exception.AuthorizerCreationException: Unable to locate initial admin joseAce@azure.com to seed policies

means that on startup the file-access-policy-provider attempted to generate the authorizations.xml file and seed it with initial admin policies for the user identity string "joseAce@azure.com"; however, neither the file-user-group-provider nor the aad-user-group-provider returned that user identity string.

So you'll need to take a look at the authorizers.xml files generated when you create your two replicas. Something is getting messed up in the configuration file. The order in which the providers are added to the authorizers.xml is very important: user-group-providers must come first, then composite providers, then the file-access-policy-provider, and finally the authorizer.

Perhaps in your file-access-policy-provider, you also have the following property being added which is blank:

<property name="Node Group">nifi</property>

Thank you, Matt
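If it helps when comparing the generated files, the ordering rule can be checked with a few lines of Python. This is only a sketch: the sample XML is a stripped-down stand-in for a real authorizers.xml, the "composite-user-group-provider" identifier is an assumed name, and `provider_order_ok` is a hypothetical helper. It only checks the element-level order (userGroupProvider, then accessPolicyProvider, then authorizer); it does not verify that a composite provider follows the providers it references.

```python
import xml.etree.ElementTree as ET

# Stripped-down stand-in for authorizers.xml (real files also carry <class> and <property> elements).
SAMPLE = """<authorizers>
  <userGroupProvider><identifier>file-user-group-provider</identifier></userGroupProvider>
  <userGroupProvider><identifier>aad-user-group-provider</identifier></userGroupProvider>
  <userGroupProvider><identifier>composite-user-group-provider</identifier></userGroupProvider>
  <accessPolicyProvider><identifier>file-access-policy-provider</identifier></accessPolicyProvider>
  <authorizer><identifier>managed-authorizer</identifier></authorizer>
</authorizers>"""

EXPECTED_RANK = {"userGroupProvider": 0, "accessPolicyProvider": 1, "authorizer": 2}


def provider_order_ok(xml_text: str) -> bool:
    """Return True when the top-level elements appear in the expected order."""
    root = ET.fromstring(xml_text)
    ranks = [EXPECTED_RANK[child.tag] for child in root if child.tag in EXPECTED_RANK]
    return ranks == sorted(ranks)


print(provider_order_ok(SAMPLE))  # True
```

Running the same check against each replica's generated file is a quick way to spot a template that emits the sections out of order.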
06-07-2023
12:32 PM
@SandyClouds @steven-matison's response is perfect. Just to add to your second question, "2. NiFi jobs also start automatically without me running from UI ?":

NiFi is really designed as an always-on, data-in-motion service. NiFi preserves the state (stopped, started, enabled, disabled) of all its components in the flow.json.gz file. On startup of the service, NiFi will load the flow.json.gz, load FlowFiles from the NiFi FlowFile repository back into the connections within your dataflow, and return all components to their last known state.

There is a property in the nifi.properties file, "nifi.flowcontroller.autoResumeState=true", that controls this behavior. If you change it from the default "true" to "false", NiFi will reset all components to stopped (processors) or disabled (controller services and reporting tasks) on startup.

Thank you, Matt
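If you want to confirm what a given install is set to without opening the UI, a short Python sketch like the following reads the flag from the properties text. The helper names are made up, the parser handles only simple key=value lines, and treating a missing property as "true" is my assumption here, not something stated above.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def auto_resume_enabled(text: str) -> bool:
    """True when components return to their last known state on startup."""
    # Assumption: a missing property is treated as the default "true".
    value = parse_properties(text).get("nifi.flowcontroller.autoResumeState", "true")
    return value.lower() == "true"


sample = "# flow controller\nnifi.flowcontroller.autoResumeState=false\n"
print(auto_resume_enabled(sample))  # False
```

In practice you would pass in the contents of your own conf/nifi.properties instead of the inline sample.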
06-07-2023
12:24 PM
2 Kudos
@SandyClouds Some clarity and additions to @cotopaul's Pros and Cons:

Single Node:

PROs:
- easy to manage. <-- Setup and configuration management are easier since you only need to do them on one node. But in a cluster, all nodes' configuration files will be almost the same (with some variations in hostname properties, and in certificates if you secure your cluster).
- easy to configure. <-- There are more configurations needed in a cluster setup, but once set up, nothing changes in the user experience when it comes to interacting with the UI.
- no https required. <-- Not sure how this is a PRO. I would not recommend using an unsecured NiFi, as doing so allows anyone access to your dataflows and the data being processed. You can also have an unsecured NiFi cluster, though I do not recommend that either.

CONs:
- in case of issues with the node, your NiFi instance is down. <-- Very true, single point of failure.
- it uses plenty of resources when it needs to process data, as everything is done on a single node.

Cluster:

PROs:
- redundancy and failover --> when a node goes down, the others will take over and process everything, meaning that you will not get affected. <-- Not completely accurate. Each node in a NiFi cluster is only aware of the data (FlowFiles) queued on that specific node, and each node works only on the FlowFiles present on that node. It is therefore the responsibility of the dataflow designer/builder to build dataflows in such a way that FlowFiles are distributed across all nodes. When a node goes down, any FlowFiles currently queued on that down node are not going to be processed by the other nodes. However, the other nodes will continue processing their own data and all new data coming into your dataflow cluster.
- the used resources will be split among all the nodes, meaning that you can cover more use cases than on a single node. <-- Nodes do not share or pool resources across the cluster. If your dataflow(s) are built correctly, the volume of data (FlowFiles) being processed will be distributed across all your nodes, allowing each node to process a smaller subset of the overall FlowFile volume. This means more resources are available across your cluster to handle more volume.
- NEW -- A NiFi cluster can be accessed via any one of the member nodes. No matter which node's UI you access, you will be presented with stats for all nodes. There is a cluster UI accessible from the global menu that allows you to see a breakdown for each node. Any changes you make from the UI of any one of the member nodes will be replicated to all nodes.
- NEW -- Since all nodes run their own copy of the flow, a catastrophic node failure does not mean loss of all your work, since the same flow.json.gz (which contains everything related to your dataflows) can be retrieved from any of the other nodes in your cluster.

CONs:
- complex setup as it requires a Zookeeper + plenty of other config files. <-- A NiFi cluster requires a multi-node Zookeeper setup. Zookeeper quorum is required for cluster stability, and Zookeeper also stores the cluster-wide state needed by your dataflows. Zookeeper is responsible for electing the nodes in your cluster that hold the Cluster Coordinator role and the Primary Node role. If a node that has been assigned one of these roles goes down, Zookeeper will elect one of the nodes that are still up to that role.
- complex to manage --> analysis will be done on X nodes instead of a single node. <-- Not clear. Yes, you have multiple nodes and all those nodes are producing their own set of NiFi logs. However, if a component within your dataflow is producing bulletins (exceptions), the bulletin will indicate all nodes or the specific node(s) on which it was produced.

Cloudera offers centralized management of your NiFi cluster deployment via the Cloudera Manager software. It makes deploying and managing a NiFi cluster across multiple nodes easy, sets up and configures Zookeeper for you, and makes securing your NiFi easy as well by generating the needed certificates/keystores for you.

Hope this helps, Matt
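To picture the failover point, here is a tiny Python toy in which each node owns its own queue and only running nodes drain theirs. It is purely illustrative: the node names, queue contents, and the `run_cluster` helper are made up, and real NiFi nodes obviously do far more than this.

```python
def run_cluster(queues: dict, up_nodes: list) -> dict:
    """Each running node drains only its own queue; returns what each one processed."""
    processed = {}
    for node in up_nodes:
        processed[node] = queues[node]
        queues[node] = []
    return processed


queues = {"node1": ["a", "b"], "node2": ["c"], "node3": ["d", "e"]}
processed = run_cluster(queues, up_nodes=["node1", "node3"])   # node2 is down
print(processed)          # {'node1': ['a', 'b'], 'node3': ['d', 'e']}
print(queues["node2"])    # ['c']
```

The FlowFile queued on the down node stays where it is until that node comes back, while the other nodes keep working on their own data.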
06-01-2023
07:42 AM
@ChuckE Thank you for the details, as they are very helpful. The ValidateJson processor was a new community contribution to Apache NiFi in version 1.19: https://issues.apache.org/jira/browse/NIFI-7392

It does not appear that the processor supports dynamic JSON schema values at runtime. It requires exactly one resource. I don't know if this is because of the "Schema Version" property, where supporting dynamic JSON schemas could cause issues since each JSON schema may use a different schema version. I'd encourage you to create an Apache NiFi jira enhancement request for this component.

Thank you, Matt
06-01-2023
07:22 AM
@EddyChan In your authorizers.xml it appears you have configured the "Access Policy Provider" property of your "managed-authorizer" to reference a User Group Provider ("aad-user-group-provider") instead of an Access Policy Provider. It should be configured to use the "file-access-policy-provider". The file-access-policy-provider then gets a list of users and groups from the user-group-providers it references in order to generate the initial admin access policies.

I see your file-access-policy-provider "initial admin identity" property is configured with:

{{.Values.auth.oidc.admin}}

I don't know what user identity string this resolves to. Keep in mind that NiFi user and group identity strings are case sensitive. So if the admin user identity returned by the "aad-user-group-provider" is "joseace@azure.com", for example, and "{{.Values.auth.oidc.admin}}" resolves to "joseAce@azure.com", NiFi recognizes these as two different users.

You may also want to try setting the aad-user-group-provider class to DEBUG in the NiFi logback.xml to see if it provides a listing of all the users and groups returned by that provider configuration:

org.apache.nifi.authorization.azure.AzureGraphUserGroupProvider

Thank you, Matt
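The case-sensitivity point is easy to demonstrate with a short Python sketch. The helper names are hypothetical and the set of identities stands in for whatever your user group providers actually return; the exact-match lookup is the behavior described above, while the second helper is just a diagnostic idea for spotting case-only mismatches.

```python
def find_initial_admin(initial_admin: str, provider_identities: set):
    """Exact, case-sensitive lookup of the initial admin identity."""
    return initial_admin if initial_admin in provider_identities else None


def case_only_mismatches(initial_admin: str, provider_identities: set) -> list:
    """Identities that differ from the initial admin only by letter case."""
    return sorted(
        identity
        for identity in provider_identities
        if identity != initial_admin and identity.lower() == initial_admin.lower()
    )


returned = {"joseace@azure.com"}                               # what the provider returned (example)
print(find_initial_admin("joseAce@azure.com", returned))       # None
print(case_only_mismatches("joseAce@azure.com", returned))     # ['joseace@azure.com']
```

If the DEBUG output shows a near match like this, aligning the case of the initial admin identity with what the provider returns should be the first thing to try.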
05-31-2023
09:47 AM
@ChuckE Understood. What I am curious about is where in your dataflow you implement the logic to determine which schema needs to be used. So you have some FlowFile with JSON content in your dataflow.

- You then need to determine which schema goes with this specific FlowFile's content.
- Then you want to return that schema text from a parameter context.

So how do you make that determination of which schema goes with which FlowFile? Simply by where the data was consumed from? Have you considered using the "Advanced" UI of the UpdateAttribute processor to create rules, based on how you make your determinations, that add a new FlowFile attribute with the schema extracted from the parameter context(s)?

Thanks, Matt
05-30-2023
02:22 PM
@VLban MergeContent and MergeRecord handle merging of FlowFiles' content differently. Since your FlowFiles already contain JSON formatted record(s), MergeContent is not the correct processor to use. MergeContent does not care about the data/content format (except for Avro) of the inbound FlowFiles. With Binary Concatenation, one FlowFile's content bytes are simply written starting at the end of the previous FlowFile's content. So in the case of JSON, the resulting merged FlowFile's content is not going to be valid JSON anymore.

Both processors bin FlowFiles each time the processor executes, based on its run schedule. At the end of each bin cycle the bins are evaluated to see if both configured mins are satisfied. If so, the bin will be merged. Setting a max does not mean that the bin will wait to get merged until the max has been met. So you would be better off setting your min to 500 MB, if you always want files of at least 500 MB, and setting your max to a value a bit larger than that. Doing so may result in bins that have, say, 480 MB binned where the next FlowFile can't be added because it would exceed the configured max (that FlowFile is placed in a new bin). The Max Bin Age property, when set, will force a bin to merge once the bin has existed for the configured max bin age (this avoids FlowFiles getting stuck in these merge based processors).

Thank you, Matt
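A quick Python sketch shows why byte-level concatenation breaks JSON. The two small documents are made-up examples, and the final "record-aware" merge is only a rough stand-in for what a record-oriented merge produces, not MergeRecord's actual implementation.

```python
import json


def is_valid_json(data: bytes) -> bool:
    """Return True when the bytes parse as a single JSON document."""
    try:
        json.loads(data)
        return True
    except json.JSONDecodeError:
        return False


first = json.dumps({"id": 1}).encode()
second = json.dumps({"id": 2}).encode()

# Binary concatenation: the second document starts where the first one ends.
concatenated = first + second
print(concatenated)                 # b'{"id": 1}{"id": 2}'
print(is_valid_json(concatenated))  # False

# A record-aware merge parses each document and writes one combined array.
record_merged = json.dumps([json.loads(first), json.loads(second)]).encode()
print(is_valid_json(record_merged))  # True
```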
05-30-2023
02:03 PM
@ChuckE I would not expect this to work. The "evaluateELString()" NiFi Expression Language (NEL) function triggers the evaluation of any NEL statements found in the subject passed to that function. A NEL statement always starts with "${"; however, what you have is "#{", since you have a parameter context reference in your subject. So the expected output in your case would be the literal subject.

Somewhere in your dataflow you are assigning the literal parameter name to a FlowFile attribute. Why not evaluate the parameter rather than assign it as a literal string in a FlowFile attribute? Perhaps some more context/details around your use case would help here?

Thanks, Matt
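A toy Python evaluator makes the distinction visible. It is a deliberately crude stand-in for NEL (plain attribute substitution only, no functions or nesting), and the attribute name "filename" and parameter name "my.schema" are made up for the example.

```python
import re

# Only "${...}" is treated as an expression; "#{...}" is not matched at all.
EL_PATTERN = re.compile(r"\$\{([^}]+)\}")


def toy_evaluate_el_string(subject: str, attributes: dict) -> str:
    """Replace each ${name} with the matching attribute value (empty if unknown)."""
    return EL_PATTERN.sub(lambda match: attributes.get(match.group(1), ""), subject)


attributes = {"filename": "a.json"}
print(toy_evaluate_el_string("${filename}", attributes))    # a.json
print(toy_evaluate_el_string("#{my.schema}", attributes))   # #{my.schema}
```

Because the subject holding the parameter reference contains no "${" statement, nothing is evaluated and the literal text comes back unchanged.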