Member since
07-30-2019
3472
Posts
1642
Kudos Received
1020
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 199 | 06-03-2026 06:06 PM | |
| 487 | 05-06-2026 09:16 AM | |
| 945 | 05-04-2026 05:20 AM | |
| 541 | 05-01-2026 10:15 AM | |
| 659 | 03-23-2026 05:44 AM |
06-03-2021
09:45 AM
1 Kudo
@khaldoune NiFi state is used by only NiFi components/frameworks bits that are built to use it. Some select components can be configured to use local state even if you are setup with a NiFi cluster. Others select components will use cluster state if NiFi is clustered, but in a standalone NiFi use local state. You can refer to the embedded documentation for each component processor, controller service, or reporting task to see if it uses the state provider. For example, look at the embedded docs for ListFile and you will see a "state management:" section with: Above you see that this processor can use local or cluster state provider and a description of how state us utilized by this component. Configuration of the "Input Directory Location" in the case of this specific processor controls which provider is used. For components that do not use state (bulk of components don't use state), the same section in their embedded docs will reflect: Load balanced connections to do not record state. Load balanced connections copy FlowFiles from one node to another and on confirmation of success, the local copies are removed. So if NiFi is shutdown or dies while data is being copied by a load balanced connection the source NiFi will simply start over distributing the FlowFiles again when it is back online in the cluster. If you found this addressed your query, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more
06-02-2021
09:30 AM
@_mark_ As NiFi is an open source product, I recommend joining the community (if you have not already) and opening an Apache NiFi Jira [1] with you proposed enhancements/new features for Apache NiFi to get feedback from the community at large. If you feel you are not there yet in proposing a new feature/enhancement, try engaging via the users mailing list [2] [1] https://issues.apache.org/jira/browse/NIFI [2] https://nifi.apache.org/mailing_lists.html Thanks, Matt
... View more
06-02-2021
07:52 AM
@midee I am not clearly following your use case. FlowFiles consist of two parts, FlowFile attributes/metadata and FlowFile content. You give example with "customefields_12345" and "customefields_12346". Does this mean one FlowFile may have multiple "customefields_<some string>" attributes assigned to it? How do you want to route FlowFiles where only some of those customfield attributes are null while others are not? There are multiple ways to handle this using NiFi Expression Language (NEL) [1] and the routeOnAttribute [2] processor. ${anyMatchingAttribute("customfield.*"):isEmpty()} Above would return "true" if ANY of the NiFi FlowFile attributes starting with "customefield" is empty. note: The isEmpty function returns true if the Subject is null, does not contain any characters or contains only white-space (new line, carriage return, space, tab), false otherwise. There is another NEL subjectless function that would return "true" only if ALL FlowFileAttributes matching the Java regular expression were empty: ${allMatchingAttributes("customfield.*"):isEmpty()} With the RouteOnAttribute processor you create/add dynamic properties and each of those becomes a new routable relationship on the processor. if the NEL statement configured for that dynamic property returns true that FlowFile routes to that relationship. Any FlowFile that does not return true for dynamic properties will get routed to the pre-existing relationship named "unmatched". [1] https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html [2] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.13.2/org.apache.nifi.processors.standard.RouteOnAttribute/index.html If you found this addressed your query, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more
06-02-2021
06:22 AM
1 Kudo
@hegdemahendra The NiFi CLI toolkit [1] can help here to an extent. This toolkit provides the following NiFi-Registry capabilities: registry current-user
registry list-buckets
registry create-bucket
registry delete-bucket
registry list-flows
registry create-flow
registry delete-flow
registry list-flow-versions
registry export-flow-version
registry import-flow-version
registry sync-flow-versions
registry transfer-flow-version
registry diff-flow-versions
registry upload-bundle
registry upload-bundles
registry list-bundle-groups
registry list-bundle-artifacts
registry list-bundle-versions
registry download-bundle
registry get-bundle-checksum
registry list-extension-tags
registry list-extensions
registry list-users
registry create-user
registry update-user
registry list-user-groups
registry create-user-group
registry update-user-group
registry get-policy
registry update-policy
registry update-bucket-policy You can get a description of each by executing: <path to>/cli.sh registry sync-flow-versions -h Since you are changing FlowPersistence providers and not trying to sync flows to a new NiFi-Registry, You really can't use the above "sync-flow-versions" function. Plus, I really don't see it even in that scenario being able to accomplish your goal because you would end up with new flow ids. When you create a bucket in NiFi-Registry it is assigned a bucket if (random uuid). When you version control a Process Group (PG) in NiFi, you choose an existing bucket and it first creates a new flow id (Random UUID assigned to the flow). Then the initial version 1 of that PG flow is created and assigned to that flow id in the NiFi-Registry. Since you cannot force the flow id assigned UUID, syncing flows from registry 1 to registry 2, would not track to your version controlled PGs in your NiFI because of change in flow id. In your scenario, you would need to export all your flows (version by version and it is important you keep rack of the version fo the flow you extract). So for a flow with ID XYZ you may have 6 versions. This means you would use: registry export-flow-version I'd suggest naming the produced json file using source flow id and flow version like XYZ_v1.json, XYZ_v2.json, etc... Example: ./cli.sh registry export-flow-version -ot json -u http://<nifi-registry hostname>:<port>/ -f c97fd570-e2ef-4001-98c9-8810244b6015 -fv 1 -o /tmp/c97fd570-e2ef-4001-98c9-8810244b6015_ver1.json You should then save off your original DB. Delete all existing flows so all you have are your original buckets Then you would need to take all these exported flows and import them back in to registry after switching to your new persistence provider. Now keep in mind before importing each flow version you must first create a new flow within the correct still existing buckets. Keep track of these newly assigned flow ids and which original flow id you are importing in to them (very important) Then you MUST import each new flow in exact version 1 to version x order. If you import version 5 of flow XYZ first it will become version 1 within that new flow Id. The version persisted in the output json is not used when importing, it is assigned the next incremental version in the new flow id. Once you are done here you have a bunch of new flow ids with all your versions imported. Now you need to go edit your flow.xml.gz in NiFi. For every version controlled PG in that flow.xml.gz you will find a section that looks like this: <versionControlInformation>
<registryId>912e8161-0176-1000-ffff-ffff98135aca</registryId>
<bucketId>0cab84ff-399b-4113-9767-687e8e33e48a</bucketId>
<bucketName>bucket-name</bucketName>
<flowId>136b3ba8-bc6f-46dd-afe5-235a80ef8cfe</flowId>
<flowName>flow-name</flowName>
<flowDescription/>
<version>5</version>
</versionControlInformation> Everything here should remain the same except fro the change in "flowId" This would allow you to do a global search and replace on "<flowId>original id</flowId>" to "<flowId>new id</flowId>". Make sure you stop all NiFi nodes, put same modified flow.xml.gz on all nodes (backup original), and start NiFi nodes again. Your PGs should now be tracking to your new flows imported in your registry now backed by the gitFlowPersistenceProvider. [1] https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html#nifi_CLI Sorry there is no automated path for this. If you found this addressed your query, please take a moment to login and click "Accept" on those solutions which assisted you. Thanks, Matt
... View more
05-28-2021
12:50 PM
1 Kudo
Hello @aie Awesome to hear you're getting involved with NiFi. NiFi has both ConsumeKafka and ConsumeKafkaRecord based processors. These Kafka processor components in NiFi utilize Kafka client code and with Kafka the client version matters. So make sure you are using the component version that matches the version of yoru Kafka server. NiFi produces FlowFiles that move from component to component in your dataflow(s). A FlowFile consists of two parts, FlowFile Attributes/metatdata and FlowFile Content (actual physical content of ingested/created data). The FlowFile Attributes/Metadata lives with NiFi's JVM heap memory space and is persisted to the FlowFile repository while content resides in the content repository and only exists in heap memory space if a specific processor should need for that. I bring this up because more FlowFiles = higher heap usage and processing resources related to the creating and management of those FlowFiles. The many NIFi "record" based processors (like ConsumeKafkaRecord) are much more efficient to use. The consumeKafkaRecord processor while ingest many Kafka messages to a single FlowFile. Down side here is this makes calculating things like messages per minute much harder to do. Now if you are just looking to count the total number of consumed messages, you can do this. I'll use the ConsumeKafkaRecord_2_0 processor as an example here. Most processor will create or modify FlowFile attributes and you can look at the embedded usage documentation for each component to see what those attributes are. All created attributes on a FlowFile can be read by other processor using NiFi Expression Language. So If we look at the Attributes created on the FlowFiles created by the ConsumeKafkaRecord processor we will find a couple named "record.count" and "kafka.topic" (make note that all attribute names are case sensitive). The "record.count" attribute will tell you the number of messages in this one FlowFile. We can now use this "record.count" attribute can now be used to increment a counter using the UpdateCounter processor component. We use the "kafka.topic" attribute as the counter name so that one counter ic created for each unique topic you may consume from. We use the "record.count" attribute so we can increment that counter based on number of messages in each FlowFile. Configuration would look like this: You can observe the counters and reset them via the counters UI found under the NiFi global menu in upper right corner of the UI. Also keep the following in mind for good performance. NiFi's when it comes to the consumeKafka processors is a Kafka Consumer Group. The processor itself lets you set a unique "Group ID" for this processors Kafka consumer group. When you add any processor component to the NiFi UI it starts with the default "1" concurrent task. For Optimal Kafka performance you want your Kafka Consumer Group to have as many consumers as there are partitions in the target topic. Also keep in mind that if you are running a NiFi cluster (recommended), each processor executes on each node as part of same consumer group. So 3 node cluster, will mean your ConsumeKafkaRecord with 1 concurrent task has three consumers in that group. So you would ideally for best performance want to have a multiple of 3 as the number of partitions on the topic (3, 6, 9, 12, 15, etc...). Let say you have a 3 node cluster and your topic has 15 partitions, then your consumeKafkaRecord should be set to 5 concurrent tasks (5 x 3 nodes = 15 consumers). Avoid having more consumers than partitions or you will constantly have rebalance going on. So even if you had 16 partitions on your topic you would still set only 5 concurrent tasks. One of your 15 available consumers would simply be assigned to receive data from two topics instead of just one. If you found this information was helpful with yoru query, please take a moment to login and click "Accept" on this solution. https://nifi.apache.org/docs.html Thank you and happy NiFi'ing.... Matt
... View more
05-28-2021
10:49 AM
1 Kudo
@hegdemahendra The ./cli.sh registry sync-flow-versions toolkit description: Syncs the versions of a flow to another flow, which could be in a different bucket or registry. This command assumes the intent is to maintain the exact version history across the two flows. The list of versions from the source flow will be compared to the destination flow, and any versions not present will be added. The timestamp reflects the time which that particular flow version was added to the target flow. In this case you are adding additional flow versions to a flow in yoru target NiFi-Registry. All those flow versions were added at about the same time. So the timestamp of when they were created is accurate in that target NiFi-Registry. What this toolkit is doing is very simplistic. You have a defined source flow (defined by a specific UUID). That flow has X number if versions. On your target NiFi-Registry you create a new flow or have an exiting flow with its own UUID. The flow UUIDs between source and destination will not be the same. So these really are not the "same" flows. Each is a unique flow, but may have the same flow versions. Let assume if have a source flow (UUID=A) which contains 3 versions. On my target NiFi-Registry i create a new flow (UUID=B) and I use above command to sync my 3 versions form FlowA to FlowB. Now on some NiFi using that target NiFi-Registry imports that FlowB on to the canvas. Now back at source, someone created versions 4, 5, and 6 for FlowA. On that target NiFi-Registry, someone commits version 4. Version 4 for FlowA is not the same edits as version 4 in FlowB. Now i use same toolkit command above to again sync the Flow versions from FlowA to FlowB. The command sees that FlowB contains versions 1-4, so flow versions 5 and 6 only are exported and imported to FlowB. FlowB version 4 will still not match FlowA version 4. The NiFi using FlowB will not reflect that newer versions exist and when they change versions, they will lose the changes made in FlowB version 4. The target NiFi-Registry is properly recording the new flow version number and timestamp as each new flow version is added to FlowB (As each new flow version is added the flow version is incremented and timestamp of that import is recorded). As another example, you can export FlowA version 4 and import that single flow version into FlowB and it will take the next incremental flow version number of FlowB which could be version 1 if you just created FlowB. So using the ./cli/sh registry must be done with great care. Moving over flows initially is great, but trying to use this sync to periodically re-sync flows is likely not going to do exactly what is expected except under very controlled environments. If your intent is to move all NiFi-Registry flows from one NiFi-Registry to another and want to preserve everything for this one time move, it is probably better to copy over the Metadata Database [1] and Flow Persistence Provider from the source NiFi-Registry to the target NiFi-Registry [1] https://nifi.apache.org/docs/nifi-registry-docs/html/administration-guide.html#metadata-database [2] https://nifi.apache.org/docs/nifi-registry-docs/html/administration-guide.html#persistence-providers If you found this assisted you with your query, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more
05-28-2021
09:23 AM
@Amn Where did you define the NiFi Attribute "${itemType}" (FlowFile Attribute on a source FlowFile driving execution of GetMongo processor or NiFi variable registry)? NiFi is also case sensitive, so for example "ItemType" and "itemType" would be considered different variables. If you found this helpful to yoru query, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more
05-28-2021
09:14 AM
@Prajeesh10 If you are trying to split your source CSV in to two different FlowFile before converting each to a JSON, you could use the "SplitContent" [1] processor. [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.13.2/org.apache.nifi.processors.standard.SplitContent/index.html If you found this useful, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more
05-28-2021
08:23 AM
@Rupesh_Raghani I just want to make sure that when we are both talking about NiFi "Templates" we are talking about the same thing. https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#templates When you upload a NiFi template (xml file) to NiFi via the UI (Does not matter which node in a NiFi cluster you are accessing), that template will get uploaded and replicated to all nodes in the cluster. So all nodes will have that template in each nodes JVM heap and written to the flow.xml.gz on disk. This statement is not clear to me: "So if I have templates on every node in cluster then will the nifi load templates on each node as per templates available on their node or a parent node will load all templates from other cluster". What do you mean by "if I have templates on every node"? In a NiFi cluster, every node must have the same flow.xml.gz. If the flow loaded in to heap memory does not match between nodes, the nodes no matching the elected cluster flow will be disconnected from the cluster. Each node, while it has its own local copy of the flow, runs the exact same flow. Hope this helps, Matt
... View more
05-27-2021
08:48 AM
@Rupesh_Raghani NiFI templates are uploaded to the process group selected. Only users who have been granted view and modify on that PG will be able to instantiate those templates on to the NiFi canvas. NiFi operates within a single JVM, so there are not separate per user canvases. User not authorized on a specific NiFi Process Group (PG) will only see a blank representation for that component on the canvas (non-authorized users can not see name, description, configuration details, etc...). Non-authorized users can still see metrics presented on components and queue counts on connections only. The reason NiFi still renders all components is because everything is executing within a single JVM. - One users flows can impact another team (Some flow with huge backlog of queued data, high volume, etc can impact resource availability to other teams/users. - Stats on information bar is representative of entire canvas (all dataflows), so it is still important for all teams/users to be able to identify areas of concern even if is not their dataflows and they have no authorizations allowing them to do anything about it. But they could alert other teams/users. - Typical use case is team/user 1 has a PG and then team/user 2 has a different authorized PG. If NiFi did not render both PG to both users, nothing prevents users from building flows on top of one another. Then assume we have team/user 3 that is authorized for both above team/users PGs. This becomes unreadable useable by that third user. If you found this helped with your query, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more