Apache NiFi: passing a folder as a FlowFile to a Python script

New Contributor

Hi fellas,

I have a folder structure where the parent folder is named after the date; it contains multiple location folders, which in turn contain the actual txt files.

So the structure is:

Logfolder/20240515/Location/logs.txt

My NiFi setup retrieves the date folders inside the parent directory Logfolder and passes them to an ExecuteStreamCommand processor that runs my Python script. The script expects a date folder as its argument, along with all the contents inside that date folder in their original directory structure.

But currently the FlowFile being sent to my ExecuteStreamCommand is logs.txt instead of 20240515.

How do I structure the flow so that each date folder is passed to the Python script as a whole, and not file by file?

I have tried ChatGPT but it isn't quite helpful. I'd appreciate any help on this as it is quite urgent.

7 REPLIES

Super Mentor

@Racketmojster 

A NiFi FlowFile consists of FlowFile content (the physical bytes of the source content) and FlowFile attributes (metadata about the content and the FlowFile).
You did not share how you are retrieving the parent folder:

My nifi setup is to retrieve the date folders inside the parent directory of Logfolder

I assume you may have tried GetFile or FetchFile?

It is also not very clear what you are trying to do from this statement:

date folders as a whole is passed to the python script

A detailed use case might help here.

NiFi works with content, and directories are not content. They are tracked in NiFi more as attributes/metadata related to the ingested content. If you look at the FlowFile attributes on the FlowFile created for logs.txt, you should see an "absolute.path" attribute that holds the full path to the logs.txt file.

If that is all you need, and you have no need to actually get the content of the logs.txt file itself into NiFi, you could use just the ListFile processor to get only the metadata/attributes about the found file and pass the value of "absolute.path" to your ExecuteStreamCommand script.

If each of your date folders contains multiple files, you would need to design a dataflow that accounts for that and eliminates duplicates, so that you execute your ExecuteStreamCommand script only once per date folder. For example, use a ReplaceText processor (Always Replace strategy) to write the "absolute.path" value, or the date portion of the full path, to the content of the FlowFile, then use a DetectDuplicate processor to purge all duplicates before your ExecuteStreamCommand processor.
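For illustration, here is a minimal sketch of what the script side could look like, assuming the ExecuteStreamCommand Command Arguments pass the date directory path (e.g. the absolute.path value) as the first argument; the process() function is a stand-in for your actual logic:

    # process_date_folder.py - illustrative sketch only.
    # Assumes the date folder path (e.g. /Logfolder/20240515) arrives as argv[1].
    import os
    import sys

    def process(rel_path, content):
        # Placeholder for your real per-file logic.
        print(f"{rel_path}: {len(content)} bytes")

    def main():
        date_dir = sys.argv[1]
        for root, _dirs, files in os.walk(date_dir):
            for name in files:
                full = os.path.join(root, name)
                # The relative path keeps the Location/zone structure visible.
                with open(full, "rb") as fh:
                    process(os.path.relpath(full, date_dir), fh.read())

    if __name__ == "__main__":
        main()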

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

 

New Contributor

Hi @MattWho,
Thanks for the reply.  Apologies for not including the current setup of the workflow:

[screenshot of the current NiFi flow]

From what you mentioned, I understand that FlowFiles contain both the attributes and the content of the file itself.
My use case is that my Python script needs all the files under a date folder to be processed, and I do need the contents of the files as well, not just the attributes. Currently, the flow shown above passes the individual logs.txt files inside the date folder one by one from the FetchFile processor to the ExecuteStreamCommand processor.

What I actually want is to fetch all the files and folders inside the parent folder (i.e. the date folder) and pass them to the ExecuteStreamCommand processor at once while maintaining the directory structure, and for this to be repeated for the other date folders too.
Currently the Python script is breaking, as it expects not a single log file but a date folder containing folders for multiple zones, which in turn contain the actual log files.

So my idea is to reconstruct the directory structure after fetching the files from local storage and then merge all the FlowFiles that belong to the same date, but I am unsure whether this is possible and would like to know if there are other methods to achieve this.

Super Mentor

@Racketmojster 

NiFi passes FlowFiles from processor to processor. A directory is nothing more than metadata and not actual content in itself; it is a path that hopefully leads to data. So there is no NiFi processor specifically for generating a FlowFile based on a discovered parent directory path.

But like I said in my original post, a dataflow consisting of:
ListFile --> UpdateAttribute (to extract the date from the absolute.path FlowFile attribute into a new attribute, for example "date.path") --> DetectDuplicate (configured with "${date.path}" as the "Cache Entry Identifier" value) --> ReplaceText (optional: only needed if the content of the FlowFile needs to have the directory path in it to pass to your script. Ideally this is not needed if you can just pass the ${date.path} attribute value to your script. No idea what input is expected by your script) --> ExecuteStreamCommand

So while the flow may list all files from the same date path, we deduplicate so that only one FlowFile from each unique date path is passed on to your ExecuteStreamCommand processor, which can then execute your script against that source directory.
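As a sketch of that UpdateAttribute step: assuming paths like /Logfolder/20240515/Location/, a date.path property could use an Expression Language statement along these lines (the regex is an illustration and depends on your actual tree):

    date.path : ${absolute.path:replaceAll('^.*?/([0-9]{8})/.*$', '$1')}

Note that DetectDuplicate also requires a Distributed Map Cache client controller service (backed by a cache server) to track the identifiers it has already seen.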

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

New Contributor

Hello @Racketmojster ,

Use a ListFile processor to list the date folders (20240515, 20240516, etc.) under your Logfolder directory. Configure the ListFile processor to recursively list the files in those directories.

Set the Input Directory to the parent directory containing your date folders (Logfolder in your case). Optionally, you can filter by directory name pattern (e.g., use a regex that matches the date folder format).

If you want to process specific files within each date folder, you can use a FetchFile processor connected to the ListFile processor. After fetching the files, use a MergeContent processor to merge the content into a single FlowFile.
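For reference, an illustrative ListFile configuration for this layout might be (the Path Filter regex is an assumption based on the structure described above):

    Input Directory        : /Logfolder
    Recurse Subdirectories : true
    Path Filter            : \d{8}(/.*)?    (only descend into date-named subfolders)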

Best Regards,
ky kynect

New Contributor

Hi @brandonsherrick,
Thanks for the reply.

As you mentioned, I want to process not specific files but all the files that belong to a date folder, while maintaining their directory structure, as that is necessary for the Python script to process them properly. Is there a way to use the MergeContent processor to achieve this? And what exactly would the configuration for that processor look like?

Below is the current setup of my Apache NiFi flow:

[screenshot of the current NiFi flow]

 

Super Mentor

@Racketmojster 

So you do want to ingest all the files within a specific parent directory path.  Use case details are important here.  I was not sure if perhaps your script fetched the files when passed a folder date string.

There are certainly challenges with using the MergeContent processor, which may require modifications to your script to make this work.

MergeContent provides numerous Merge Formats for how you want this processor to merge the content from multiple source FlowFiles:

  1. The default is binary concatenation, which simply appends the bytes of one FlowFile to the end of the previous FlowFile's content. You could specify a delimiter to keep track of the filename for each section of bytes and to mark where one file's bytes start and end. This would require your script to parse the binary concatenated content, split the files by the delimiter, and obtain each file's name from the delimiter. This is rather messy.
  2. You could use the ZIP or TAR format to merge the files into a ZIP or TAR file that retains the directory structure and filenames. This requires that your script can unzip or untar the content in order to process the individual files within the bundle. This is the less messy option (see the sketch below).
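To make option 2 concrete, here is a minimal sketch of the script side, assuming ExecuteStreamCommand pipes the merged TAR bundle to the script's stdin and MergeContent's "Keep Path" property is enabled so entry names retain the directory structure; handle() stands in for your real processing:

    # read_bundle.py - illustrative sketch only.
    # Assumes the merged TAR arrives on stdin from ExecuteStreamCommand.
    import sys
    import tarfile

    def handle(name, data):
        # Placeholder for your real per-file logic.
        print(f"{name}: {len(data)} bytes")

    def main():
        # Mode "r|" reads a non-seekable stream such as stdin.
        with tarfile.open(fileobj=sys.stdin.buffer, mode="r|") as bundle:
            for member in bundle:
                if member.isfile():
                    # member.name keeps paths like Location/logs.txt.
                    handle(member.name, bundle.extractfile(member).read())

    if __name__ == "__main__":
        main()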

Once you have decided on the merge format, you need to configure MergeContent to make sure all the files from within a unique parent directory (a unique date directory) are merged with one another. This is where the "Correlation Attribute Name" setting is used. All source FlowFiles for which the value of the configured FlowFile attribute is identical will be placed in the same MergeContent bin. For example, you might use "absolute.path" in this property, but you'll need to make sure this works in your use case, since I don't know what your complete source structure tree looks like. You could also use an UpdateAttribute processor before MergeContent to create a new FlowFile attribute with just the date string extracted from absolute.path, and then use that new attribute as your correlation attribute.

Now you need to make sure MergeContent does not merge a bin before all expected FlowFiles have been added to it. The way the MergeContent processor works is that when executed, it reads FlowFile(s) from the inbound connection and allocates them to a bin. After allocating a FlowFile to a bin, it checks whether that bin is eligible to be merged. Since NiFi by default executes processors as fast as possible (milliseconds count here), it is possible that at the very moment it looks at the inbound connection, not all FlowFiles from a specific directory are present yet.

Whether a bin should be merged is controlled by the "Minimum Number of Entries", "Minimum Group Size", and "Max Bin Age" configuration properties. If both min settings are satisfied, a bin will be merged. If both mins have not been met but the max bin age (the age of the bin since the first FlowFile was allocated to it) has been reached or exceeded, the bin will also be merged. With the defaults you can see how easily a bin may get merged before all necessary FlowFiles have been added to it.

The "Maximum number of Bins" is also very important here. If you have 5 bins (the default) and 20 source dated directories, it becomes possible that FlowFiles get allocated by the correlation attribute to 5 different bins, and then another FlowFile is processed that belongs to none of those 5 bins (a 6th dated directory). What happens in this scenario is that MergeContent forces the merge of the oldest bin, regardless of mins or max bin age, in order to free a bin to allocate the next FlowFile to.

If all your source dated directories contained the same number of files, setting the minimum number of entries would be easy, but that is probably not the case. So what a user would typically do is set the minimum number of entries to a value larger than any source directory would ever contain, preventing a bin from merging until the max bin age is reached. This adds latency to your dataflow equal to the configured max bin age. An illustrative configuration is sketched below.
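Putting that together, an illustrative MergeContent configuration might look like the following (the values are assumptions you would tune to your own volumes):

    Merge Strategy             : Bin-Packing Algorithm
    Merge Format               : TAR
    Correlation Attribute Name : date.path   (set earlier via UpdateAttribute)
    Minimum Number of Entries  : 100000      (larger than any date folder will ever hold)
    Max Bin Age                : 5 min       (forces the merge; adds this much latency)
    Maximum number of Bins     : 20          (more than the number of dates in flight)
    Keep Path                  : true        (preserve the directory structure in the TAR)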

Hope this helps you understand the MergeContent processor configuration options better within the context of your specific dataflow needs.

The above does not seem to me like an efficient dataflow. ListFile lists files from the local filesystem, then FetchFile fetches the content of those listed files and adds it to the FlowFiles. Then, if you use MergeContent to create a TAR or ZIP, your script is going to need to untar or unzip that bundle somewhere in order to process the files. So you are effectively reading content from a directory (which means writing content to NiFi's content repository), then tar/zipping it (another write to NiFi's content repository), and then your script untars/unzips the files (another write somewhere local) in order to process them.

That is why in my original response I suggested avoiding the FetchFile and using only the ListFile to get the unique date source directory, passing that absolute path to your script so it processes the files directly out of the source path. This reduces latency and a lot of disk I/O.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Community Manager

@Racketmojster Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.  Thanks.


Regards,

Diana Torres,
Community Moderator

