Member since: 03-26-2024
Posts: 26
Kudos Received: 18
Solutions: 0
11-20-2024
11:28 PM
1 Kudo
@s198, did the response help resolve your query? If it did, please mark the relevant reply as the solution, as this helps others find the answer more easily in the future. If you still have concerns, please provide the information that @MattWho has requested.
08-09-2024
02:31 AM
1 Kudo
@s198 wrote: My requirement is to retrieve the total number of files in a given HDFS directory and, based on the number of files, proceed with the downstream flow. I cannot use the ListHDFS processor as it does not allow inbound connections. The GetHDFSFileInfo processor generates a FlowFile for each HDFS file, causing all downstream processors to execute the same number of times. I have observed that we can use ExecuteStreamCommand to invoke a script and execute HDFS commands to get the number of files. I would like to know if we can obtain the count without using a script, or if there is any other option available besides the above.

Hello,

To retrieve the total number of files in a given HDFS directory without a separate script file, you can use the ExecuteStreamCommand processor in Apache NiFi to run the HDFS command directly. Alternatively, you can use the ExecuteScript processor with a short inline Groovy script. Here’s how you can do it using the ExecuteScript processor:

1. Add the ExecuteScript Processor: Drag and drop the ExecuteScript processor onto your NiFi canvas.
2. Configure the ExecuteScript Processor: Set the Script Engine to Groovy. In the Script Body, use the following Groovy script to count the files in the HDFS directory:

def flowFile = session.get()
if (!flowFile) return
// Directory to count; replace with the actual HDFS path
def hdfsDir = '/path/to/hdfs/directory'
// Pass the arguments as a list so a path containing spaces is not split
def process = ['hdfs', 'dfs', '-count', hdfsDir].execute()
// Read stdout before waiting so a full pipe buffer cannot block the command
def output = process.in.text
if (process.waitFor() != 0) {
    // The hdfs command failed; route the FlowFile to failure
    session.transfer(flowFile, REL_FAILURE)
    return
}
// hdfs dfs -count prints: DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
def fileCount = output.split()[1]
flowFile = session.putAttribute(flowFile, 'file.count', fileCount)
session.transfer(flowFile, REL_SUCCESS)

3. Set the HDFS Directory Path: Replace /path/to/hdfs/directory with the actual path to your HDFS directory.
4. Connect the Processor: Connect the ExecuteScript processor to the downstream processors that need the file count.
5. Run the Flow: Start the flow, and the ExecuteScript processor will count the files in the specified HDFS directory and add the count to the FlowFile as the file.count attribute.

This approach avoids the need for a separate script file and keeps everything within NiFi. The ExecuteScript processor runs the HDFS command and extracts the file count, which can then be used in your downstream processors.

Hope this will help you.

Best regards,
florence0239
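For anyone adapting the script above, the column parsing can be checked on its own before wiring it into a flow. The following is a minimal Python sketch, not part of the original reply. It assumes the usual `hdfs dfs -count` column order (DIR_COUNT, FILE_COUNT, CONTENT_SIZE, PATHNAME); the function name `parse_hdfs_count` and the sample line are hypothetical.

```python
def parse_hdfs_count(line: str) -> dict:
    """Parse one line of `hdfs dfs -count` output.

    Assumed columns: DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
    """
    # Strip padding first, then split at most 3 times so that a path
    # containing spaces stays intact in the last field.
    parts = line.strip().split(None, 3)
    if len(parts) != 4:
        raise ValueError(f"unexpected -count output: {line!r}")
    dir_count, file_count, content_size, path = parts
    return {
        "dir_count": int(dir_count),
        "file_count": int(file_count),
        "content_size": int(content_size),
        "path": path,
    }


# Hypothetical sample line; real output is padded with spaces in the same way.
sample = "           3           42         1048576 /data/landing\n"
result = parse_hdfs_count(sample)
print(result["file_count"])  # 42
```

Note that the file count reported by `-count` covers the whole directory tree under the given path, not only its top level.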
04-26-2024
06:02 AM
@s198 Back pressure thresholds are configured on NiFi connections between processors. There are two types of back pressure thresholds:
1. Object Threshold - Back pressure is applied once the number of FlowFiles reaches or exceeds the setting (default is 10,000 FlowFiles). Applied per node and not across all nodes in a NiFi cluster.
2. Size Threshold - Back pressure is applied once the total data size of queued FlowFiles reaches or exceeds the setting (default is 1 GB). Applied per node and not across all nodes in a NiFi cluster.

When back pressure is being applied on a connection, it prevents the processor that feeds data directly into that connection from being scheduled to execute until the back pressure is no longer being applied. Since back pressure is a soft limit, this explains your two different scenarios:
1. 20 FlowFiles being transferred to the connection feeding your MergeContent processor. Initially that connection is empty, so no back pressure is applied. The preceding processor kept adding FlowFiles to that connection until the "Size Threshold" of 1 GB was reached; back pressure was then applied, preventing the preceding processor from being scheduled and processing the remaining 6 files. The max bin age set on your MergeContent processor then forced the bin containing the first 14 FlowFiles to merge after 5 minutes, removing the back pressure and allowing the next 6 files to be processed by the upstream processor.
2. The connection between the FetchHDFS and PutSFTP processors has no back pressure being applied (neither the object threshold nor the size threshold has been reached or exceeded), so FetchHDFS is scheduled to execute. The execution resulted in a single FlowFile larger than the 1 GB size threshold, so back pressure would be applied as soon as that 100 GB file was queued. As soon as PutSFTP successfully executed and moved the FlowFile to one of its downstream relationships, FetchHDFS would have been allowed to get scheduled again.
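The soft-limit behavior in the two scenarios above can be illustrated with a toy model. This is a minimal Python sketch and not NiFi code: the `Connection` class and `run_upstream` function are hypothetical names, and the 75 MB per-file size is an assumption chosen so that the 14th file crosses the 1 GB threshold (the original question does not state the file sizes). It also simplifies the merge step to simply emptying the queue.

```python
from dataclasses import dataclass, field

MB = 1024 ** 2
GB = 1024 ** 3


@dataclass
class Connection:
    """Toy model of a NiFi connection queue on a single node."""
    object_threshold: int = 10_000   # default object threshold
    size_threshold: int = 1 * GB     # default size threshold
    queued_sizes: list = field(default_factory=list)

    def back_pressure(self) -> bool:
        # Applied once either threshold is reached or exceeded.
        return (len(self.queued_sizes) >= self.object_threshold
                or sum(self.queued_sizes) >= self.size_threshold)


def run_upstream(conn: Connection, pending: list) -> int:
    """Run the upstream processor until back pressure stops it.

    The thresholds are only checked before each execution (soft limit),
    so the FlowFile that crosses a threshold is still queued.
    Returns the number of FlowFiles transferred.
    """
    transferred = 0
    while pending and not conn.back_pressure():
        conn.queued_sizes.append(pending.pop(0))
        transferred += 1
    return transferred


# Scenario 1: 20 FlowFiles of an assumed 75 MB each.
conn = Connection()
pending = [75 * MB] * 20
first_pass = run_upstream(conn, pending)
print(first_pass, len(pending))   # 14 6

# The downstream merge empties the queue, lifting back pressure.
conn.queued_sizes.clear()
second_pass = run_upstream(conn, pending)
print(second_pass)                # 6

# Scenario 2: a 100 GB FlowFile entering an empty connection.
big = Connection()
big_pending = [100 * GB, 100 * GB]
print(run_upstream(big, big_pending), big.back_pressure())   # 1 True
```

In both cases the upstream processor is only stopped after the threshold has already been crossed, which is why a single 100 GB FlowFile can land in a connection with a 1 GB size threshold.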
There are also processors that execute on batches of files in a single execution. The list- and split-based processors like ListFile and SplitContent are good examples. It is possible for the ListFile processor to perform a listing execution that produces more FlowFiles than the 10,000 object threshold. Since no back pressure is being applied at that point, the execution will succeed and create all 10,000+ FlowFiles, which get transferred to the downstream connection. Back pressure will then be applied until the number of FlowFiles drops back below the threshold. That means as soon as it drops to 9,999, back pressure is lifted and the ListFile processor is allowed to execute again.

In your MergeContent example, you made the appropriate threshold change to allow more FlowFiles to queue in the connection upstream of your MergeContent. If you left the downstream connection containing the "merged" relationship with the default size threshold, back pressure would have been applied as soon as the merged FlowFile was added to the connection, since its merged size exceeded the 1 GB default size threshold.

PRO TIP: You mentioned that your daily merge size may vary from 10 GB to 300 GB for your MergeContent. How to handle this in the most efficient way depends on the number of FlowFiles and not so much on the size of the FlowFiles. The only thing to keep in mind with size thresholds is the content_repository size limitations. The total disk usage of the content repository is not equal to the size of the actively queued FlowFiles on the canvas, because content is immutable once created and because of how NiFi stores FlowFile content in claims. NiFi holds FlowFile attributes/metadata in heap memory for better performance (swapping thresholds exist to help prevent Out of Memory issues, but swapping impacts performance while it is happening). NiFi sets the object threshold at 10,000 because swapping does not happen at that default size.
When merging very large numbers of FlowFiles, you can get better performance from two MergeContent processors in series instead of just one.

To help you understand the above, I recommend reading the following two articles:
https://community.cloudera.com/t5/Community-Articles/Dissecting-the-NiFi-quot-connection-quot-Heap-usage-and/ta-p/248166
https://community.cloudera.com/t5/Community-Articles/Understanding-how-NiFi-s-Content-Repository-Archiving-works/ta-p/249418

Please help our community thrive. If you found that any of the suggestions/solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt
04-23-2024
07:56 AM
Thank you, @MattWho, for providing timely responses and quick solutions to the queries. You are really helping the community grow. Hats off to you. Appreciate it.
04-09-2024
04:31 AM
1 Kudo
Hi @MattWho
1) Initially, I faced the "NiFi PutSFTP failed to rename dot file" issue only when the child process group was configured with "Outbound Policy = Batch Output". It worked without the child process group.
2) I set the PutSFTP failure retry attempts to 3, and it fixed the issue.
3) Later, I introduced a RouteOnAttribute after the FetchHDFS processor for some internal logic, and the PutSFTP error started again.
4) This time, I changed the "Run Schedule" of the PutSFTP processor from 0 sec to 3 sec, and it again fixed the issue.
5) I have a requirement to transfer stats for each file (file name, row count, file size, etc.). So I introduced one more PutSFTP processor, and the issue popped up again.
6) Finally, I made the following changes to both of my PutSFTP processors:
a) Set the PutSFTP failure retry attempts to 3.
b) Changed the "Run Schedule" of the first PutSFTP processor to 7 sec.
c) Changed the "Run Schedule" of the second PutSFTP processor to 10 sec.
Now it is working fine. Are we getting this issue because 20 FlowFiles are being processed at a time? Could you please suggest whether this is the right way to fix the "NiFi PutSFTP failed to rename dot file" issue?
04-03-2024
09:30 AM
Hi @s198, you do not need the Hadoop file system or a DataNode role on the remote server. You just need to set up an HDFS gateway on the remote server and pull the data using distcp. If you are using HDP or CDP, you can add the remote server as a gateway and run distcp on the remote server. Another option is to share one of the directories on the remote server, mount it on a Hadoop cluster node, and run distcp to that mounted directory.
04-01-2024
02:44 AM
1 Kudo
Thanks a lot @MattWho
03-28-2024
08:30 AM
Thank you @MattWho . You are awesome!