Member since: 07-30-2019
Posts: 3406
Kudos Received: 1622
Solutions: 1008
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 152 | 12-17-2025 05:55 AM |
| | 213 | 12-15-2025 01:29 PM |
| | 149 | 12-15-2025 06:50 AM |
| | 264 | 12-05-2025 08:25 AM |
| | 443 | 12-03-2025 10:21 AM |
03-06-2020
10:28 AM
@anil35759 If you create a NiFi template that includes a processor that references a controller service, that controller service is included in the generated template. So if you import and instantiate that template on the canvas of another NiFi, the controller service will be added as well and associated with the processor instantiated from that same template. https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#templates

So I am not clear on why you need to import the controller service separately via the toolkit. Controller services are not mapped to processors; the association happens from the processor side. The processor is mapped to a specific controller service via that controller service's assigned UUID.

Keep in mind that anything you can do within the NiFi UI can also be accomplished using NiFi rest-api endpoints. Using your browser's developer tools to capture the requests as you make them through the NiFi UI is a great way to learn how to interact with the rest-api. The rest-api docs specific to your NiFi release version can be found under Help within your install. Here are the Apache NiFi rest-api docs for the latest Apache release version: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html

Essentially what you need to do here is update the processor configuration to reference the UUID of whichever controller service you want it to use. For example:

    curl 'http://<nifi-hostname>:<nifi-port>/nifi-api/processors/b10fd083-0170-1000-0000-00007f7c905f' \
      -X PUT \
      -H 'Content-Type: application/json' \
      --data-binary '{"component":{"id":"b10fd083-0170-1000-0000-00007f7c905f","name":"ExecuteSQL","config":{"concurrentlySchedulableTaskCount":"1","schedulingPeriod":"0 sec","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","schedulingStrategy":"TIMER_DRIVEN","comments":"","autoTerminatedRelationships":[],"properties":{"Database Connection Pooling Service":"e60cb24c-95c5-3a97-bcb9-9e537006317d"}},"state":"STOPPED"},"revision":{"clientId":"affea95c-0170-1000-988b-73bf756785b3","version":2},"disconnectedNodeAcknowledged":false}'

You'll notice in the above example that I am updating a processor's (ExecuteSQL) configuration so that its "Database Connection Pooling Service" property is mapped to the UUID of my DBCPConnectionPool controller service (e60cb24c-95c5-3a97-bcb9-9e537006317d). Note: the "clientId" string can be anything. A sketch of how to fetch the revision version that the PUT requires follows below.

Hope this helps, Matt
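One additional note: NiFi will only accept the PUT if the "version" in the revision object matches the processor's current revision. A minimal sketch of retrieving it, assuming an unsecured NiFi (add whatever authentication your environment requires); the hostname, port, and UUID below are placeholders from the example above:

```bash
# Fetch the processor entity; the response JSON includes the current
# "revision" (version/clientId) plus the existing config, which can be
# copied into the PUT body shown above.
curl 'http://<nifi-hostname>:<nifi-port>/nifi-api/processors/b10fd083-0170-1000-0000-00007f7c905f'
```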
03-05-2020
10:56 AM
@anil35759 It may help if you share the exact commands you are currently using to export and import your controller services. Thanks, Matt
03-05-2020
10:52 AM
@sfishman In the "Replacement Value" configuration property, hold the <shift> key and hit enter instead of typing "\n". This inserts an actual line return rather than the literal characters, which is the line return you are looking for. Hope this helps, Matt
03-05-2020
10:47 AM
@varun_rathinam Observations from your configuration:

1. You are using the "Defragment" merge strategy, which tells me that somewhere upstream in your dataflow you are splitting some FlowFile into fragments and then using this processor to merge those fragments back into the original FlowFile. Correct? When using Defragment you cannot use multiple MergeContent processors in series as I mentioned earlier, because the Defragment strategy expects to find all fragments from the fragment count before merging them.

2. When using the Defragment strategy it is the fragment.count attribute on the FlowFiles that dictates when a bin should be merged, not the min number of entries (see the attribute sketch below).

3. Each FlowFile that has a unique value in fragment.identifier will be allocated to a different bin. Setting the number of bins to "1" will never work, no matter which merge strategy you choose. When the MergeContent processor executes, it first checks whether a free bin is available (if not, it merges the oldest bin, or in the case of Defragment routes the oldest bin's FlowFiles to failure, to free up a bin); then it looks at the FlowFiles in the inbound connection at that exact moment in time and starts allocating them to existing or new bins. So at a minimum you should always have at least "2" bins; the default is "5". Having multiple bins does not mean that all of those available bins will be used.

4. I see you changed Maximum Number of Entries from the default 1000 to 100000. Is this because you know each of the FlowFiles you split will produce up to 100,000 FlowFiles? As I mentioned, ALL FlowFiles allocated to bins have their attributes held in heap memory. Adding to that, if you have multiple bins being filled because you have unique fragment.identifiers being defragmented, you could have even more than 100,000 FlowFiles' worth of attributes in heap memory. So your NiFi JVM heap being set at only 2 GB may lead to Out Of Memory (OOM) conditions with such a dataflow design.

Also note that wherever you do the original splitting of your FlowFile will also have an impact on heap memory, because the FlowFile attributes for every FlowFile produced during the split are held in heap memory until every new split FlowFile is committed to a downstream connection. NiFi connections between processors have swapping enabled by default to help reduce heap usage when queues get large, but the same does not apply within the internals of a processor's execution. As I mentioned before, MergeContent does not load FlowFile content into heap memory, so the size of your FlowFiles does not impact heap here.

So you really want to step back, look at your use case again, and ask yourself: "Do I really need to split my source FlowFile and merge it back into the original FlowFile to satisfy my use case?" NiFi has numerous record-based processors for working with records, avoiding the need to split them in many use cases.

Hope this helps, Matt
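For reference, the Defragment strategy keys off the standard fragment attributes that NiFi's split processors write on each fragment. A rough sketch of what MergeContent expects to see on every incoming FlowFile:

```
fragment.identifier  - same value on every fragment produced from one original FlowFile
                       (each unique identifier gets its own bin)
fragment.index       - position of this fragment within the original FlowFile
fragment.count       - total number of fragments; a bin is only merged once this many
                       fragments with the same identifier have arrived
```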
03-05-2020
10:21 AM
@domR I see no issues with using a publish and consume design for triggering follow-on flows. That provides a slightly more robust setup than the PostHTTP to ListenHTTP example I provided, since any NiFi node can consume the trigger file.

When using Kafka you will want to make sure you have the same number of partitions as you have consumers in the same consumer group. When you add a ConsumeKafka processor to the NiFi canvas it is really being added to every node in your cluster and is configured the same on every node. Let's assume you have a 3 node NiFi cluster; that means you have at a minimum 3 consumers in that consumer group, so your Kafka topic should have 3 partitions; otherwise, you may see a lot of rebalancing happen. To improve performance even more you can increase the concurrent tasks on your ConsumeKafka processor. With a 3 node NiFi cluster and a ConsumeKafka configured with 2 concurrent tasks, you now have 6 total consumers in the consumer group; therefore, your Kafka topic should have 6 partitions (see the sketch below). If a NiFi node goes down, Kafka will assign multiple partitions to the same consumer, so no need to worry about messages not being consumed.

Hope this information was useful in finding a solution, Matt
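To make that partition math concrete, a sketch of creating such a topic; the topic name, broker address, and replication factor are placeholder values, and this form of the command applies to newer Kafka releases (older releases use --zookeeper instead of --bootstrap-server):

```bash
# 3 NiFi nodes x 2 ConsumeKafka concurrent tasks = 6 consumers,
# so size the topic with (at least) 6 partitions.
kafka-topics.sh --create \
  --bootstrap-server <kafka-broker>:9092 \
  --topic nifi-trigger \
  --partitions 6 \
  --replication-factor 3
```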
03-05-2020
10:09 AM
1 Kudo
@LunaLua I know next to nothing about Ceph, but the exception being thrown by the client (ListS3 processor) identifies the issue as:

    Caused by: java.net.SocketException: Software caused connection abort: recv failed

This points at the server side as having closed the connection unexpectedly. I would suggest looking at the Ceph logs to see what exception(s) are being thrown on that side around the same time you see the exception in NiFi. Perhaps that can provide more context around what is going wrong here. Hopefully there are other community members who know more about Ceph, or who have used the Amazon SDK to interface with Ceph, who can provide even more insight. Hope this helps you, Matt
03-04-2020
01:39 PM
@NickH The FetchSFTP processor has multiple different relationships. For your use case, where the file really is not there when FetchSFTP tries to fetch the content, the expected outcome is that the FlowFile is routed to the "not.found" relationship, which you should auto-terminate. If you encountered some sort of communications failure (a network issue during the fetch), the FlowFile should have been routed to the "comms.failure" relationship, which should be looped back to the processor to try again. FetchSFTP also has a "permission.denied" relationship which you can handle via dataflow design as well, perhaps by sending an email alert (see the sketch below). Hope this helps, Matt
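A rough sketch of one way those relationships might be wired; PutEmail is just one option for raising the alert mentioned above:

```
FetchSFTP
 ├── success            -> downstream processing
 ├── not.found          -> auto-terminate (file legitimately gone)
 ├── comms.failure      -> loop back to FetchSFTP (retry)
 └── permission.denied  -> PutEmail (notify an operator)
```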
03-04-2020
01:25 PM
@domR Do you find yourself often adding or removing nodes from your NiFi cluster? If not, creating one PostHTTP processor for each of your NiFi nodes would work.

As far as querying the connected nodes, sure, that can be done via a call to the rest api, but you will get JSON back from which you then need to parse the connected hostnames. Plus it is possible that, after parsing that info, one or more of those nodes disconnects; the PostHTTP would then fail and you would need to do that parsing all over again. This gets expensive in terms of processing and performance. This kind of logic is built in to Remote Process Groups.

However, we can assume that the NiFi node processing the FlowFile must be up, or the FlowFile would not be getting processed. So you could just configure your PostHTTP processor to use a dynamic URL based on a NiFi Expression Language statement that uses the hostname of the node on which the FlowFile is already being processed (example PostHTTP and ListenHTTP configurations are sketched below). This allows you to send FlowFiles from a PostHTTP to a ListenHTTP on the same node.

Note: Understand that you are adding additional reads and writes to the content repository and sending data across the network, which adds latency versus just routing the same FlowFile continuously within the canvas from one dataflow to another via directly connected dataflows.

Hope this solution helps you, Matt
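A rough sketch of that configuration; the port and base path are placeholder values, and ${hostname(true)} is the NiFi Expression Language function that returns the fully qualified hostname of the node currently processing the FlowFile:

```
PostHTTP (one possible configuration):
  URL            = http://${hostname(true)}:8011/contentListener

ListenHTTP (one possible configuration):
  Listening Port = 8011
  Base Path      = contentListener
```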
03-04-2020
01:05 PM
@varun_rathinam Can you please elaborate on "processor drop the file and join with new files"? And also "content_repository backup i limit"? Are you referring to the "nifi.content.repository.archive.max.retention.period" and "nifi.content.repository.archive.max.usage.percentage" settings in the nifi.properties file? Please also share a screenshot of your current MergeContent processor's configuration along with more details around your use case: what result are you seeing now, and what is the desired result?

The MergeContent processor takes multiple FlowFiles from the same NiFi node and merges the content of those FlowFiles, based on the processor's configuration, into one or more new FlowFiles per node. The processor cannot merge FlowFiles residing on different NiFi nodes in a NiFi cluster into one FlowFile.

FlowFiles from the inbound connection queue are allocated to bins based on the following configuration properties:

- Correlation Attribute Name <-- (Optional) When used, only FlowFiles with the same value in the configured FlowFile attribute will be placed in the same bin.
- Maximum Number of Entries <-- Maximum number of FlowFiles that can be allocated to a single bin before a new bin is used.
- Maximum Group Size <-- (Optional) Maximum cumulative size of the content that can be allocated to a bin.

When a bin is eligible to be merged is controlled by these configuration properties:

- Minimum Number of Entries <-- If, at the end of a thread execution (after all FlowFiles from the inbound connection have been allocated to one or more bins), the number of FlowFiles allocated to a bin meets this minimum and the configured minimum group size, the FlowFiles in that bin will be merged.
- Minimum Group Size <-- Same as above.
- Max Bin Age <-- A bin that has not reached or exceeded both minimums above will merge once the bin has had FlowFiles in it for this amount of time.
- Maximum number of Bins <-- If FlowFiles have been allocated to every bin and another bin is needed, the oldest bin will be forced to merge to free a bin.

It is possible that one or both minimums are never reached if a maximum setting is reached first. In that case, because a maximum has been reached, additional FlowFiles cannot be allocated to that bin, and the only things that will force that bin to merge are "Max Bin Age" or running out of free bins.

As far as the bin maximums go, NiFi really does not care about content size, as it streams the merged FlowFiles' content into a new FlowFile and does not hold that content in memory. NiFi can experience Out Of Memory (OOM) conditions if the maximum number of FlowFiles is set too high, since all the attributes for every FlowFile currently allocated to bin(s) are held in heap memory. NiFi's allocated heap memory is set in the bootstrap.conf configuration file. So Maximum Number of Entries should be limited to around 10000 (but this varies based on memory availability and the number and size of attributes on your FlowFiles). You can use multiple MergeContent processors in series (one after another) to merge already-merged FlowFiles into even larger merged FlowFiles if desired. A sample configuration is sketched below.

Hope this helps with understanding the MergeContent processor, Matt
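As a worked example of how those properties interact (values here are illustrative only, not a recommendation for your specific flow), a Bin-Packing style configuration that merges batches of roughly 1,000 to 10,000 FlowFiles but never lets a bin sit longer than 5 minutes:

```
Merge Strategy             = Bin-Packing Algorithm
Minimum Number of Entries  = 1000
Maximum Number of Entries  = 10000
Max Bin Age                = 5 min
Maximum number of Bins     = 5
```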
03-04-2020
10:55 AM
@Umakanth The API is exposed out of the box; it is not something you need to enable. Every action you take within the UI makes a call to the NiFi rest-api.

When learning how to use the rest-api, you may find the developer tools in your browser helpful. Open the developer tools while you are accessing your NiFi UI, then perform some action and you will see the requests being made by your browser to NiFi. For example (using the Chrome browser developer tools), if you open NiFi's Summary UI from the global menu in the upper right corner of the UI, you will notice that several requests are made. You can right click on any one of those requests and select "Copy as cURL" to copy the full request to the system clipboard, then paste the request in a terminal window and see what the rest-api call returns. You will notice that the copied curl command has numerous additional headers (-H) that are not always necessary, depending on the rest-api endpoint being used. Example:

    curl 'http://<nifi-hostname>:<nifi-port>/nifi-api/flow/process-groups/root/status?recursive=true'

Of course you will need to parse the rest-api returns yourself to extract, in many cases, the specific details/stats you want to monitor (see the sketch below for one way to do that).

Hope this helps, Matt
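As a minimal sketch of pulling specific stats out of that status endpoint; the jq filter assumes a typical layout of the response JSON, and the exact field names can vary between NiFi versions:

```bash
# Extract a few aggregate stats for the root process group.
# Field names below are based on a typical NiFi response and may differ by version.
curl -s 'http://<nifi-hostname>:<nifi-port>/nifi-api/flow/process-groups/root/status?recursive=true' \
  | jq '.processGroupStatus.aggregateSnapshot | {flowFilesQueued, bytesQueued, activeThreadCount}'
```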