Member since: 07-30-2019
Posts: 3136
Kudos Received: 1565
Solutions: 910
My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
|  | 62 | 01-16-2025 03:22 PM |
|  | 183 | 01-09-2025 11:14 AM |
|  | 1066 | 01-03-2025 05:59 AM |
|  | 468 | 12-13-2024 10:58 AM |
|  | 540 | 12-05-2024 06:38 AM |
01-16-2025
03:22 PM
@tono425 I assumed all your records shared the same schema. With MergeRecord, a bin can consist of potentially many "like FlowFiles". For two FlowFiles to be considered "like FlowFiles", they must have the same schema (as identified by the Record Reader). A FlowFile that is not like the FlowFiles already allocated to a bin will be allocated to a different bin.

I would still recommend against setting the minimum record count to 1. A typical dataflow is a constant stream of new FlowFiles, and MergeRecord only sees the FlowFiles queued at the exact moment it executes, so a constantly running dataflow can produce merged FlowFiles with fewer records than expected.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt
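The binning rule can be sketched roughly as follows. This is a hypothetical simulation of the behavior described above, not NiFi's actual implementation; the schema identifiers are made up for illustration:

```python
# Rough sketch: MergeRecord allocates FlowFiles to bins keyed by schema,
# so only "like FlowFiles" (same schema) ever share a bin.

def allocate_to_bins(flowfiles):
    """Group FlowFiles into bins by their Record Reader schema."""
    bins = {}  # schema identifier -> list of FlowFiles
    for ff in flowfiles:
        bins.setdefault(ff["schema"], []).append(ff)
    return bins

flowfiles = [
    {"name": "a.json", "schema": "user-v1"},
    {"name": "b.json", "schema": "user-v1"},
    {"name": "c.json", "schema": "order-v2"},  # different schema -> new bin
]
bins = allocate_to_bins(flowfiles)
```

Here a.json and b.json land in one bin and c.json in another, which is why mixed-schema streams produce more, smaller merged FlowFiles than you might expect.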
01-16-2025
05:41 AM
@tono425 When the MergeRecord processor executes, it allocates FlowFiles from the inbound connection to bins. At the end of that execution it determines whether any of the bins are eligible to be merged. Since you have "Minimum Number of Records" set to 1, a bin will merge even if it contains only one record. Understand that the merge processors will not wait for the max settings to be reached.

Try setting "Minimum Number of Records" to 20000 and "Max Bin Age" to a value like 5 minutes. (When using the Bin-Packing Algorithm, Max Bin Age controls how long MergeRecord will wait for a bin to reach the configured minimums before forcing a merge with fewer records.) Also be mindful of the number of FlowFiles it takes to make up the 20000 records you are trying to merge, since a single FlowFile can contain anywhere from one to many records.

Also keep in mind that if you are running a NiFi cluster, each node can only merge FlowFiles located on that same node. The merge processors will not merge FlowFiles across nodes.

Thank you,
Matt
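The eligibility check described above can be sketched like this. It is a deliberate simplification (assuming the Bin-Packing Algorithm), not MergeRecord's actual code; the parameter defaults mirror the suggested settings:

```python
# Sketch of the bin-eligibility rule: a bin merges once it reaches the
# configured minimum record count, OR once Max Bin Age expires and the
# merge is forced with fewer records than the minimum.

def bin_ready(record_count, bin_age_secs,
              min_records=20000, max_bin_age_secs=300):
    """Return True when a bin is eligible to merge."""
    if record_count >= min_records:
        return True  # minimums reached -> eligible immediately
    # otherwise only the Max Bin Age timeout forces the merge
    return bin_age_secs >= max_bin_age_secs
```

With "Minimum Number of Records" left at 1, `bin_ready(1, 0, min_records=1)` is immediately true, which is why single-record merges occur in a constantly running flow.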
01-16-2025
05:31 AM
@Eslam Welcome to the community. To get helpful answers, you'll need to provide more detail about your use case. NiFi provides many processors for connecting to various services on external hosts. You can find the list of default processors available with each Apache NiFi release here:

NiFi 1.x release: https://nifi.apache.org/docs/nifi-docs/
NiFi 2.x release: https://nifi.apache.org/components/

At the most basic level you have processors like GetSFTP and ListSFTP / FetchSFTP, but there are also processors for connecting to SMB, Splunk, REST APIs, SNMP, FTP, databases, Kafka, Hive, etc. on external servers. Look through the components list in the documentation for Get, List, Fetch, and Query type processors to see if any of them meet your use case needs.

Thank you,
Matt
01-15-2025
06:25 AM
@itninja Did you configure your NiFi to use HTTPS, or is it still configured for HTTP? For NiFi to support user authentication and authorization, it must be secured over HTTPS.

Could you share some screenshots so I better understand what you mean by:
- Managed to create the Administrator user with the set-user script.
- Able to log in as Administrator, but anonymous access is still enabled.
- The script-created Administrator user is still visible to the anonymous user.

Thank you,
Matt
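For reference, a NiFi secured over HTTPS typically has `nifi.properties` entries along these lines. The host, port, and keystore paths/passwords below are placeholders, not values from your environment; the key point is that the HTTP host/port entries are left blank once HTTPS is configured:

```properties
# HTTPS enabled; HTTP intentionally blank (placeholder values shown)
nifi.web.https.host=nifi.example.com
nifi.web.https.port=8443
nifi.web.http.host=
nifi.web.http.port=

nifi.security.keystore=./conf/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=changeit
nifi.security.truststore=./conf/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=changeit
```

If `nifi.web.http.port` is still populated, NiFi is serving unsecured HTTP and anonymous access behavior is expected.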
01-15-2025
05:16 AM
@lexieliu It may help if you share the complete configuration of your ListSFTP processor, including both the Properties and Scheduling tabs. Thanks, Matt
01-09-2025
11:14 AM
@Chram Let's back up and first understand the use case you are trying to solve here. From the look of your dataflows, you have two FlowFiles that you want to merge via the MergeContent processor, and the order in which they are merged is important to you. This sounds like a use case better solved by the "Defragment" merge strategy in the MergeContent processor rather than by the EnforceOrder processor. Is there more to your use case that necessitates EnforceOrder?

As far as your dataflow goes, I am having trouble reproducing the issue you described. I see my priority 2 file sitting in "wait" only until my matching priority 1 file arrives. The dataflow screenshot you shared also does not show the "wait" relationship being routed anywhere. Are you using the "retry" + "terminate" checkboxes on the "wait" relationship instead? If so, how are the retry settings configured?

Rather than using "retry" on the "wait" relationship, try dragging a new connection away from and back to the EnforceOrder processor to create a loop, and assign the "wait" relationship to that looped connection (make sure to also uncheck "retry" on the relationship). Does the same issue still persist?

Thank you,
Matt
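For context, the Defragment strategy reassembles FlowFiles using the `fragment.identifier`, `fragment.index`, and `fragment.count` attributes, so arrival order stops mattering. A hypothetical sketch of that idea (illustrative only; in NiFi these are FlowFile attributes, usually set upstream by a Split processor or UpdateAttribute):

```python
# Sketch of Defragment-style reassembly: fragments sharing an identifier
# are ordered by fragment.index, and the merge fires once fragment.count
# pieces have arrived.

def defragment(flowfiles):
    """Reassemble one group of fragments into ordered content."""
    flowfiles = sorted(flowfiles, key=lambda ff: int(ff["fragment.index"]))
    # merge only happens once all expected fragments are present
    assert len(flowfiles) == int(flowfiles[0]["fragment.count"])
    return b"".join(ff["content"] for ff in flowfiles)

# The priority 2 file arriving first does not matter; index drives order.
merged = defragment([
    {"fragment.identifier": "abc", "fragment.index": "2",
     "fragment.count": "2", "content": b"second"},
    {"fragment.identifier": "abc", "fragment.index": "1",
     "fragment.count": "2", "content": b"first"},
])
```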
01-09-2025
07:28 AM
1 Kudo
@askh88 I don't know anything about the "livy processor" you are using, but NiFi processors typically execute against a single FlowFile at a time. So using Wait/Notify to delay FlowFiles from reaching the Livy processor until you have some number of FlowFiles in a given total size range would likely make little difference in controlling the number of Spark connections.

The question here is whether it is possible to merge multiple FlowFiles into one FlowFile that can be passed to your Livy processor. I don't know the structure of your data or whether a merge is possible via a MergeContent or MergeRecord processor, but if merging the FlowFiles is possible, that is the better route to take here.

Thank you,
Matt
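If a merge does turn out to be possible, the batching could be expressed with MergeContent settings along these lines. The values are illustrative placeholders, not a recommendation for this specific flow:

```
MergeContent (illustrative configuration):
  Merge Strategy            : Bin-Packing Algorithm
  Minimum Number of Entries : 50     # batch ~50 FlowFiles into one
  Maximum Number of Entries : 100
  Max Bin Age               : 2 min  # don't hold a partial batch forever
```

One merged FlowFile per batch then means one execution of the downstream processor per batch, rather than one per source FlowFile.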
01-07-2025
09:29 AM
@ShellyIsGolden 500K+ files is a lot to list, and a lot to look up against the cache on subsequent runs when checking for new files. A few questions first:

- How is your ListSFTP processor's scheduling configured?
- On the initial listing, how long does it take to output the 500K+ FlowFiles from the time the processor is started?
- When files are added to the SFTP server, are they added using a dot-rename method?
- Is the last-modified timestamp updated on the files as they are written to the SFTP server?

On its initial execution the processor will list all files regardless of the configured "Entity Tracking Time Window" value. Subsequent executions will only list files with a last-modified timestamp within the configured "Entity Tracking Time Window", so accurate last-modified timestamps are important.

On the initial listing of a new processor (or a copy of an existing processor) there is no step to check the listed files against the cache entries to see whether a file has never been listed before or has changed in size since it was last listed. That lookup and comparison does happen on subsequent runs and can use considerable heap. Do you see any Out of Memory (OOM) exceptions in your NiFi app logs?

Depending on how often the processor executes, consider reducing the configured "Entity Tracking Time Window" so fewer files are listed in subsequent executions that then need to be looked up. Set it to what is needed, with a small buffer beyond the interval between processor executions. Since it sounds like you have your processor scheduled to execute every 1 minute, maybe try setting this to 30 minutes to see what impact it has.

When you see the issue, does the processor show an active thread in the upper-right corner that never seems to go away? When the issue appears, rather than copying the processor, what happens if you simply stop it (make sure all active threads complete and no active thread count shows in the upper-right corner of the processor) and then just restart it?

In the latest versions of Apache NiFi, a "Remote Poll Batch Size" property (default 5000) was added to the ListSFTP processor, which may help here considering the tremendous number of files being listed in your case.

Thank you,
Matt
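Pulling the suggestions above together, the settings worth revisiting look something like this (values are illustrative, to be tuned against your actual run schedule):

```
ListSFTP settings to revisit (illustrative values):
  Listing Strategy              : Tracking Entities
  Entity Tracking Time Window   : 30 min   # small buffer beyond run interval
  Remote Poll Batch Size        : 5000     # available in recent releases
  Run Schedule (Scheduling tab) : 1 min
```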
01-07-2025
07:14 AM
@Bern I suggest starting a new community question with the full error stack trace you are seeing. Your exception seems different from the one discussed in this community question. That exception was:

Failure is due to java.lang.IllegalArgumentException: A HostProvider may not be empty!

Your exception is:

Failure is due to org.apache.nifi.processor.exception.TerminatedTaskException

A few observations and details you may want to provide in your new community post:

1. The version of Apache NiFi you are using was released ~6 years ago. You should really consider upgrading to take advantage of the many bug fixes, performance improvements, new features, and security CVEs addressed since then. The latest release in the 1.x branch is 1.28 (the final release of the 1.x branch).
2. Your screenshot shows over 250,000 queued FlowFiles (25.75 GB) and 1,373 running processor components. What do you have set as your Max Timer Driven Thread Count?
3. Are there any other WARN or ERROR messages in your NiFi logs? Any Out of Memory (OOM) errors reported?
4. It is not clear why you are load-balancing on so many connections. Can you explain?

Thank you,
Matt
01-07-2025
05:57 AM
@ravi_tadepally A secured NiFi will always require successful authentication and authorization. I assume you are fetching a token because you have configured your secured NiFi to use OIDC-based user authentication. Keep in mind, however, that a secured NiFi will always support mutual TLS based authentication, no matter what additional authentication methods have been configured.

For REST API interactions it is often easier to generate a clientAuth certificate that is trusted by your NiFi's truststore and use that for authentication. With mutual TLS based authentication there is no need to fetch any token; you simply include the clientAuth certificate in every REST API call.

You could even handle this task via a NiFi dataflow that utilizes the InvokeHTTP processor (configured with an SSL Context Service; it could even use NiFi's own keystore and truststore) to make the REST API call to fetch the Prometheus data and then, through that dataflow, send it to the desired endpoint.

Thank you,
Matt
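As an illustration, a mutual TLS REST API call with curl might look like the following. The hostname, port, and file paths are placeholders; the certificate/key pair must be trusted by NiFi's truststore, and the CA file is whatever signed NiFi's server certificate:

```
# Mutual TLS: present a clientAuth certificate instead of fetching a token
curl --cert client.crt --key client.key --cacert nifi-ca.pem \
  "https://nifi.example.com:8443/nifi-api/flow/metrics/prometheus"
```

No token request or Authorization header is needed; the TLS handshake itself authenticates the caller.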