Member since
07-30-2019
3471
Posts
1642
Kudos Received
1020
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 130 | 06-03-2026 06:06 PM | |
| 450 | 05-06-2026 09:16 AM | |
| 807 | 05-04-2026 05:20 AM | |
| 485 | 05-01-2026 10:15 AM | |
| 614 | 03-23-2026 05:44 AM |
03-24-2020
11:10 AM
@Alexandros Going to ask the simple question first... There are FlowFiles traversing the processors in this newly instantiated flow from your template, correct? --- The next thought would then be around authorizations (assuming your NiFi is secured) 1. Is your user running the provenance query authorized to "view provenance" and "view the data" on the components? If these are set on the process group containing these processor components and not set on the components themselves, the components will inherit the polices from the process group. 2. Is this a NiFi cluster? If so, make sure your NiFi nodes are also authorized to "view provenance" and "view the data". When you authenticate in to NiFi and run a provenance query, that query is replicated to all nodes in your cluster. Those query results are then returned to the node on which the originating request was made. If that node is not authorized to view data returned from other nodes, it will not display. --- Then we need to make sure provenance it still working: While you are seeing provenance events displayed for your existing flow, are those returns recent? If you monitor the contents of your provenance_repository, do you see timestamps updating on the <num>.prov files? Need to make sure provenance has not stopped working for some reason. Also make sure you are using the WriteAheadProvenanceRepository implementation (should be default in 1.11) and not the PersistentProvenanceRepository implementation (configured in nifi.properties file). Hope this helps, Matt
... View more
03-16-2020
05:09 PM
1 Kudo
@Gubbi Depending on which processor is being used to create your FlowFile from you source linux directory, you will likely have an "absolute.path" FlowFile attribute created on the FlowFile. absolute.path = /users/abc/20200312/gtry/ You can pass that FlowFile to an UpdateAttribute processor which can use NiFi Expression Language (EL) to extract the date from that absolute path in to a new FlowFile attribute Add new property (property name becomes new FlowFile attribute): Property: Value: pathDate ${absolute.path:getDelimitedField('4','/')} The resulting FlowFile will have a new attribute: pathDate = 20200312 Now you can use that FlowFile attribute later when writing to your target directory in S3. I assume you would use the putS3Object processor for this? If so, you can configure the "Object Key" property with the following: /Users/datastore/${pathDate}/${filename} NiFi EL will replace ${pathDate} with "20200312" and ${filename} will be replaced with "gyyy.csv". Hope this helps you, Matt
... View more
03-06-2020
10:41 AM
@vikrant_kumar24 You would not configure your python script to write to an XML file on disk NiFi handles the FlowFile creation in the framework. Any data passed by your Python script to STDOUT will be populated into the content of the resulting flowfile passed to the output stream relationship of the ExecuteStreamCommand processor. Your script does not need to have any awareness of what. FlowFile is or how it is created. So you simply have your python script send the XML content to STDOUT and NiFi will take care of putting that content in to the FlowFile that will be produced and routed to the "output.stream" relationship of the processor. You can then use the updateAttribute processor the change the filename associated with that content. Hope this helps, Matt
... View more
03-06-2020
10:28 AM
@anil35759 If you create a NiFi template that includes a NiFi processor that references a controller service, that controller service will be included in the generated template. So if you import and instantiate that template on to the canvas of another NiFi, the controller service will be added as well and be associated with the processor instantiate from that same template. https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#templates So I am not clear on why you need to import the controller service separately via the toolkit. Controller services are not mapped to processors. The association happens from the processor side. The processor is mapped to a specific controller service via the controller services assigned uuid. Keep in mind that anything you can do within the NiFi Ui can also be accomplished using NiFi rest-api endpoints. Using your browsers developer tools to capture the requests as you make them through the NiFi UI is a great way to learn how to interact with NiFi rest-api endpoints. The rest-api endpoints specific to your NiFi release version can be found under help within your install. Here are the Apache NiFi rest-api docs for the latest Apache release version: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html Essentially what you need to do here is to update the processor configuration to reference the UUID of whichever controller service you want it to use. curl 'http://<nifi-hostname>:<nifi-port>/nifi-api/processors/b10fd083-0170-1000-0000-00007f7c905f' -X PUT --data-binary '{"component":{"id":"b10fd083-0170-1000-0000-00007f7c905f","name":"ExecuteSQL","config":{"concurrentlySchedulableTaskCount":"1","schedulingPeriod":"0 sec","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","schedulingStrategy":"TIMER_DRIVEN","comments":"","autoTerminatedRelationships":[],"properties":{"Database Connection Pooling Service":"e60cb24c-95c5-3a97-bcb9-9e537006317d"}},"state":"STOPPED"},"revision":{"clientId":"affea95c-0170-1000-988b-73bf756785b3","version":2},"disconnectedNodeAcknowledged":false} So you'll notice from above example, I am updating a processor's (ExecuteSQL) configuration so that the Database Connection Pooling Service is mapped to the UUID of my DBCPConnectionPool controller service's uuid (e60cb24c-95c5-3a97-bcb9-9e537006317d). Note: The "clientId" string can be anything. Hope this helps, Matt
... View more
03-05-2020
10:56 AM
@anil35759 It may be helpful if you can share the exact commands you are performing now to export and import your controller services. Thanks, Matt
... View more
03-05-2020
10:52 AM
@sfishman In the "Replacement value" configuration property just hold the <shift> key and hit enter instead of using "\n". Here is example of what you would see: This will create the line return you are looking for. Hope this helps, Matt
... View more
03-05-2020
10:47 AM
@varun_rathinam Observations from your configuration: 1. You are using "Defragment" merge strategy which tells me that somewhere upstream in your dataflow you are splitting some FlowFile in to fragments and then you are using this processor to merge those fragments back in to the original FlowFile. Correct? When using Defragment you can not use multiple MergeContent processors in series as i mentioned earlier because the defragment strategy is expecting to find all fragments from the fragment count before merging them. 2. When using the defragment strategy it is the fragment.count attribute on the FlowFiles that dictates when the bin should be merged and not the min number of entries. 3. Each FlowFile that has a unique value in the fragment.identifier will be allocated to a different bin. Setting the number of bins to "1" will never work no matter which merge strategy you choose to use. When the MergeContent processor executes it first checks to see if a free bin is available (if not it merges oldest bin or routes oldest bins FlowFiles to failure in case of Defragment to free up a bin), then it looks at the current FlowFiles in the inbound connection at that exact moment in time and starts allocating them to existing bins or new bins. So at a minimum you should always have at least "2" bins. The default is "5" bins. Having multiple bins does not mean that all those available bins will be used. 4. I see you changed Maximum Number of Entries from default 1000 to 100000. Is this because you know each of the FlowFiles you split will produce up to 100,000 FlowFiles? As i mentioned the ALL FlowFiles allocated to bins have their attributes held in heap memory. Adding to that... If you have multiple bins being filled because you have unique fragment.identifiers being defragmented, you could have even more than 100,000 FlowFiles worth of attributes in heap memory. So your NiFi JVM heap memory being set at only 2GB may lead you to hitting Out Of Memory (OOM) conditions with such a dataflow design. Also want to add that where ever you are doing the original splitting of your FlowFile in your dataflow will also have an impact on heap memory because the FlowFile Attributes for every FlowFile being produced during the split process is held in heap memory until every new split FlowFile being produced is committed to a downstream connection. NiFi connections between processors have swapping enabled by default to help reduce heap usage when queues get large, but same does not apply within the internals of a processors execution. As i mentioned before, the MergeContent does not load FlowFile content in heap memory, so the size of your FlowFiles does not impact heap here. So you really want to step back and look at your use case again and ask yourself: "do I really need to split my source FlowFile and merge it back in to the original FlowFile to satisfy my use case?" NiFi has numerous record based processors for working with records avoiding the need to split them in many use cases. Hope this helps, Matt
... View more
03-05-2020
10:21 AM
@domR I see no issues with using a publish and consume design for triggering follow on flows. That provide a slightly more robust setup than using the postHTTP to ListenHTTP example I provided since any NiFi node can consume the trigger file. When using Kafka you will want to make sure you have the same number of partitions as you have consumer in the same consumer group. When you add a consumeKafka processor to the NiFi canvas it is really being added to every node in your cluster and is configured the same on every node. Let's assume you have a 3 node NiFi cluster, that means you have at a minimum 3 consumers in that consumer group so your Kafka topic should have 3 partitions; otherwise, you may see a lot of rebalancing happen. To improve performance even more you can increase the concurrent task on your consumeKafka processor. With a 3 node NiFi cluster and a consumeKafka configured with 2 concurrent tasks, you now have 6 total consumers in the consumer group; therefore, your Kafka topic should have 6 partitions. If a NiFi node goes down Kafka will assign multiple partitions to the same consumer, so no need to worry about messages not being consumed. Hope this information was useful in finding a solution, Matt
... View more
03-04-2020
01:25 PM
@domR Do you find yourself often adding or removing nodes from your NiFi cluster? If not, creating one PostHTTP processor for each of your NiFi nodes would work. As far as querying the connected nodes, sure that can be done via a call to the rest api, but you are going to get json back you are going to parse the connected hostname out of. Plus it is possible that post parsing that info one or more of those nodes disconnects. So the postHTTP would fail and you then need to do that parse processing all over again. This gets expensive in terms of process and performance. This kind of logic is built in to the Remote Process Groups. However, we can assume that the NiFi node that is processing the FlowFile must be up or the FlowFile would not be getting processed. So you could just configure your postHTTP processor to use a dynamic URL based on a NiFi Expression Language statement that just use the hostname of the node on which the FlowFile is already being processed. Example PostHTTP processor configuration: Example ListenHTTP processor configuration: The above allows you to send FlowFiles from a postHTTP to a ListenHTTP on the same node. Note: Understand that you are adding additional read and writes to the content repository and sending data across the network which will add latency versus just routing the same FlowFile continuously within the canvas from one dataflow to another via directly connected dataflows. Hope this solution helps you, Matt
... View more
03-04-2020
01:05 PM
@varun_rathinam Can you please elaborate on "processor drop the file and join with new files"? And also "content_repository backup i limit"? <-- Are you referring to the "nifi.content.repository.archive.max.retention.period" and "nifi.content.repository.archive.max.usage.percentage" configuration settings in the nifi.properties file? Also sharing a screenshot of your current MergeContent processor's configuration along with more details around your use case. What result are you seeing now and what is the desired result? The MergeContent processor takes multiple FlowFiles from same NiFi node and merges the content of those FlowFiles based on the processor's configuration in to one or more new FlowFile's per node. The processor cannot merge FlowFiles residing on different NiFi nodes in an NiFi cluster into one FlowFile. FlowFiles from the inbound connection queue are allocated to bins based on the following configuration properties: Correlation Attribute Name <-- (Optional) when used, only FlowFile with same value in the configured FlowFile attribute will be placed in same bin. Maximum Number of Entries <-- Maximum number of FlowFiles that can be allocated to a single bin before a new bin is used. Maximum Group Size <-- (Optional) Maximum cumulative size of the content that can be allocated to a bin When a "bin" is eligible to be merged is controlled by these configuration properties: Minimum Number of Entries <-- If at end of thread execution (after all FlowFiles from inbound connection have been allocated to one or more bins) the number of FlowFiles allocated to a bin meets this min and meets configured min group size, the FlowFiles in that bin will be merged. Minimum Group Size <-- Same as above Max Bin Age <-- A bin that has not reached or exceeded both above min values will merge once the bin has had FlowFiles in it for this amount of time Maximum number of Bins <-- If FlowFile have been allocated to every bin and another bin is needed, the oldest bin will be forced to merge to free a bin. It is possible that one or both min values are never reached if a Max bin setting is reached first. This means that because of max additional FlowFiles can not be allocated to that bin and the only setting that will force that bin to merge is "Max Bin Age" or you run out of free bins. As far as bin Max values, NiFi really does not care about content size as it streams the merged FlowFiles content in to a new FlowFile and does not hold that content in memory. NiFi can experience Out OF Memory (OOM) conditions if the number of FlowFiles Max is set too high since all the attributes for every FlowFile currently allocated to bin(s) is held in heap memory. NiFi's allocated heap memory is set in the nifi.properties configuration file. So a Max number of entries should be limited to 10000 (but this varies based on memory availability and number and size of attributes on your FlowFiles. You can use multiple MergeContent processors in series (one after another) to merge multiple merged FlowFiles in to even larger merged FlowFiles if desired. Hope this helps with understanding the MergeContent processor, Matt
... View more