Support Questions

Find answers, ask questions, and share your expertise

Nifi Unzip files

avatar

Hello 

I need help

I want to unzip file.zip with NiFi

I have used unpackContent processor but that didn't work because of my nifi version 1.13

I have also used executeStreamCommand to execute python script but I didn't found until now the solution

Can you help me

 

Thanks

9 REPLIES 9

avatar

@lafi_oussama,

First of all, what is the compression format of the files you are trying to unzip?
What error did you receive when using UnpackContent?

Have you tried using CompressContent ( https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apach... )
You can configure the processed to either compress or decompress your files. You have multiple compression formats from which you can choose from.

avatar

@cotopaul

The compression format is .zip.

The CompressContent doesn't work with zip format

 

avatar
Master Mentor

@lafi_oussama 

Have you tried upgrading to the latest version of Apache NiFi?
You have not shared the specific ERROR you are encountering with the UnpackContent processor.
You have not shared specific around what you tried using the ExecuteStreamCommand and Python scripts. 
Any additional details you can provide about your zip files?

Thanks,
Matt

avatar

I can't upgrade the version of NiFi.

the error I encountered with UnpackContent : Unable to unpack StandardFlowFileRecord[uuid=e94fb414-7fb3-441a-8294-89e2766633a4,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1695735090062-1030, container=default, section=6], offset=0, length=6002134],offset=0,name=outBI.zip,size=6002134] because it does not appear to have any entries; routing to failure

 Example of python script that I have executed with ExecuteStreamCommand :

import zipfile
import sys
import gzip
import os
import io

zip_folder= sys.stdin.buffer.read()

with zipfile.ZipFile(io.BytesIO(zip_folder), 'r') as zip_folder:
for file_info in zip_folder.infolist():
file_name = file_info.filename
# Read the content of the file
file_content = zip_folder.read(file_name)
# Print the content to stdout (standard output)
sys.stdout.buffer.write(file_content)
sys.stdout.buffer.write(b'\n')
sys.stdout.flush()

avatar
Master Mentor

@lafi_oussama 

Does the zipfile actually contain files or only empty directories?

avatar

@MattWho 

yes the zipfile contains mutltiple floders that contain files

avatar
Super Collaborator

If you can run this python code under Python without any external modules, you should be able to run it as a scripted processor and have everything happen inside of NiFi. 

avatar

@joseomjr 

I didn't found the code that unzip the content of flowfile and send the unzipped files multiple flowfiles

avatar
Super Collaborator

Created this Python ExecuteScript NiFi processor that extracts the files of a ZipFile (including those in subdirectories) into individual FlowFiles. It all happens inside of NiFi and not fully tested but it worked with a simple example in my lab.

"Script Body" below:

 

 

''' Extract Zip Files '''
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback
import zipfile
from io import BytesIO

class PyInputStreamCallback(InputStreamCallback):
    ''' InputStream Callback '''
    def __init__(self):
        self.zip_file = None

    def process(self, input_stream):
        ''' Process our InputStream '''
        zip_buffer = BytesIO(IOUtils.toByteArray(input_stream))
        self.zip_file = zipfile.ZipFile(zip_buffer, "r")
class PyOutputStreamCallback(OutputStreamCallback):
    ''' OutputStream Callback '''
    def __init__(self, file):
        self.file = file

    def process(self, output_stream):
        ''' Process our OutputStream '''
        output_stream.write(self.file.read())

flow_file = session.get()
if flow_file:
    input_stream_callback = PyInputStreamCallback()
    output_stream_callback = PyOutputStreamCallback
    session.read(flow_file, input_stream_callback)
    zip_filename = flow_file.getAttribute("filename")
    zip_file = input_stream_callback.zip_file
    for name in (name for name in zip_file.namelist() if not name.endswith("/")):
        new_flow_file = session.create()
        new_flow_file = session.putAttribute(new_flow_file, "filename", name)
        new_flow_file = session.putAttribute(new_flow_file, "zip_filename", zip_filename)
        new_flow_file = session.write(
                                        new_flow_file,
                                        output_stream_callback(zip_file.open(name))
                                    )
        session.transfer(new_flow_file, REL_SUCCESS)
    zip_file.close()
    session.remove(flow_file)