Member since
07-30-2019
3470
Posts
1642
Kudos Received
1018
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 301 | 05-06-2026 09:16 AM |
| | 492 | 05-04-2026 05:20 AM |
| | 355 | 05-01-2026 10:15 AM |
| | 524 | 03-23-2026 05:44 AM |
| | 394 | 02-18-2026 09:59 AM |
06-02-2021
06:22 AM
1 Kudo
@hegdemahendra The NiFi CLI toolkit [1] can help here to an extent. This toolkit provides the following NiFi-Registry capabilities:

registry current-user
registry list-buckets
registry create-bucket
registry delete-bucket
registry list-flows
registry create-flow
registry delete-flow
registry list-flow-versions
registry export-flow-version
registry import-flow-version
registry sync-flow-versions
registry transfer-flow-version
registry diff-flow-versions
registry upload-bundle
registry upload-bundles
registry list-bundle-groups
registry list-bundle-artifacts
registry list-bundle-versions
registry download-bundle
registry get-bundle-checksum
registry list-extension-tags
registry list-extensions
registry list-users
registry create-user
registry update-user
registry list-user-groups
registry create-user-group
registry update-user-group
registry get-policy
registry update-policy
registry update-bucket-policy

You can get a description of each by executing:

<path to>/cli.sh registry sync-flow-versions -h

Since you are changing flow persistence providers and not trying to sync flows to a new NiFi-Registry, you really can't use the above "sync-flow-versions" function. Even in that scenario I don't see it accomplishing your goal, because you would end up with new flow ids.

When you create a bucket in NiFi-Registry it is assigned a bucket id (random UUID). When you version control a Process Group (PG) in NiFi, you choose an existing bucket and NiFi-Registry first creates a new flow id (random UUID assigned to the flow). Then the initial version 1 of that PG flow is created and assigned to that flow id in the NiFi-Registry. Since you cannot force the UUID assigned as the flow id, flows synced from registry 1 to registry 2 would not track to your version controlled PGs in your NiFi because of the change in flow id.

In your scenario, you would need to export all your flows version by version (it is important that you keep track of the version of each flow you extract). So for a flow with ID XYZ you may have 6 versions. This means you would run the following once per version:

registry export-flow-version

I'd suggest naming the produced json file using the source flow id and flow version, like XYZ_v1.json, XYZ_v2.json, etc.

Example:

./cli.sh registry export-flow-version -ot json -u http://<nifi-registry hostname>:<port>/ -f c97fd570-e2ef-4001-98c9-8810244b6015 -fv 1 -o /tmp/c97fd570-e2ef-4001-98c9-8810244b6015_ver1.json

You should then save off your original DB and delete all existing flows so that all you have left are your original buckets. Then, after switching to your new persistence provider, you would need to take all these exported flows and import them back in to the registry. Keep in mind that before importing each flow version you must first create a new flow within the correct, still existing bucket.
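The per-version export described above gets repetitive when a flow has many versions. Below is a minimal Python sketch that only builds the command lines, in version 1 to version N order, using the same flags as the example command. The function name, the registry URL, and the output directory are hypothetical, and the number of versions is assumed to be something you have already looked up (for example with "registry list-flow-versions").

```python
from typing import List


def build_export_commands(registry_url: str, flow_id: str, version_count: int,
                          out_dir: str = "/tmp",
                          cli: str = "./cli.sh") -> List[List[str]]:
    """Return one export-flow-version command per version, ordered 1..N.

    Assumption: versions are numbered contiguously from 1 to version_count.
    """
    commands = []
    for version in range(1, version_count + 1):
        # File name keeps the source flow id and version, e.g. XYZ_v1.json.
        out_file = f"{out_dir}/{flow_id}_v{version}.json"
        commands.append([
            cli, "registry", "export-flow-version",
            "-ot", "json",
            "-u", registry_url,
            "-f", flow_id,
            "-fv", str(version),
            "-o", out_file,
        ])
    return commands


if __name__ == "__main__":
    # Hypothetical registry URL; substitute your own host and port.
    for cmd in build_export_commands("http://registry.example.com:18080/",
                                     "c97fd570-e2ef-4001-98c9-8810244b6015", 6):
        print(" ".join(cmd))
```

Printing the commands rather than executing them lets you review the list before running anything against the registry.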
Keep track of these newly assigned flow ids and which original flow id you are importing in to each of them (very important). Then you MUST import each flow's versions in exact version 1 to version x order. If you import version 5 of flow XYZ first, it will become version 1 within that new flow id. The version persisted in the output json is not used when importing; each import is assigned the next incremental version in the new flow id.

Once you are done here you have a bunch of new flow ids with all your versions imported. Now you need to go edit your flow.xml.gz in NiFi. For every version controlled PG in that flow.xml.gz you will find a section that looks like this:

<versionControlInformation>
<registryId>912e8161-0176-1000-ffff-ffff98135aca</registryId>
<bucketId>0cab84ff-399b-4113-9767-687e8e33e48a</bucketId>
<bucketName>bucket-name</bucketName>
<flowId>136b3ba8-bc6f-46dd-afe5-235a80ef8cfe</flowId>
<flowName>flow-name</flowName>
<flowDescription/>
<version>5</version>
</versionControlInformation>

Everything here should remain the same except for the change in "flowId". This would allow you to do a global search and replace of "<flowId>original id</flowId>" with "<flowId>new id</flowId>". Make sure you stop all NiFi nodes, put the same modified flow.xml.gz on all nodes (back up the original first), and start the NiFi nodes again. Your PGs should now be tracking to your new flows imported in your registry, now backed by the GitFlowPersistenceProvider.

[1] https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html#nifi_CLI

Sorry there is no automated path for this.

If you found this addressed your query, please take a moment to login and click "Accept" on those solutions which assisted you.

Thanks,
Matt
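The flowId search and replace on flow.xml.gz described above can be sketched in Python as follows. This is only an illustration under a few assumptions: the function names are hypothetical, the mapping of original flow id to new flow id is the one you recorded while importing, and the file is treated as UTF-8 text. It is meant to be run while NiFi is stopped, and it writes a ".bak" copy before changing anything.

```python
import gzip
import shutil
from typing import Dict


def remap_flow_ids(xml_text: str, id_map: Dict[str, str]) -> str:
    """Replace each <flowId>old</flowId> element with its new flow id."""
    for old_id, new_id in id_map.items():
        xml_text = xml_text.replace(f"<flowId>{old_id}</flowId>",
                                    f"<flowId>{new_id}</flowId>")
    return xml_text


def rewrite_flow_xml_gz(path: str, id_map: Dict[str, str]) -> None:
    """Back up the gzipped flow file, then rewrite it with remapped flow ids."""
    shutil.copy2(path, path + ".bak")  # keep the original next to the new file
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        original = fh.read()
    with gzip.open(path, "wt", encoding="utf-8") as fh:
        fh.write(remap_flow_ids(original, id_map))
```

Only the flowId element is matched, so registryId, bucketId, and version are left alone. The same modified file would still need to be copied to every node.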
05-28-2021
12:50 PM
1 Kudo
Hello @aie

Awesome to hear you're getting involved with NiFi.

NiFi has both ConsumeKafka and ConsumeKafkaRecord based processors. These Kafka processor components in NiFi utilize Kafka client code, and with Kafka the client version matters. So make sure you are using the component version that matches the version of your Kafka server.

NiFi produces FlowFiles that move from component to component in your dataflow(s). A FlowFile consists of two parts: FlowFile attributes/metadata and FlowFile content (the actual physical content of the ingested/created data). The FlowFile attributes/metadata live within NiFi's JVM heap memory space and are persisted to the FlowFile repository, while the content resides in the content repository and only exists in heap memory space if a specific processor needs it there. I bring this up because more FlowFiles = higher heap usage and more processing resources spent on creating and managing those FlowFiles. The many NiFi "record" based processors (like ConsumeKafkaRecord) are much more efficient to use. The ConsumeKafkaRecord processor will ingest many Kafka messages into a single FlowFile. The downside is that this makes calculating things like messages per minute much harder to do.

Now if you are just looking to count the total number of consumed messages, you can do this. I'll use the ConsumeKafkaRecord_2_0 processor as an example here. Most processors will create or modify FlowFile attributes, and you can look at the embedded usage documentation for each component to see what those attributes are. All attributes on a FlowFile can be read by other processors using NiFi Expression Language. If we look at the attributes written to the FlowFiles created by the ConsumeKafkaRecord processor, we will find a couple named "record.count" and "kafka.topic" (note that all attribute names are case sensitive). The "record.count" attribute will tell you the number of messages in this one FlowFile.
We can now use this "record.count" attribute to increment a counter using the UpdateCounter processor component. We use the "kafka.topic" attribute as the counter name so that one counter is created for each unique topic you may consume from. We use the "record.count" attribute so we can increment that counter by the number of messages in each FlowFile. The configuration would set the UpdateCounter "Counter Name" property to ${kafka.topic} and its "Delta" property to ${record.count}. You can observe the counters and reset them via the counters UI found under the NiFi global menu in the upper right corner of the UI.

Also keep the following in mind for good performance. When it comes to the ConsumeKafka processors, NiFi acts as a Kafka consumer group. The processor itself lets you set a unique "Group ID" for this processor's Kafka consumer group. When you add any processor component to the NiFi canvas, it starts with the default of 1 concurrent task. For optimal Kafka performance you want your Kafka consumer group to have as many consumers as there are partitions in the target topic. Also keep in mind that if you are running a NiFi cluster (recommended), each processor executes on each node as part of the same consumer group. So in a 3 node cluster, your ConsumeKafkaRecord with 1 concurrent task has three consumers in that group. For best performance you would ideally want the number of partitions on the topic to be a multiple of 3 (3, 6, 9, 12, 15, etc.). Let's say you have a 3 node cluster and your topic has 15 partitions; then your ConsumeKafkaRecord should be set to 5 concurrent tasks (5 x 3 nodes = 15 consumers). Avoid having more consumers than partitions or you will constantly have rebalancing going on. So even if you had 16 partitions on your topic you would still set only 5 concurrent tasks. One of your 15 available consumers would simply be assigned to receive data from two partitions instead of just one.

If you found this information was helpful with your query, please take a moment to login and click "Accept" on this solution.
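The sizing rule above (consumers = concurrent tasks x nodes, never more consumers than partitions) comes down to an integer division. Here is a minimal Python sketch of that arithmetic; the function names are hypothetical and nothing here talks to NiFi or Kafka.

```python
def concurrent_tasks_per_node(partitions: int, nodes: int) -> int:
    """Largest per-node concurrent task count that keeps the total number
    of consumers (tasks x nodes) at or below the partition count."""
    if partitions < 1 or nodes < 1:
        raise ValueError("partitions and nodes must both be at least 1")
    # A processor cannot run with fewer than 1 task per node, so with fewer
    # partitions than nodes some consumers will unavoidably sit idle.
    return max(1, partitions // nodes)


def total_consumers(tasks_per_node: int, nodes: int) -> int:
    """Every node runs the processor, so the group has tasks x nodes consumers."""
    return tasks_per_node * nodes


if __name__ == "__main__":
    for partitions in (15, 16):
        tasks = concurrent_tasks_per_node(partitions, 3)
        print(partitions, "partitions ->", tasks, "tasks per node,",
              total_consumers(tasks, 3), "consumers")
```

With 3 nodes, both 15 and 16 partitions give 5 concurrent tasks per node, which matches the example in the text.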
https://nifi.apache.org/docs.html Thank you and happy NiFi'ing.... Matt
05-28-2021
10:49 AM
1 Kudo
@hegdemahendra The ./cli.sh registry sync-flow-versions toolkit description:

Syncs the versions of a flow to another flow, which could be in a different bucket or registry. This command assumes the intent is to maintain the exact version history across the two flows. The list of versions from the source flow will be compared to the destination flow, and any versions not present will be added.

The timestamp reflects the time at which that particular flow version was added to the target flow. In this case you are adding additional flow versions to a flow in your target NiFi-Registry. All those flow versions were added at about the same time, so the timestamp of when they were created is accurate in that target NiFi-Registry.

What this toolkit is doing is very simplistic. You have a defined source flow (defined by a specific UUID). That flow has X number of versions. On your target NiFi-Registry you create a new flow or have an existing flow with its own UUID. The flow UUIDs between source and destination will not be the same, so these really are not the "same" flows. Each is a unique flow, but they may have the same flow versions.

Let's assume I have a source flow (UUID=A) which contains 3 versions. On my target NiFi-Registry I create a new flow (UUID=B) and I use the above command to sync my 3 versions from FlowA to FlowB. Now some NiFi using that target NiFi-Registry imports FlowB on to the canvas. Back at the source, someone creates versions 4, 5, and 6 for FlowA. On the target NiFi-Registry, someone commits version 4. Version 4 of FlowA does not contain the same edits as version 4 of FlowB. Now I use the same toolkit command again to sync the flow versions from FlowA to FlowB. The command sees that FlowB contains versions 1-4, so only flow versions 5 and 6 are exported and imported to FlowB. FlowB version 4 will still not match FlowA version 4.
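The FlowA/FlowB scenario above can be replayed with a toy Python model. This is not the toolkit's implementation, only a sketch of the behavior as described here: versions are compared by number, and anything missing from the target is added. The function name and the version labels are hypothetical.

```python
from typing import Dict


def sync_flow_versions(source: Dict[int, str], target: Dict[int, str]) -> Dict[int, str]:
    """Add source versions whose number is absent from the target.

    Versions that already exist in the target are never overwritten,
    even when their content differs from the source.
    """
    synced = dict(target)
    for version in sorted(source):
        if version not in synced:
            synced[version] = source[version]
    return synced


if __name__ == "__main__":
    flow_a = {1: "A-v1", 2: "A-v2", 3: "A-v3"}
    flow_b = sync_flow_versions(flow_a, {})      # initial sync: B gets 1-3

    flow_a.update({4: "A-v4", 5: "A-v5", 6: "A-v6"})
    flow_b[4] = "B-v4-local-edit"                # someone commits v4 on the target

    flow_b = sync_flow_versions(flow_a, flow_b)  # re-sync: only 5 and 6 are added
    print(flow_b[4], flow_a[4])                  # the two version 4s still differ
```

After the second sync, FlowB holds versions 1 through 6, but its version 4 is still the local commit, not FlowA's version 4.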
The NiFi using FlowB will not reflect that newer versions exist, and when they change versions, they will lose the changes made in FlowB version 4.

The target NiFi-Registry is properly recording the new flow version number and timestamp as each new flow version is added to FlowB (as each new flow version is added, the flow version is incremented and the timestamp of that import is recorded).

As another example, you can export FlowA version 4 and import that single flow version into FlowB, and it will take the next incremental flow version number of FlowB, which could be version 1 if you just created FlowB.

So using the ./cli.sh registry commands must be done with great care. Moving over flows initially is great, but trying to use this sync to periodically re-sync flows is likely not going to do exactly what is expected except under very controlled environments.

If your intent is to move all NiFi-Registry flows from one NiFi-Registry to another and you want to preserve everything for this one time move, it is probably better to copy over the Metadata Database [1] and Flow Persistence Provider [2] from the source NiFi-Registry to the target NiFi-Registry.

[1] https://nifi.apache.org/docs/nifi-registry-docs/html/administration-guide.html#metadata-database
[2] https://nifi.apache.org/docs/nifi-registry-docs/html/administration-guide.html#persistence-providers

If you found this assisted you with your query, please take a moment to login and click "Accept" on this solution.

Thank you,
Matt
05-28-2021
09:23 AM
@Amn Where did you define the NiFi attribute "${itemType}" (as a FlowFile attribute on a source FlowFile driving execution of the GetMongo processor, or in the NiFi variable registry)?

NiFi is also case sensitive, so for example "ItemType" and "itemType" would be considered different variables.

If you found this helpful to your query, please take a moment to login and click "Accept" on this solution.

Thank you,
Matt
05-28-2021
09:14 AM
@Prajeesh10 If you are trying to split your source CSV in to two different FlowFiles before converting each to JSON, you could use the "SplitContent" [1] processor.

[1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.13.2/org.apache.nifi.processors.standard.SplitContent/index.html

If you found this useful, please take a moment to login and click "Accept" on this solution.

Thank you,
Matt
05-28-2021
08:23 AM
@Rupesh_Raghani I just want to make sure that when we are both talking about NiFi "Templates" we are talking about the same thing.

https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#templates

When you upload a NiFi template (xml file) to NiFi via the UI (it does not matter which node in a NiFi cluster you are accessing), that template will get uploaded and replicated to all nodes in the cluster. So all nodes will have that template in each node's JVM heap and written to the flow.xml.gz on disk.

This statement is not clear to me: "So if I have templates on every node in cluster then will the nifi load templates on each node as per templates available on their node or a parent node will load all templates from other cluster". What do you mean by "if I have templates on every node"?

In a NiFi cluster, every node must have the same flow.xml.gz. If the flow loaded in to heap memory does not match between nodes, the nodes not matching the elected cluster flow will be disconnected from the cluster. Each node, while it has its own local copy of the flow, runs the exact same flow.

Hope this helps,
Matt
05-27-2021
08:48 AM
@Rupesh_Raghani NiFi templates are uploaded to the selected process group. Only users who have been granted view and modify on that PG will be able to instantiate those templates on to the NiFi canvas.

NiFi operates within a single JVM, so there are not separate per-user canvases. Users not authorized on a specific NiFi Process Group (PG) will only see a blank representation of that component on the canvas (non-authorized users cannot see its name, description, configuration details, etc.). Non-authorized users can still see the metrics presented on components and the queue counts on connections.

The reason NiFi still renders all components is that everything is executing within a single JVM:
- One user's flows can impact another team (a flow with a huge backlog of queued data, high volume, etc. can impact resource availability for other teams/users).
- Stats on the information bar are representative of the entire canvas (all dataflows), so it is still important for all teams/users to be able to identify areas of concern even if they are not in their own dataflows and they have no authorizations allowing them to do anything about them. They could still alert other teams/users.
- A typical use case is that team/user 1 has a PG and team/user 2 has a different authorized PG. If NiFi did not render both PGs to both users, nothing would prevent users from building flows on top of one another. Then assume we have a team/user 3 that is authorized for both of the above PGs. The canvas would become unreadable and unusable for that third user.

If you found this helped with your query, please take a moment to login and click "Accept" on this solution.

Thank you,
Matt
05-27-2021
08:24 AM
@Rupesh_Raghani Since templates reside in NiFi heap, they should only be uploaded to NiFi for the purpose of instantiating that template on to the canvas. Once instantiated on to the canvas, the template should be deleted from NiFi so that it is no longer holding that memory space.

In addition to uploaded templates, everything built on the canvas (including controller services and reporting tasks) also consumes heap memory space. The metrics for each component reside within heap memory space as well. All queued FlowFiles (except for large queues resulting in swap files) will also reside in the NiFi JVM heap memory space. How much heap each FlowFile consumes is driven by the number and size of the FlowFile attributes on each FlowFile. FlowFile content does not reside in heap memory except when a processor needs it there to perform its task. Not all processors need to touch the content at all, and others may read it without needing to hold it in heap if they are streaming it somewhere else.

The impact on heap varies based on which components are being used and how many. If your flows grow extremely large, it may be a case for breaking those flows up to be managed by multiple NiFi clusters.

NiFi flow templates will become a deprecated capability in favor of NiFi-Registry. You can version control your flows in to NiFi-Registry. All NiFi instances connected to this NiFi-Registry can then load flows from NiFi-Registry to the canvas (one or more times).

I am not sure what you are looking for with regards to "How does it manage individual processors in memory?" All processors residing within the canvas and within templates will reside in the JVM heap memory space.

If you find this helps with your query, please take a moment to login and click "ACCEPT" on this solution.

Thank you,
Matt
05-24-2021
06:24 AM
@Chakkara The more detail you can provide, the better we may be able to assist.

Are you using custom processors? NiFi does not come with a ReplaceAttribute processor. I am assuming your flow may actually be using:

GetFile (deprecated processor in favor of the ListFile and FetchFile processors) --> UpdateAttribute --> ExtractText --> RouteOnAttribute --> PutSQL

Can you share the configuration you are currently using on the above processors, so we can see what you are doing now? Perhaps you only need a few tweaks to your current design.

Thanks,
Matt
05-21-2021
01:46 PM
@SAMSAL Sounds like you are very close...

The 403 has nothing to do with authentication, but rather authorization. This indicates that the client was properly trusted through the mutual TLS handshake, but when the resulting client string was checked against the policy for the requested endpoint it was not present, thus resulting in a 403 response.

The DN is the Distinguished Name for your certificate. You can use the below command to get the verbose details on the certs added to a keystore:

keytool -v -list -keystore <keystore.jks or truststore.jks>

You will see that each certificate has either type "PrivateKeyEntry" or "TrustedCertEntry". For each certificate you will see an "Owner" and an "Issuer". For each of those it will show the complete DN, which would look something like:

CN=<some string>, OU=<some string>, DC=<some string>

A DN can be made up of numerous parts (CN, OU, O, ST, DC, etc.) and can vary in length. The "Owner" DN for the PrivateKeyEntry is what is used as the client/user authenticated string after the successful mutual TLS handshake. The nifi.properties file has optional identity mapping properties that can be configured to trim and manipulate these identity strings (for example, pulling out only the string from the CN).

With nifi.remote.input.secure set to false, FlowFiles sent over S2S will not be sent over a TLS encrypted connection. The fact that you say it works when you have this set to false, and that you can still successfully obtain S2S details from the configured https:// secured NiFi destination URL, tells me that the client is correctly authorized for the "Retrieve site-to-site details" NiFi policy. That only leaves having an incorrect policy setup for your remote input port.

You should open a command prompt on each node and run "tail -F /<path to>/nifi-user.log". Then enable your controller services and check the logs being tailed for the authorization exception.
It should clearly show you the "string" being checked against the "receive data via site-to-site" endpoint, which in the log would look like "/data-transfer/input-ports/<uuid of remote input port>". Also remember that you are NOT authorizing your user to any of these S2S policies, but rather the DN or identity mapped DN string.

Really hope this helps you get fully secured here.

Matt
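To make the identity mapping idea above more concrete, here is a small Python model of what a pattern/value mapping does to a DN. It is a sketch only: in NiFi the pattern and value are configured through identity mapping properties in nifi.properties (nifi.security.identity.mapping.pattern.<name> and nifi.security.identity.mapping.value.<name>), which use Java regex syntax with $1 style group references. The pattern, the example DN, and the function name below are assumptions for illustration, written with Python's \1 syntax.

```python
import re

# Hypothetical pattern for DNs shaped like "CN=..., OU=..., DC=...".
DN_PATTERN = re.compile(r"^CN=(.*?), OU=(.*?), DC=(.*?)$")


def map_identity(dn: str, pattern: "re.Pattern[str]" = DN_PATTERN,
                 value: str = r"\1") -> str:
    """Return the mapped identity, or the DN unchanged if it does not match."""
    match = pattern.match(dn)
    if match is None:
        return dn
    # Expand group references in the value template (here: just the CN).
    return match.expand(value)


if __name__ == "__main__":
    print(map_identity("CN=nifi-node1, OU=NiFi, DC=example"))
```

Whatever string comes out of the mapping is the string that has to be granted the S2S policies, so it should match what appears in nifi-user.log.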