Member since: 07-30-2019
Posts: 3131
Kudos Received: 1564
Solutions: 909
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 102 | 01-09-2025 11:14 AM |
| | 649 | 01-03-2025 05:59 AM |
| | 393 | 12-13-2024 10:58 AM |
| | 419 | 12-05-2024 06:38 AM |
| | 356 | 11-22-2024 05:50 AM |
01-09-2025
11:14 AM
@Chram Let's back up and first understand the use case you are trying to solve here. From the look of your dataflows, it appears you have two FlowFiles that you want to merge via the MergeContent processor, and the order in which they are merged is important to you. This sounds like a use case better solved by using the "Defragment" merge strategy in the MergeContent processor rather than trying to use the EnforceOrder processor. Is there more to your use case that necessitates the use of EnforceOrder?

As far as your dataflow goes, I am having trouble reproducing the issue you described. I see my priority 2 file sitting in wait only until my matching priority 1 file arrives. The dataflow screenshot you shared also does not show a wait relationship being routed anywhere. Are you using the "retry" + "terminate" checkboxes on the wait relationship instead? If so, what retry settings are configured?

Rather than use "retry" on the wait relationship, try dragging a new connection away from and back to the EnforceOrder processor to create a loop, and assign the "wait" relationship to that looped connection (make sure to also uncheck "retry" on the wait relationship). Does the same issue still persist?

Please help our community thrive. If you found that any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of them that helped.

Thank you, Matt
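To make the "Defragment" suggestion more concrete, here is a minimal Python sketch of the idea behind that strategy: fragments are grouped by the `fragment.identifier` attribute and joined in `fragment.index` order once `fragment.count` of them have arrived. This is a simplified model for illustration only, not NiFi's implementation; the dictionary layout used for a FlowFile and the function name `defragment` are assumptions.

```python
from collections import defaultdict


def defragment(flowfiles):
    """Join fragments per fragment.identifier, ordered by fragment.index.

    Each FlowFile is modeled (hypothetically) as a dict with "attributes"
    and "content". Only bins that hold all fragment.count pieces are merged.
    """
    bins = defaultdict(dict)
    expected = {}
    for ff in flowfiles:
        attrs = ff["attributes"]
        ident = attrs["fragment.identifier"]
        bins[ident][int(attrs["fragment.index"])] = ff["content"]
        expected[ident] = int(attrs["fragment.count"])

    merged = {}
    for ident, fragments in bins.items():
        if len(fragments) == expected[ident]:
            # Arrival order does not matter; the index attribute decides.
            merged[ident] = b"".join(fragments[i] for i in sorted(fragments))
    return merged
```

In this model the second file can arrive before the first and the merged content still comes out in index order, which is the property the reply relies on.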
01-09-2025
07:28 AM
@askh88 I don't know anything about the "livy processor" you are using, but NiFi processors typically execute against a single FlowFile at a time. So trying to use Wait/Notify to delay FlowFiles from reaching the livy processor until you have X number of FlowFiles within some total size range would likely not make much difference in controlling the number of Spark connections.

The question here is whether it is possible to merge multiple FlowFiles into one FlowFile that can be passed to your livy processor. I don't know anything about the structure of your data or whether a merge is possible via a MergeContent or MergeRecord processor. But if merging the FlowFiles is possible, that is the better route to take here.

Thank you, Matt
01-07-2025
09:29 AM
@ShellyIsGolden 500K+ files is a lot to list, and a lot to look up on subsequent runs when checking for new files.

A few questions first: How is the scheduling of your ListSFTP processor configured? With the initial listing, how long does it take to output the 500K+ FlowFiles from the time the processor is started? When files are added to the SFTP server, are they added using a dot rename method? Is the last modified timestamp being updated on the files as they are being written to the SFTP server?

When the processor executes for the first time, it will list all files regardless of the configured "Entity Tracking Time Window" value. Subsequent executions will only list files with a last modified timestamp within the configured "Entity Tracking Time Window" value, so accurate last modified timestamps are important.

With the initial listing of a new processor (or a copy of an existing processor), there is no step that checks listed files against the cache entries to see whether a file has never been listed before or whether a listed file has changed in size since it was last listed. This lookup and comparison does happen on subsequent runs and can use considerable heap. Do you see any Out of Memory (OOM) exceptions in your NiFi app logs?

Depending on how often the processor executes, consider reducing the configured "Entity Tracking Time Window" value so that fewer files are listed in the subsequent executions that need to be looked up. Set it to what is needed, with a small buffer between each processor execution. Since it sounds like you have your processor scheduled to execute every 1 minute, maybe try setting this to 30 minutes instead to see what impact it has.

When you see the issue, does the processor show an active thread in the upper right corner that never seems to go away? When the issue appears, rather than copying the processor, what happens if you simply stop the processor (make sure all active threads complete, so that no active thread count shows in the upper right corner of the processor) and then just restart it?

In the latest version of Apache NiFi, a "Remote Poll Batch Size" property (defaults to 5000) was added to the ListSFTP processor, which may help here considering the tremendous number of files being listed in your case.

Thank you, Matt
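The effect of the "Entity Tracking Time Window" described above can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not ListSFTP's actual code: the function names, the dictionary fields (`path`, `size`, `last_modified_ms`), and the cache layout are all hypothetical.

```python
def candidates(remote_files, now_ms, window_ms, initial_listing):
    """Files the listing has to consider on this execution."""
    if initial_listing:
        # First execution: everything is listed, regardless of the window.
        return list(remote_files)
    cutoff = now_ms - window_ms
    return [f for f in remote_files if f["last_modified_ms"] >= cutoff]


def new_or_changed(files, cache):
    """Keep files never listed before, or whose size changed since listing.

    cache maps path -> size recorded at the previous listing (assumed shape).
    """
    return [f for f in files if cache.get(f["path"]) != f["size"]]
```

With a smaller window, `candidates` returns fewer entries on subsequent runs, so fewer cache lookups are needed in `new_or_changed`. That is the reason a 30 minute window is suggested above for a processor that runs every minute.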
01-07-2025
07:14 AM
@Bern I suggest starting a new community question with the full error stack trace you are seeing. Your exception seems different than the one discussed in this community question:

Failure is due to java.lang.IllegalArgumentException: A HostProvider may not be empty!: java.lang.IllegalArgumentException: A HostProvider may not be empty!

Your exception is:

Failure is due to org.apache.nifi.processor.exception.TerminatedTaskException:

A few observations and things you may want to provide details around in your new community post:

1. The version of Apache NiFi you are using was released ~6 years ago. You should really consider upgrading to take advantage of lots of bug fixes, performance improvements, new features, and addressed security CVEs. The latest release in the 1.x branch is 1.28 (which is the final release of the 1.x branch).
2. Your screenshot shows over 250,000 queued FlowFiles (25.75 GB) and 1,373 running processor components. What do you have set as your Max Timer Driven Thread Count?
3. Are there any other WARN or ERROR messages in your NiFi logs? Any Out of Memory (OOM) errors reported?
4. Why are you load-balancing on so many connections? That does not make sense from what is shown.

Thank you, Matt
01-07-2025
05:57 AM
@ravi_tadepally A secured NiFi is always going to require successful authentication and authorization. I assume you are fetching a token because you have configured your secured NiFi to use OIDC based user authentication. But keep in mind that a secured NiFi will always support mutual TLS based authentication no matter what additional authentication methods have been configured.

For rest-api interactions it is often easier to generate a clientAuth certificate that is trusted by your NiFi's truststore and use that for authentication instead. With mutual TLS based authentication there is no need to fetch any token. You simply include the clientAuth certificate in every rest-api call.

You could even handle this task via a NiFi dataflow that uses the InvokeHTTP processor (configured with an SSL Context Service; you could even just use NiFi's keystore and truststore) to make the rest-api call that fetches the Prometheus data and then, through that dataflow, send it to the desired endpoint.

Thank you, Matt
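For a client outside of NiFi, a call that authenticates with a clientAuth certificate instead of a token might look like the following Python sketch. It uses only the standard library. The host name, the file paths, and the helper names are hypothetical, and the request path shown in the usage note is just the current-user endpoint as an example; substitute the endpoint you actually need.

```python
import ssl
import urllib.request


def build_mtls_context(client_cert, client_key, ca_bundle):
    """TLS context that presents a client certificate (mutual TLS).

    ca_bundle must contain the CA that signed NiFi's server certificate;
    client_cert must be trusted by NiFi's truststore.
    """
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_bundle)
    ctx.load_cert_chain(certfile=client_cert, keyfile=client_key)
    return ctx


def build_request(base_url, path):
    """GET request for a NiFi rest-api path; no Authorization header needed."""
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    return urllib.request.Request(url, method="GET")


def fetch(base_url, path, ctx, timeout=30):
    """Send the request over the mutual TLS context and return the body."""
    request = build_request(base_url, path)
    with urllib.request.urlopen(request, context=ctx, timeout=timeout) as resp:
        return resp.read()
```

Usage would be along the lines of `fetch("https://nifi.example.com:8443", "/nifi-api/flow/current-user", build_mtls_context("client.pem", "client.key", "ca.pem"))`, where every argument is a placeholder. The identity in the certificate still needs the appropriate NiFi authorizations for the endpoint being called.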
01-03-2025
07:37 AM
1 Kudo
@spiker What is currently written in the users.xml and authorizations.xml files in your NiFi conf directory? How many nodes are in your NiFi cluster? What Apache NiFi version are you using? Thank you, Matt
01-03-2025
07:16 AM
@spiker With the edits to authorizers.xml made and the newly generated users.xml and authorizations.xml files verified, are you still seeing the untrusted proxy WARN in the logs? What do you see in the logs now? Thanks, Matt
01-03-2025
05:59 AM
@spiker Let's start by focusing on the following two shared log lines:

2025-01-02 10:29:05,167 INFO [NiFi Web Server-52] o.a.n.w.s.NiFiAuthenticationFilter Authentication Started 172.24.0.3 [<oncloudtemuser@sossourabh7687gmail.onmicrosoft.com><CN=172.24.0.3, OU=NIFI, O=NIFI, L=HYDRABAD, ST=TELANGANA, C=IN>] GET https://172.24.0.3:8443/nifi-api/flow/current-user
2025-01-02 10:29:05,173 WARN [NiFi Web Server-52] o.a.n.w.s.NiFiAuthenticationFilter Authentication Failed 172.24.0.3 GET https://172.24.0.3:8443/nifi-api/flow/current-user [Untrusted proxy CN=172.24.0.3, OU=NIFI, O=NIFI, L=HYDRABAD, ST=TELANGANA, C=IN]

In the first log line we see the authenticated user identity followed by the authenticated node identity of the node receiving the access request:

<oncloudtemuser@sossourabh7687gmail.onmicrosoft.com><CN=172.24.0.3, OU=NIFI, O=NIFI, L=HYDRABAD, ST=TELANGANA, C=IN>

In a NiFi cluster setup, nodes proxy all requests on behalf of the authenticated user to the currently elected NiFi cluster coordinator. This means that all nodes in a NiFi cluster must be authorized to proxy user requests. Establishing the minimum required authorizations in a new NiFi setup is handled by authorizers.xml. In your case, you are using the file-access-policy-provider:

<accessPolicyProvider>
<identifier>file-access-policy-provider</identifier>
<class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
<property name="User Group Provider">composite-configurable-user-group-provider</property>
<property name="Authorizations File">./conf/authorizations.xml</property>
<property name="Initial Admin Identity">oncloudtemuser@sossourabh7687gmail.onmicrosoft.com</property>
<property name="Legacy Authorized Users File"></property>
<property name="Node Identity 1"></property>
</accessPolicyProvider>

This provider will generate the authorizations.xml file ONLY if it does not already exist. Once it exists, all additional authorizations and modifications are made from within the NiFi UI. If you edit the file-access-policy-provider, you'll need to delete authorizations.xml on all nodes before restarting your NiFi.

So we see from above that you have your initial admin user identity defined, but have not defined your node identities via "Node Identity 1", "Node Identity 2", etc. Before you can define your node identity in the file-access-policy-provider for seeding the authorizations the node requires, the identical, case-sensitive node identity must be returned by the "composite-configurable-user-group-provider". That means "CN=172.24.0.3, OU=NIFI, O=NIFI, L=HYDRABAD, ST=TELANGANA, C=IN" must be returned by either the "file-user-group-provider" or the "aad-user-group-provider". I believe the file-user-group-provider is where you expect your node identities to be derived from:

<userGroupProvider>
<identifier>file-user-group-provider</identifier>
<class>org.apache.nifi.authorization.FileUserGroupProvider</class>
<property name="Users File">./conf/users.xml</property>
<property name="Legacy Authorized Users File"></property>
<property name="Initial User Identity 1">CN=172.24.0.3, OU=NIFI, O=NIFI, L=HYDRABAD, ST=TELANGANA, C=IN</property>
<property name="Initial User Identity 2"></property>
</userGroupProvider>

Just like the "file-access-policy-provider", the "file-user-group-provider" will ONLY generate users.xml if it does not already exist. So you will need to delete users.xml on all your nodes before restarting after editing your authorizers.xml.

NOTE: Be mindful of case sensitivity in your user identities.

These modifications should get you past your UNTRUSTED PROXY issues when trying to access NiFi with your authenticated user.

Thank you, Matt
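The consistency rule above (each node identity must be set in the access policy provider and must match, character for character, an identity the user group provider returns) can be checked before a restart with a small script. The following Python sketch is a hypothetical helper, not a NiFi tool. It assumes the property naming shown in the snippets above and only looks at "Initial User Identity" entries, so identities that come from another provider such as the aad-user-group-provider are not visible to it.

```python
import xml.etree.ElementTree as ET


def _values(element, prefix):
    """Non-empty values of <property> children whose name starts with prefix."""
    return [
        p.text.strip()
        for p in element.findall("property")
        if p.get("name", "").startswith(prefix) and p.text and p.text.strip()
    ]


def check_node_identities(authorizers_xml):
    """Return a list of problems found in the given authorizers.xml text."""
    root = ET.fromstring(authorizers_xml)

    known = set()
    for provider in root.iter("userGroupProvider"):
        known.update(_values(provider, "Initial User Identity"))

    problems = []
    for provider in root.iter("accessPolicyProvider"):
        nodes = _values(provider, "Node Identity")
        if not nodes:
            problems.append("no Node Identity values are set")
        for node in nodes:
            # Exact comparison on purpose: identities are case sensitive.
            if node not in known:
                problems.append("node identity has no matching user: " + node)
    return problems
```

Run against the configuration quoted above, this would report that no Node Identity values are set. An empty result means the two providers agree; it does not prove that existing users.xml and authorizations.xml files were regenerated, which still requires deleting them on all nodes.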
12-24-2024
07:02 AM
1 Kudo
@BK84 I would avoid, if possible, designing dataflows that rely on making rest-api calls to control the processing of data. Any issue that prevents the success of the rest-api call would have a negative impact on your data processing.

Based on what you have shared, I'd suggest having your A->B->C dataflow connect directly to your D->E->F dataflow. Since your Python script (C) is responsible for creating the files in the directory that ListFile (D) checks, why not have your Python script output a FlowFile that contains a list of the filenames it created? A SplitContent processor could be used to split that into individual FlowFiles that can be passed directly to a FetchFile processor, so the files can be consumed (no need for ListFile anymore) and written to Elasticsearch.

Then let's consider the error handling and how to trigger the next run without the rest-api. SplitContent->FetchFile->PutElasticsearchJson should be placed in a child process group. That process group should be configured with a FlowFile Concurrency of "Single FlowFile Per Node" and an Outbound Policy of "Batch Output". This means that only 1 FlowFile (the FlowFile produced by the Python script that contains the list of all files to be fetched) will be allowed to enter this child process group at a time.

PutElasticsearchJson has relationships for handling retry, success, failure, and errors. These can be used to handle success as well as to report (possibly using a PutEmail processor) when processing has issues. On the success relationship path you could use ModifyBytes to zero out the content and then MergeContent to merge the split FlowFiles back into one FlowFile using the "Defragment" strategy. Add a max bin age to force failure of a bin after x amount of time if it does not have all its fragments. SplitContent creates all the FlowFile attributes needed to support the Defragment strategy.

Assuming all files were routed to "success" from the PutElasticsearchJson processor, the single defragmented FlowFile will be output, which you send to an output port in that child process group. Once all FlowFiles in the child process group are queued at output ports, they will be allowed to exit the child process group. Assuming MergeContent succeeds, you can use this output FlowFile to trigger the next run (hopefully without using rest-api calls). Since you did not share how your data gets started in A and B, I can't really suggest anything here.

The bottom line is that avoiding rest-api calls to control your dataflows leads to faster, more efficient processing of your data. Let your dataflow do the error handling.

But if you do choose to use rest-api calls, the best way to figure them out is to open the developer tools in your browser and then manually perform the needed interactions via the NiFi UI. Through the developer tools you can capture (copy as cURL) the call being made in response to your UI action. Now, you are likely using a username and password to access your NiFi, but authenticating that way through NiFi dataflows adds more work (for one thing, your username and password would be in plain text in the dataflow needed to get your user token). So you will want to set up a keystore and truststore so authentication is handled via a mutual TLS handshake, thus avoiding exposed passwords and the need to get a user token. The InvokeHTTP processor can be used to make the rest-api calls and can be configured with an SSLContextService.

This is a lot of high level information, but I hope it has set you on the path to success.

Thank you, Matt
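As a rough illustration of the "have your Python script output a list of the filenames it created" idea, the sketch below writes some files and then builds a newline-delimited manifest of their absolute paths. It assumes the script is run by a processor that captures standard output as FlowFile content (ExecuteStreamCommand is one option); the function names, the record format, and the output directory are hypothetical.

```python
import sys
from pathlib import Path


def write_outputs(records, out_dir):
    """Write each (filename, text) record to out_dir; return absolute paths."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    created = []
    for name, text in records:
        path = out_dir / name
        path.write_text(text, encoding="utf-8")
        created.append(str(path.resolve()))
    return created


def manifest(paths):
    """One path per line, so a downstream split can work on newlines."""
    return "".join(p + "\n" for p in paths)


def main(records, out_dir):
    # Only the manifest goes to stdout; keep logging on stderr so it does
    # not end up mixed into the FlowFile content.
    sys.stdout.write(manifest(write_outputs(records, out_dir)))
```

Each line of the manifest can then become one FlowFile whose content is the path to fetch. The split and fetch processors would need to be configured to match this one-path-per-line format.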
12-24-2024
06:00 AM
1 Kudo
@hegdemahendra As far as your issue goes, it would probably be useful to collect a series of thread dumps (spaced at least 5 minutes apart). Then you would look for any threads related to the stopping of components to see whether they are progressing or hung. Is it stuck on stopping a specific processor or processor class? Do any of the processors that are being stopped show active threads? Thank you, Matt
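Collecting the series of thread dumps can be scripted. The Python sketch below calls `nifi.sh dump <file>` a few times with a 5 minute pause between calls. The install path, output directory, file naming, and function names are assumptions; the `run` and `sleep` parameters exist only so the loop can be exercised without a NiFi installation.

```python
import os
import subprocess
import time


def dump_command(nifi_home, out_file):
    """Command line that asks NiFi to write a thread dump to out_file."""
    return [os.path.join(nifi_home, "bin", "nifi.sh"), "dump", out_file]


def collect_dumps(nifi_home, out_dir, count=3, interval_s=300,
                  run=subprocess.run, sleep=time.sleep):
    """Take `count` thread dumps, waiting interval_s seconds between them."""
    files = []
    for i in range(count):
        out_file = os.path.join(out_dir, "thread-dump-%d.txt" % (i + 1))
        run(dump_command(nifi_home, out_file), check=True)
        files.append(out_file)
        if i < count - 1:
            sleep(interval_s)
    return files
```

Comparing the same thread across consecutive dumps shows whether it is progressing: a thread that sits in the identical stack in every dump is a candidate for being hung.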