Member since
07-30-2019
3406
Posts
1622
Kudos Received
1008
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 196 | 12-17-2025 05:55 AM | |
| 257 | 12-15-2025 01:29 PM | |
| 191 | 12-15-2025 06:50 AM | |
| 285 | 12-05-2025 08:25 AM | |
| 476 | 12-03-2025 10:21 AM |
06-04-2021
01:33 PM
1 Kudo
@khaldoune For a standalone NiFi (meaning that "nifi.cluster.is.node" is set to false in nifi.properties file), components (processors, controller services, and reporting tasks) that write state will use local state directory to record state. Problem here is that if you switch to being clustered later, there is no way to move the components state from local to zookeeper. NOTE: It is possible to have a 1 node NiFi cluster (offers no HA control plane that way), but it will still require that you have a zookeeper quorum. Hope this helps, Matt
... View more
06-04-2021
01:26 PM
@Griggsy Here is an example of how you could use ReplaceText: Search value: [^\w\d\r\n! @^$*#()_=<>~`|{}:;,.\-\\\?\/]+ The below site is a great way to test above regex against some sample data you have to make sure you are not missing any exceptions you want to keep https://regex101.com/ It also explains this above regex formatting If you found these responses helpful, please take a moment to login and click "Accept" on all solutions that help you. Thanks, Matt
... View more
06-03-2021
09:45 AM
1 Kudo
@khaldoune NiFi state is used by only NiFi components/frameworks bits that are built to use it. Some select components can be configured to use local state even if you are setup with a NiFi cluster. Others select components will use cluster state if NiFi is clustered, but in a standalone NiFi use local state. You can refer to the embedded documentation for each component processor, controller service, or reporting task to see if it uses the state provider. For example, look at the embedded docs for ListFile and you will see a "state management:" section with: Above you see that this processor can use local or cluster state provider and a description of how state us utilized by this component. Configuration of the "Input Directory Location" in the case of this specific processor controls which provider is used. For components that do not use state (bulk of components don't use state), the same section in their embedded docs will reflect: Load balanced connections to do not record state. Load balanced connections copy FlowFiles from one node to another and on confirmation of success, the local copies are removed. So if NiFi is shutdown or dies while data is being copied by a load balanced connection the source NiFi will simply start over distributing the FlowFiles again when it is back online in the cluster. If you found this addressed your query, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more
06-02-2021
09:30 AM
@_mark_ As NiFi is an open source product, I recommend joining the community (if you have not already) and opening an Apache NiFi Jira [1] with you proposed enhancements/new features for Apache NiFi to get feedback from the community at large. If you feel you are not there yet in proposing a new feature/enhancement, try engaging via the users mailing list [2] [1] https://issues.apache.org/jira/browse/NIFI [2] https://nifi.apache.org/mailing_lists.html Thanks, Matt
... View more
06-02-2021
08:05 AM
@MrWilkinson You may have a bad download? Check the consistency of the contents of following folder: <path to NiFi>/lib/bootstrap
... View more
06-02-2021
07:52 AM
@midee I am not clearly following your use case. FlowFiles consist of two parts, FlowFile attributes/metadata and FlowFile content. You give example with "customefields_12345" and "customefields_12346". Does this mean one FlowFile may have multiple "customefields_<some string>" attributes assigned to it? How do you want to route FlowFiles where only some of those customfield attributes are null while others are not? There are multiple ways to handle this using NiFi Expression Language (NEL) [1] and the routeOnAttribute [2] processor. ${anyMatchingAttribute("customfield.*"):isEmpty()} Above would return "true" if ANY of the NiFi FlowFile attributes starting with "customefield" is empty. note: The isEmpty function returns true if the Subject is null, does not contain any characters or contains only white-space (new line, carriage return, space, tab), false otherwise. There is another NEL subjectless function that would return "true" only if ALL FlowFileAttributes matching the Java regular expression were empty: ${allMatchingAttributes("customfield.*"):isEmpty()} With the RouteOnAttribute processor you create/add dynamic properties and each of those becomes a new routable relationship on the processor. if the NEL statement configured for that dynamic property returns true that FlowFile routes to that relationship. Any FlowFile that does not return true for dynamic properties will get routed to the pre-existing relationship named "unmatched". [1] https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html [2] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.13.2/org.apache.nifi.processors.standard.RouteOnAttribute/index.html If you found this addressed your query, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more
06-02-2021
06:22 AM
1 Kudo
@hegdemahendra The NiFi CLI toolkit [1] can help here to an extent. This toolkit provides the following NiFi-Registry capabilities: registry current-user
registry list-buckets
registry create-bucket
registry delete-bucket
registry list-flows
registry create-flow
registry delete-flow
registry list-flow-versions
registry export-flow-version
registry import-flow-version
registry sync-flow-versions
registry transfer-flow-version
registry diff-flow-versions
registry upload-bundle
registry upload-bundles
registry list-bundle-groups
registry list-bundle-artifacts
registry list-bundle-versions
registry download-bundle
registry get-bundle-checksum
registry list-extension-tags
registry list-extensions
registry list-users
registry create-user
registry update-user
registry list-user-groups
registry create-user-group
registry update-user-group
registry get-policy
registry update-policy
registry update-bucket-policy You can get a description of each by executing: <path to>/cli.sh registry sync-flow-versions -h Since you are changing FlowPersistence providers and not trying to sync flows to a new NiFi-Registry, You really can't use the above "sync-flow-versions" function. Plus, I really don't see it even in that scenario being able to accomplish your goal because you would end up with new flow ids. When you create a bucket in NiFi-Registry it is assigned a bucket if (random uuid). When you version control a Process Group (PG) in NiFi, you choose an existing bucket and it first creates a new flow id (Random UUID assigned to the flow). Then the initial version 1 of that PG flow is created and assigned to that flow id in the NiFi-Registry. Since you cannot force the flow id assigned UUID, syncing flows from registry 1 to registry 2, would not track to your version controlled PGs in your NiFI because of change in flow id. In your scenario, you would need to export all your flows (version by version and it is important you keep rack of the version fo the flow you extract). So for a flow with ID XYZ you may have 6 versions. This means you would use: registry export-flow-version I'd suggest naming the produced json file using source flow id and flow version like XYZ_v1.json, XYZ_v2.json, etc... Example: ./cli.sh registry export-flow-version -ot json -u http://<nifi-registry hostname>:<port>/ -f c97fd570-e2ef-4001-98c9-8810244b6015 -fv 1 -o /tmp/c97fd570-e2ef-4001-98c9-8810244b6015_ver1.json You should then save off your original DB. Delete all existing flows so all you have are your original buckets Then you would need to take all these exported flows and import them back in to registry after switching to your new persistence provider. Now keep in mind before importing each flow version you must first create a new flow within the correct still existing buckets. Keep track of these newly assigned flow ids and which original flow id you are importing in to them (very important) Then you MUST import each new flow in exact version 1 to version x order. If you import version 5 of flow XYZ first it will become version 1 within that new flow Id. The version persisted in the output json is not used when importing, it is assigned the next incremental version in the new flow id. Once you are done here you have a bunch of new flow ids with all your versions imported. Now you need to go edit your flow.xml.gz in NiFi. For every version controlled PG in that flow.xml.gz you will find a section that looks like this: <versionControlInformation>
<registryId>912e8161-0176-1000-ffff-ffff98135aca</registryId>
<bucketId>0cab84ff-399b-4113-9767-687e8e33e48a</bucketId>
<bucketName>bucket-name</bucketName>
<flowId>136b3ba8-bc6f-46dd-afe5-235a80ef8cfe</flowId>
<flowName>flow-name</flowName>
<flowDescription/>
<version>5</version>
</versionControlInformation> Everything here should remain the same except fro the change in "flowId" This would allow you to do a global search and replace on "<flowId>original id</flowId>" to "<flowId>new id</flowId>". Make sure you stop all NiFi nodes, put same modified flow.xml.gz on all nodes (backup original), and start NiFi nodes again. Your PGs should now be tracking to your new flows imported in your registry now backed by the gitFlowPersistenceProvider. [1] https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html#nifi_CLI Sorry there is no automated path for this. If you found this addressed your query, please take a moment to login and click "Accept" on those solutions which assisted you. Thanks, Matt
... View more
06-02-2021
06:17 AM
@Maqbool Can you provide some more detail on what you are trying to accomplish here? If we look at the NiFi Expression Language (NEL) example that does work you shared: ${category_mappings:jsonPath("$.catCode")} NEL evaluates this as follows: Finds NiFi variable with name "category_mappings" and return the value assigned to that variable (Depending on level of NEL support in the component property were this is used, it may search for that variable name in multiple places (FlowFile attributes, variable registry, java properties, environment variables). The returned value is then passed to the NEL function :jsonPath(). Now if we look at your parameter context example: #{category_mappings:jsonPath("$.catCode")} It is looking for the parameter context with string "category_mappings:jsonPath("$.catCode")" which it did not find in the "Parameters" assigned to the Process Group in which this processor resides. I suspect this is not what you really are trying to accomplish here. I suspect that the parameter context you have in your parameters on this process group is really only "category_mappings" and you still need to use NEL in order to execute a NEL function against that parameter's value. ${#{category_mappings}:jsonPath("$.catCode")} You'll notice the slight difference here. We now have a NEL statement that gets its subject from a parameter "category_mappings" instead of variable and passes that value to the NEL ":jsonPath()" function. If you found this addresses your query, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more
05-28-2021
12:50 PM
1 Kudo
Hello @aie Awesome to hear you're getting involved with NiFi. NiFi has both ConsumeKafka and ConsumeKafkaRecord based processors. These Kafka processor components in NiFi utilize Kafka client code and with Kafka the client version matters. So make sure you are using the component version that matches the version of yoru Kafka server. NiFi produces FlowFiles that move from component to component in your dataflow(s). A FlowFile consists of two parts, FlowFile Attributes/metatdata and FlowFile Content (actual physical content of ingested/created data). The FlowFile Attributes/Metadata lives with NiFi's JVM heap memory space and is persisted to the FlowFile repository while content resides in the content repository and only exists in heap memory space if a specific processor should need for that. I bring this up because more FlowFiles = higher heap usage and processing resources related to the creating and management of those FlowFiles. The many NIFi "record" based processors (like ConsumeKafkaRecord) are much more efficient to use. The consumeKafkaRecord processor while ingest many Kafka messages to a single FlowFile. Down side here is this makes calculating things like messages per minute much harder to do. Now if you are just looking to count the total number of consumed messages, you can do this. I'll use the ConsumeKafkaRecord_2_0 processor as an example here. Most processor will create or modify FlowFile attributes and you can look at the embedded usage documentation for each component to see what those attributes are. All created attributes on a FlowFile can be read by other processor using NiFi Expression Language. So If we look at the Attributes created on the FlowFiles created by the ConsumeKafkaRecord processor we will find a couple named "record.count" and "kafka.topic" (make note that all attribute names are case sensitive). The "record.count" attribute will tell you the number of messages in this one FlowFile. We can now use this "record.count" attribute can now be used to increment a counter using the UpdateCounter processor component. We use the "kafka.topic" attribute as the counter name so that one counter ic created for each unique topic you may consume from. We use the "record.count" attribute so we can increment that counter based on number of messages in each FlowFile. Configuration would look like this: You can observe the counters and reset them via the counters UI found under the NiFi global menu in upper right corner of the UI. Also keep the following in mind for good performance. NiFi's when it comes to the consumeKafka processors is a Kafka Consumer Group. The processor itself lets you set a unique "Group ID" for this processors Kafka consumer group. When you add any processor component to the NiFi UI it starts with the default "1" concurrent task. For Optimal Kafka performance you want your Kafka Consumer Group to have as many consumers as there are partitions in the target topic. Also keep in mind that if you are running a NiFi cluster (recommended), each processor executes on each node as part of same consumer group. So 3 node cluster, will mean your ConsumeKafkaRecord with 1 concurrent task has three consumers in that group. So you would ideally for best performance want to have a multiple of 3 as the number of partitions on the topic (3, 6, 9, 12, 15, etc...). Let say you have a 3 node cluster and your topic has 15 partitions, then your consumeKafkaRecord should be set to 5 concurrent tasks (5 x 3 nodes = 15 consumers). Avoid having more consumers than partitions or you will constantly have rebalance going on. So even if you had 16 partitions on your topic you would still set only 5 concurrent tasks. One of your 15 available consumers would simply be assigned to receive data from two topics instead of just one. If you found this information was helpful with yoru query, please take a moment to login and click "Accept" on this solution. https://nifi.apache.org/docs.html Thank you and happy NiFi'ing.... Matt
... View more
05-28-2021
10:49 AM
1 Kudo
@hegdemahendra The ./cli.sh registry sync-flow-versions toolkit description: Syncs the versions of a flow to another flow, which could be in a different bucket or registry. This command assumes the intent is to maintain the exact version history across the two flows. The list of versions from the source flow will be compared to the destination flow, and any versions not present will be added. The timestamp reflects the time which that particular flow version was added to the target flow. In this case you are adding additional flow versions to a flow in yoru target NiFi-Registry. All those flow versions were added at about the same time. So the timestamp of when they were created is accurate in that target NiFi-Registry. What this toolkit is doing is very simplistic. You have a defined source flow (defined by a specific UUID). That flow has X number if versions. On your target NiFi-Registry you create a new flow or have an exiting flow with its own UUID. The flow UUIDs between source and destination will not be the same. So these really are not the "same" flows. Each is a unique flow, but may have the same flow versions. Let assume if have a source flow (UUID=A) which contains 3 versions. On my target NiFi-Registry i create a new flow (UUID=B) and I use above command to sync my 3 versions form FlowA to FlowB. Now on some NiFi using that target NiFi-Registry imports that FlowB on to the canvas. Now back at source, someone created versions 4, 5, and 6 for FlowA. On that target NiFi-Registry, someone commits version 4. Version 4 for FlowA is not the same edits as version 4 in FlowB. Now i use same toolkit command above to again sync the Flow versions from FlowA to FlowB. The command sees that FlowB contains versions 1-4, so flow versions 5 and 6 only are exported and imported to FlowB. FlowB version 4 will still not match FlowA version 4. The NiFi using FlowB will not reflect that newer versions exist and when they change versions, they will lose the changes made in FlowB version 4. The target NiFi-Registry is properly recording the new flow version number and timestamp as each new flow version is added to FlowB (As each new flow version is added the flow version is incremented and timestamp of that import is recorded). As another example, you can export FlowA version 4 and import that single flow version into FlowB and it will take the next incremental flow version number of FlowB which could be version 1 if you just created FlowB. So using the ./cli/sh registry must be done with great care. Moving over flows initially is great, but trying to use this sync to periodically re-sync flows is likely not going to do exactly what is expected except under very controlled environments. If your intent is to move all NiFi-Registry flows from one NiFi-Registry to another and want to preserve everything for this one time move, it is probably better to copy over the Metadata Database [1] and Flow Persistence Provider from the source NiFi-Registry to the target NiFi-Registry [1] https://nifi.apache.org/docs/nifi-registry-docs/html/administration-guide.html#metadata-database [2] https://nifi.apache.org/docs/nifi-registry-docs/html/administration-guide.html#persistence-providers If you found this assisted you with your query, please take a moment to login and click "Accept" on this solution. Thank you, Matt
... View more