Created 09-26-2023 02:55 AM
Hello,
I need help.
I want to unzip file.zip with NiFi.
I have tried the UnpackContent processor, but it didn't work because of my NiFi version (1.13).
I have also tried ExecuteStreamCommand to run a Python script, but I haven't found a solution so far.
Can you help me?
Thanks
Created 09-26-2023 06:04 AM
@lafi_oussama,
First of all, what is the compression format of the files you are trying to unzip?
What error did you receive when using UnpackContent?
Have you tried using CompressContent ( https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apach... )?
You can configure the processor to either compress or decompress your files, and you can choose from multiple compression formats.
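If you are not sure about the compression format, the first few bytes of the file usually tell you. The sketch below is a minimal, hypothetical helper (`detect_format` is not part of NiFi) that checks a few well-known magic numbers in plain Python 3; it is meant to be run outside NiFi against a copy of the file, and the list of formats is only an assumption covering the common cases.

```python
# Minimal sketch: guess a compressed file's format from its leading "magic" bytes.
# detect_format is a hypothetical diagnostic helper, not a NiFi API.
MAGIC_NUMBERS = [
    (b"PK\x03\x04", "zip"),
    (b"PK\x05\x06", "zip (empty archive)"),
    (b"\x1f\x8b", "gzip"),
    (b"BZh", "bzip2"),
    (b"\xfd7zXZ\x00", "xz"),
    (b"7z\xbc\xaf\x27\x1c", "7z"),
]


def detect_format(data: bytes) -> str:
    """Return a best-guess format name based on the first bytes of data."""
    for magic, name in MAGIC_NUMBERS:
        if data.startswith(magic):
            return name
    return "unknown"
```

For example, `detect_format(open("outBI.zip", "rb").read(8))` should return "zip" for a regular zip archive with at least one entry.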
Created 09-26-2023 06:35 AM
Created 09-26-2023 07:19 AM
@lafi_oussama
Have you tried upgrading to the latest version of Apache NiFi?
You have not shared the specific ERROR you are encountering with the UnpackContent processor.
You have not shared specifics about what you tried with ExecuteStreamCommand and Python scripts.
Can you provide any additional details about your zip files?
Thanks,
Matt
Created 09-26-2023 07:31 AM
I can't upgrade my NiFi version.
The error I encountered with UnpackContent: Unable to unpack StandardFlowFileRecord[uuid=e94fb414-7fb3-441a-8294-89e2766633a4,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1695735090062-1030, container=default, section=6], offset=0, length=6002134],offset=0,name=outBI.zip,size=6002134] because it does not appear to have any entries; routing to failure
Here is an example of the Python script I executed with ExecuteStreamCommand:
import io
import sys
import zipfile

# Read the whole zip archive from the FlowFile content (stdin)
zip_bytes = sys.stdin.buffer.read()

with zipfile.ZipFile(io.BytesIO(zip_bytes), 'r') as zip_folder:
    for file_info in zip_folder.infolist():
        file_name = file_info.filename
        # Read the content of the file
        file_content = zip_folder.read(file_name)
        # Print the content to stdout (standard output)
        sys.stdout.buffer.write(file_content)
        sys.stdout.buffer.write(b'\n')

sys.stdout.flush()
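A likely reason this does not give you separate files: ExecuteStreamCommand writes the command's standard output into a single FlowFile on its "output stream" relationship, and this script writes every entry into that one stream. The sketch below replays the same loop in plain Python 3 outside NiFi, writing into an in-memory buffer instead of stdout so the result can be inspected; `write_all_entries` and `make_sample_zip` are hypothetical names, and skipping directory entries is an added assumption.

```python
# Minimal sketch: the ExecuteStreamCommand script's loop, rewritten to write
# into any binary stream so its combined output can be inspected outside NiFi.
import io
import zipfile


def write_all_entries(zip_bytes: bytes, out) -> None:
    """Write every file entry of the archive to `out`, one after another."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # directory entries have no content
            out.write(archive.read(info.filename))
            out.write(b"\n")


def make_sample_zip() -> bytes:
    """Build a small in-memory archive with two files in two folders."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as archive:
        archive.writestr("folder1/a.txt", b"first")
        archive.writestr("folder2/b.txt", b"second")
    return buf.getvalue()


out = io.BytesIO()
write_all_entries(make_sample_zip(), out)
print(out.getvalue())  # b'first\nsecond\n' (one blob, not two files)
```

Both files end up concatenated in one byte stream, which in NiFi becomes one FlowFile; splitting into several FlowFiles has to happen inside a processor that can create them, such as a scripted processor.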
Created 09-27-2023 06:29 AM
@lafi_oussama
Does the zipfile actually contain files or only empty directories?
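One way to answer this outside NiFi is to count the file and directory entries the archive lists. The sketch below is a minimal, hypothetical helper (`count_entries` is an illustrative name) using Python 3's standard `zipfile` module; `ZipInfo.is_dir()` needs Python 3.6 or later.

```python
# Minimal sketch: count file entries vs. directory entries in a zip archive.
import io
import zipfile


def count_entries(zip_bytes: bytes) -> dict:
    """Return how many file and directory entries the archive lists."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as archive:
        infos = archive.infolist()
    dirs = sum(1 for info in infos if info.is_dir())
    return {"files": len(infos) - dirs, "dirs": dirs}


# Example: one explicit directory entry plus one file inside it
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as archive:
    archive.writestr("reports/", b"")
    archive.writestr("reports/jan.csv", b"a,b\n")
print(count_entries(buf.getvalue()))  # {'files': 1, 'dirs': 1}
```

Running it on the real archive (for example `count_entries(open("outBI.zip", "rb").read())`) would show whether there are any file entries at all; a result of 0 files would suggest the problem is in the archive rather than in NiFi.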
Created on 10-02-2023 02:09 AM - edited 10-02-2023 02:11 AM
Yes, the zip file contains multiple folders that contain files.
Created 10-01-2023 01:29 PM
If you can run this Python code under plain Python without any external modules, you should be able to run it in a scripted processor (for example ExecuteScript with the Jython engine) and have everything happen inside NiFi.
Created on 10-02-2023 02:10 AM - edited 10-02-2023 02:11 AM
I haven't found code that unzips the content of a FlowFile and sends the unzipped files out as multiple FlowFiles.
Created on 10-07-2023 09:48 PM - edited 10-07-2023 09:49 PM
I created this Python script for the ExecuteScript processor. It extracts the files in a zip file (including those in subdirectories) into individual FlowFiles, and everything happens inside NiFi. It is not fully tested, but it worked with a simple example in my lab.
Here is the "Script Body":
''' Extract Zip Files '''
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback
import zipfile
from io import BytesIO


class PyInputStreamCallback(InputStreamCallback):
    ''' InputStream Callback '''
    def __init__(self):
        self.zip_file = None

    def process(self, input_stream):
        ''' Process our InputStream '''
        # Buffer the whole zip archive in memory and open it
        zip_buffer = BytesIO(IOUtils.toByteArray(input_stream))
        self.zip_file = zipfile.ZipFile(zip_buffer, "r")


class PyOutputStreamCallback(OutputStreamCallback):
    ''' OutputStream Callback '''
    def __init__(self, file):
        self.file = file

    def process(self, output_stream):
        ''' Process our OutputStream '''
        # Copy one archive entry into the new FlowFile's content
        output_stream.write(self.file.read())


flow_file = session.get()
if flow_file:
    input_stream_callback = PyInputStreamCallback()
    output_stream_callback = PyOutputStreamCallback
    session.read(flow_file, input_stream_callback)
    zip_filename = flow_file.getAttribute("filename")
    zip_file = input_stream_callback.zip_file
    # One new FlowFile per file entry; directory entries (ending in "/") are skipped
    for name in (name for name in zip_file.namelist() if not name.endswith("/")):
        new_flow_file = session.create()
        new_flow_file = session.putAttribute(new_flow_file, "filename", name)
        new_flow_file = session.putAttribute(new_flow_file, "zip_filename", zip_filename)
        new_flow_file = session.write(
            new_flow_file,
            output_stream_callback(zip_file.open(name))
        )
        session.transfer(new_flow_file, REL_SUCCESS)
    zip_file.close()
    # The original zip FlowFile is no longer needed
    session.remove(flow_file)
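Because `session` and `REL_SUCCESS` are only bound inside ExecuteScript, the loop above is hard to test on its own. The sketch below is a hypothetical CPython 3 re-creation of the same logic (`split_zip` is not part of the original script) that replaces the NiFi session with plain dicts, so you can check which entries would become FlowFiles and which attributes they would get before pasting the script into ExecuteScript. It is an approximation: the real script runs under Jython and writes through the Java callbacks.

```python
# Minimal sketch: the extraction loop from the ExecuteScript body, with the
# NiFi session calls replaced by plain dicts so it can be tested in CPython 3.
import io
import zipfile


def split_zip(zip_bytes: bytes, zip_filename: str) -> list:
    """Return one {attributes, content} dict per file entry (directories skipped)."""
    results = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as archive:
        for name in archive.namelist():
            if name.endswith("/"):
                continue  # same directory filter as the script
            with archive.open(name) as entry:
                results.append({
                    "attributes": {"filename": name, "zip_filename": zip_filename},
                    "content": entry.read(),
                })
    return results


buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as archive:
    archive.writestr("folder1/", b"")
    archive.writestr("folder1/a.txt", b"alpha")
    archive.writestr("folder1/sub/b.txt", b"beta")

for item in split_zip(buf.getvalue(), "outBI.zip"):
    print(item["attributes"]["filename"], item["content"])
# folder1/a.txt b'alpha'
# folder1/sub/b.txt b'beta'
```

Note that, as in the script, the `filename` attribute keeps the full path inside the archive (for example `folder1/sub/b.txt`).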