Member since
07-30-2019
3421
Posts
1624
Kudos Received
1010
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 58 | 01-13-2026 11:14 AM | |
| 198 | 01-09-2026 06:58 AM | |
| 518 | 12-17-2025 05:55 AM | |
| 579 | 12-15-2025 01:29 PM | |
| 563 | 12-15-2025 06:50 AM |
04-08-2025
05:35 AM
@sha257 It appears from the authorizations.xml file you shared, your issue is also affected by your NiFi node(s) not being authorized to /proxy (Read, Write, Delete) in NiFi-Registry that I mentioned in my earlier response. I see only user identifier: 71b266f5-7764-3ff5-a812-80112278b50c Which from your users.xml is your "abc123" user identity. So when the NiFi node attempts to proxy a request on behalf of the user identity authenticated in NiFi, the NiFi node's clientAuth certificate is passed in the connection to NiFi-Registry to authenticate the node and the is checked fro proxy. If that mutual TLS exchange is not successful. the node connects as anonymous (which only has read on public buckets). There are multiple layers here. When you setup the NiFi Flow Registry in NiFi, It gives you the option to define a StandardSSL Context Service. The keystore and truststore defined are used in that mutualTLS exchange. When you don't define a StandardSSLContext Service, NiFi will default t using the keystore and truststore defined in the nifi.properties file. Please help our community grow and thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
04-08-2025
05:18 AM
@nifier The NiFi EL you are using is valid and works. So this raise the question as to where you are trying to use it? What version of Apache NiFi are you using? Which NiFi processor are you using this in? Which Processor property are you using it in? NOTE: Make sure the processor property supports NiFi EL. You can't use EL in every property. I validated your EL using the UpdateAttribute processor. I do have another question about your ifElse: Why are you appending a filename to path? A more typical approach would be t simply use ${extracted.path}/${filename} in the processor that writes file out to destination. If extracted.path is empty or does not exist it returns nothing. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
04-07-2025
06:14 AM
@Fanxxx You have what sounds like a rather complex use case here with numerous outputs, timing controls, and routing requirements. Control Rate is very basic in nature (allow X FlowFiles to pass every X amount of time) which depending on volume of FlowFiles can lead to a backlog that ends up resulting in most request to fail after your 5 second requirement (including new that end up delayed more the 5 seconds because they are still queued up behind other FlowFiles behind yoru ControlRate.) Cloudera offers professional Services to its licensed users that can help design and implement complex use cases. Assisting you through the community would require considerable back and forth and exchange of information to include test files, etc. Thank you, Matt
... View more
04-07-2025
05:57 AM
@nifier Unfortunately not. When the StandardPGPPublicKeyService Controller service is enabled, it loads the Keyring into heap memory. Only stopping will allow you to edit the "Keyring" or allow it to load an updated keyring from the "Keyring File". Likewise, and component that has been configured to use this StandardPGPPublicKeyService must be stopped whenever the Controller Service is disabled because a dependency exists between the two components and thus the components are no longer "Valid" and able to run when the controllers service is disabled. Stopping and Starting the Controller Service gives you the option to start all the dependent processors using it at same time. You could raise an Apache NiFi Jira (https://issues.apache.org/jira/browse/NIFI) for a new feature request around the StandardPGPPublicKeyService Controller Service perhaps asking for ability to update a KeyRingFile while enabled and a specify a re-read interval for reading the KeyRingFile. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
04-07-2025
05:39 AM
@sha257 Your bucket having "Make Publicly Visible" checked is why you can load flows from this bucket on to your NiFi canvas. I see from that same screenshot you have not defined any policies on that bucket yet: Refer to NiFi-Registry Access Policies to understand more about the policies setup on buckets and globally within NiFi-Registry. The fact that this bucket is marked as publicly visible ONLY gives all users/clients "READ". So your user is still not allowed to Write to the bucket which would be required to commit a new version of a flow which you are trying to do. I also see that you gave your user all the global Special Privilege Polices. A NiFi user would never "Proxy Requests". The NiFi nodes/instances themselves will proxy requests to NiFi-Registry on-behalf of the NiFi user identity authenticated in NiFi that is attempting to facilitate any version control action in NiFi directed at this NiFi-Registry client. So make sure your NiFi nodes are properly added to "Can proxy Requests" Special Privilege Policy. The next question is when you log into your NiFi as your user and log into your NiFi-Registry as the same user, is the EXACT same user identity string displayed in the upper right corner of the UI? If they are different in any way (including case sensitivity), then they would be treated as different users by each service. If different, you'll need to make adjustments to your configurations so that they are the same. Please help our community grow and thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
04-04-2025
01:43 PM
@sha257 You should always share your NiFi and NiFi-Registry versions when asking questions to help narrow focus. From the exception you shared it shows a 401 unauthorized exception as reason why you can not commit your local changes as a new version in NiFi-Registry bucket This indicates that your NiFi user only has read access on that bucket allowing that user to import the dataflow from NiFi-Registry to NiFi but not authorized to commit (write) new version back to NiFi-Registry for that dataflow. If the bucket was added as a publicly visible bucket, all users can import flows from that bucket to the NiFi canvas. A publicly visible bucket allows everyone including anonymous user read access. A public bucket still requires proper authorizations are setup in order to Write (start version control or commit new version of an already version controlled flow) to it. When the "make publicly visible" option os NOT checked on the bucket. Explicit policies must exist for any user to Read (download flows from bucket), Write (start version control or commit new version of a flow to the bucket), or DELETE (delete a flow from the bucket). Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
04-04-2025
10:18 AM
@MariaE It sounds like from the shared exception the data coming from your source json did not match the schema returned from "zv60-u-ab-...". Looks like the data had a null value in the channel field but the schema does not support null values. So either an issue with the source data or the schema needs to be modified to support null values. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
04-02-2025
11:55 AM
@AlokKumar The simple answer is it is possible. both the ListenHTTP and HandleHTTPRequest processors create an embedded Jetty server with a default 200 threads for handling inbound http requests. The Concurrent Tasks are used to covert the http requests into NiFi FlowFiles. The HandleHTTPRequest processor also requires a HandleHTTPResponse processor. The HandleHTTPRequest processor also has a configurable "Container Queue Size" (default 50). I would recommend using the HandleHTTPRequest and HandleHTTPResponse processors. I would increase the "Container Queue Size" to 100. I would increase the Concurrent tasks to maybe 5 and monitor your throughput (Avoid just setting some high value here. Should only increment slowly as you monitor cpu load average and throughput performance). Keep in mind that concurrent tasks are requesting threads from the NIFi Max Timer Driven thread Pool (default: 10) that you would also need to increase (typical starting value is 2 to 4 times the number of CPU cores on your NiFi node). I say above is the "simple" answer since this is probably not the only dataflow running on your NiFi. Every component has dome impact in CPU load average (some significant and other almost non-measurable). So the key here is monitor CPU load average, disk I/o, and network performance and adjust your NiFi component concurrent tasks incrementally keeping in this in mind to maximize your CPU usage to maximize your throughput. If you exhaust your available CPU or Disk I/O, those may limit your capabilities more then NiFi processors do. This is a task of tuning and monitoring and tuning again. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
04-02-2025
04:39 AM
@Rothmny Based on what you have shared, you should be able to accomplish this with GenerateFlowFile and ExecuteStreamCommand processors. The GenerateFlowFile processor would be configured to generate a 0 byte FlowFile and scheduled using Cron so that it creates a FlowFile on your required schedule that will then trigger the execution of the ExecuteStreamCommand processor that will execute your python script. For calling rest-api endpoints via NiFi dataflows, you could use the InvokeHTTP processor. Since the InvokeHTTP processor does not require and inbound connection, you could just have it as first processor in a dataflows scheduled with cron as well. Or, if the invokeHTTP is called after your script successfully executes you could trigger with the FlowFile that exited the the ExecuteStreamCommandProcessor. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
03-24-2025
12:20 PM
@MarinaM Suggested reading: https://community.cloudera.com/t5/Community-Articles/How-to-address-JVM-OutOfMemory-errors-in-NiFi/ta-p/244431 I'll start with this observation you shared: "My current configuration that cause the flow to go the original" The Merge processors will always send all input FlowFiles to the "Original" relationship when a bin is merged. The "Merged" relationship is where he newly produced FlowFiles will be output with the contents of the merged records. Let break down your two different configurations to understand what each is doing: - Here you are parsing inbound FlowFiles and allocating the content to a bin (each bin will contain only "like" records. A "like" record/FlowFile is one that has the exact same schema as another. At each scheduled execution a thread is requested that reads from the inbound connection queue and allocates FlowFiles to one or more bins. Judging by your screenshots, it appears you have numerous concurrent tasks configured on this processor. Here is where multiple different scenarios can happen: You have multiple concurrent threads processing inbound FlowFiles and allocating FlowFiles to bin(s). After a FlowFiles records are allocated to a bin, the processor checks to see if a bin is eligible for merge (In above case, a bin is merged if it has at least 10,000 FlowFiles in it AND the bin size is at least 20 MB.) So your merged records could have as few as 10,000 records. It is hard to tell from your screenshots what the average size of each record is. If 5,000 FlowFiles where to add up to 35MB of content, then that bin would not be able to accept anymore records; however, the min entries was not satisfied. So that bin just sits since you have no max bin age configured. New records, even if they are "like" records, would start being allocated in another bin. Eventually, there will be no more available bins to allocate to, so the processor will force merge the oldest bin to free a bin to allocate more records to. You have many "unlike" records (more then 5 unique schemas). So quickly you exhaust you available bins and the oldest bin is force merge with very few or even one record in it. I also don't know the average number of records in each inbound FlowFile. Your input FlowFile already has a size of 20 MB, so it is only FlowFile allocated to a bin and then merge when bin is forced output FlowFile contains same number of records. Now looking at alternate configuration: - Here you have increased min num records, min bin size, max bin size, and num of bins. Things that can happen here: With 50 bins and a min num FlowFiles of 1,000,000, there is potential for heap memory issues since that large number of binned FlowFiles are all held in your NiFi JVM heap memory. I'd suggest checking your logs for any OOM exceptions with this configuration. Assuming no memory issues, you likely have less then 50 unique schemas and each bin has still not satisfies both min values (1,000,000 FlowFiles AND 50 MB content). Thus bin just sit not getting merged still waiting for more FlowFiles to add to a bin to merge. Try setting a max bin age to force bins to merge after that amount of time to see what. you get. My guess here is that your smaller configuration is working: Inbound FlowFiles that fail to merge go to failure relationship. Since you did not mention anything about FlowFiles routing to the "failure" relationship and the fact that we see FlowFiles on the "original" relationship, tells me that bis are being merged. If you have a lot of unique schemas, it is likely some bins may getting force merged to free a bin, so output looks no different then input FlowFile. Just looking at your "original" relationship containing 350,981 FlowFiles and 91.25 MB, your average FlowFile size is only ~275 bytes each. Are you saying nothing ever goes to "merged" relationship? I expect there would be considering all these FlowFiles in "Original" relationship. Hopefully this helps you understand how this processor works and helps you take a closer look at your inbound FlowFiles and their schemas to see if they are each unique. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more