NiFi 2.0.0 - Custom Python Processor Uploading Issue

Contributor

All,

 

I created a new processor using Python 3.12 in the NiFi 2.0.0-M4 release.

 

My directory structure, based on the documentation, looks like this:

drewski7_0-1728323942308.png

I don't include anything under bundled-dependencies, and my MANIFEST.MF looks like this:

Manifest-Version: 1.0
Build-Timestamp: 2024-10-07T16:22:20Z
Nar-Id: processors-nar
Nar-Group: processors
Nar-Version: 0.0.2


I zipped everything and created a .nar called TransformOpenskyStates.nar. Once created, I added it to my NiFi_home/extensions directory.

In the logs, I see this:

2024-10-07 13:44:34,481 INFO [NAR Auto-Loader] org.apache.nifi.nar.NarAutoLoaderTask Found ./extensions/TransformOpenskyStates.nar in auto-load directory
2024-10-07 13:44:39,487 INFO [NAR Auto-Loader] org.apache.nifi.nar.StandardNarLoader Starting load process for 1 NARs...
2024-10-07 13:44:39,494 INFO [NAR Auto-Loader] org.apache.nifi.nar.StandardNarLoader Creating class loaders for 1 NARs...
2024-10-07 13:44:39,496 INFO [NAR Auto-Loader] org.apache.nifi.nar.NarClassLoaders Loaded NAR file: /Users/drewnicolette/Downloads/nifi-2.0.0-M4/./work/nar/extensions/TransformOpenskyStates.nar-unpacked as class loader org.apache.nifi.nar.NarClassLoader[./work/nar/extensions/TransformOpenskyStates.nar-unpacked]
2024-10-07 13:44:39,496 INFO [NAR Auto-Loader] org.apache.nifi.nar.StandardNarLoader Successfully created class loaders for 1 NARs, 0 were skipped
2024-10-07 13:44:39,498 INFO [NAR Auto-Loader] o.a.n.n.StandardExtensionDiscoveringManager Loaded extensions for processors:processors-nar:0.0.2 in 2 millis
2024-10-07 13:44:39,499 INFO [NAR Auto-Loader] org.apache.nifi.nar.StandardNarLoader Finished NAR loading process!

But I can't find my processor in the UI/Canvas at all. 

 

Does anyone know the issue? 

1 ACCEPTED SOLUTION

Super Guru

Hi @drewski7 ,

I tried the code you posted and it worked for me!

Here are the steps I followed:

1- Created a main folder called TransformOpenskyStates-nar.

2- Created TransformOpenskyStates.py, containing the code you posted, under that main folder.

3- Created the folder structure as follows:

SAMSAL_0-1728948714734.png

Under META-INF, I created MANIFEST.MF and added the following text:

Manifest-Version: 1.0
Build-Timestamp: 2024-10-07T16:22:20Z
Nar-Id: TransformOpenskyStates-nar
Nar-Group: nifi.py.processors
Nar-Version: 0.0.2

4- Under NAR-INF, I created an empty folder named "bundled-dependencies", since you don't seem to have any external dependencies.

5- I downloaded and installed 7-Zip, went inside the main directory created in step 1, selected everything (2 folders and 1 .py file), right-clicked, opened the 7-Zip menu, and selected Add to Archive.

SAMSAL_2-1728950178329.png

6- In the 7-Zip Add to Archive window, type the name of your package and save it as .zip. There is no need to change any configuration. If you instead create the zip file at the main-folder level, the main folder itself is added to the package, and that might cause a problem because NiFi expects the .py file to be at the root level. That could be your problem.

SAMSAL_3-1728950343034.png

7- Rename the .zip to .nar. I would first try placing it under the lib folder; if it works there, you can move it to other locations.

SAMSAL_5-1728950499154.png

 

8- Restart NiFi. In my case, the log file had the following entries for this processor:

2024-10-14 20:10:00,495 INFO [main] org.apache.nifi.nar.NarClassLoaders Loaded NAR file: \nifi-2.0.0-M4-dev\.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked as class loader org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked]
...
2024-10-14 20:10:02,036 INFO [main] o.a.n.n.StandardExtensionDiscoveringManager Loaded extensions for nifi.py.processors:TransformOpenskyStates-nar:0.0.2 in 3 millis
...
2024-10-14 20:10:14,316 INFO [main] o.a.n.n.StandardExtensionDiscoveringManager Discovered Python Processor TransformOpenskyStates

9- Once the UI was up and running, I was able to see and select the processor:

SAMSAL_6-1728951208155.png
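The log excerpts quoted in this thread suggest a quick check. The coordinate in the "Loaded extensions for ..." line appears to follow the Nar-Group:Nar-Id:Nar-Version headers of MANIFEST.MF, and the working run additionally logs "Discovered Python Processor ...". The sketch below looks for both messages in a log text. The function names `nar_coordinate` and `summarize_log` are made up for illustration, and the matching relies only on the log lines shown here, not on a documented log format.

```python
def nar_coordinate(manifest_text: str) -> str:
    """Build 'group:id:version' from the Nar-* headers of a MANIFEST.MF."""
    headers = {}
    for line in manifest_text.splitlines():
        # Split on the first colon only, so timestamp values stay intact.
        key, sep, value = line.partition(":")
        if sep:
            headers[key.strip()] = value.strip()
    return f"{headers['Nar-Group']}:{headers['Nar-Id']}:{headers['Nar-Version']}"


def summarize_log(log_text: str, coordinate: str, processor: str) -> dict:
    """Report whether the two messages seen in the working run are present."""
    return {
        "nar_loaded": f"Loaded extensions for {coordinate}" in log_text,
        "processor_discovered": f"Discovered Python Processor {processor}" in log_text,
    }


# Manifest text as posted in step 3.
MANIFEST = """\
Manifest-Version: 1.0
Build-Timestamp: 2024-10-07T16:22:20Z
Nar-Id: TransformOpenskyStates-nar
Nar-Group: nifi.py.processors
Nar-Version: 0.0.2
"""

# Message parts of two log lines quoted in step 8.
LOG = """\
Loaded extensions for nifi.py.processors:TransformOpenskyStates-nar:0.0.2 in 3 millis
Discovered Python Processor TransformOpenskyStates
"""

coordinate = nar_coordinate(MANIFEST)
status = summarize_log(LOG, coordinate, "TransformOpenskyStates")
print(coordinate)
print(status)
```

Applied to the log in the original question, with the coordinate processors:processors-nar:0.0.2, the same check would report the NAR as loaded but the processor as not discovered, which matches the symptom described there.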

When I ran it, however, I got an error, but that is a different story 🙂

SAMSAL_7-1728951313278.png

 

Error Message:

20:14:13 EDT
ERROR
8d85ee2d-0192-1000-7594-fe7475f52c1d
PythonProcessor[type=TransformOpenskyStates, id=8d85ee2d-0192-1000-7594-fe7475f52c1d] Failed to transform FlowFile[filename=e8370eae-8bdd-4867-91dc-1416fd5d4727]: py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "F:\nifi-2.0.0-M4-dev\python\framework\py4j\java_gateway.py", line 2466, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\nifi-2.0.0-M4-dev\python\api\nifiapi\flowfiletransform.py", line 33, in transformFlowFile
    return self.transform(self.process_context, flowfile)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\nifi-2.0.0-M4-dev\.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked\TransformOpenskyStates.py", line 49, in transform
    contents = json.loads(flow_file.getContentsAsBytes())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Python\311\Lib\json\__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Python\311\Lib\json\decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Python\311\Lib\json\decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
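The last line of the traceback, "Expecting value: line 1 column 1 (char 0)", is what json.loads raises when the input is empty or does not start with a JSON value. A minimal sketch of a guard follows. `try_load_json` is a hypothetical helper, not part of the NiFi API, and routing such FlowFiles to the "failure" relationship (shown only in a comment) is an assumption about how one might choose to handle them.

```python
import json


def try_load_json(raw: bytes) -> tuple[bool, object]:
    """Return (True, document) on success, or (False, None) if raw is not JSON."""
    try:
        return True, json.loads(raw)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return False, None


# Hypothetical use inside transform() (assumption, not the poster's code):
#     ok, contents = try_load_json(flow_file.getContentsAsBytes())
#     if not ok:
#         return FlowFileTransformResult("failure")

empty_result = try_load_json(b"")            # empty FlowFile content
valid_result = try_load_json(b'{"time": 1}')
print(empty_result)
print(valid_result)
```

Returning a flag alongside the value keeps a legitimate JSON `null` distinguishable from a parse failure.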

I'm emailing you a copy of the zip file. Try changing its extension to .nar and see if that works. I suspect that the way you packaged (zipped) the folder structure is causing the issue.
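For anyone who prefers scripting the packaging over using 7-Zip, here is a rough Python equivalent of steps 5 to 7. It zips the contents of the main folder rather than the folder itself, so the .py file ends up at the archive root. The function name `build_nar` is made up, the demo manifest is a one-line placeholder, and this is a sketch of the idea described in this thread rather than an official NiFi packaging tool.

```python
import tempfile
import zipfile
from pathlib import Path


def build_nar(source_dir, nar_path):
    """Zip the contents of source_dir (not the folder itself) into nar_path."""
    root = Path(source_dir)
    with zipfile.ZipFile(nar_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(root.rglob("*")):
            # Entry names are relative to source_dir, so the .py file and the
            # META-INF and NAR-INF folders sit at the top level of the archive.
            archive.write(path, path.relative_to(root).as_posix())
        return archive.namelist()


# Demo in a temporary directory, mirroring the layout described in this thread.
with tempfile.TemporaryDirectory() as tmp:
    main = Path(tmp) / "TransformOpenskyStates-nar"
    (main / "META-INF").mkdir(parents=True)
    (main / "NAR-INF" / "bundled-dependencies").mkdir(parents=True)
    (main / "META-INF" / "MANIFEST.MF").write_text("Manifest-Version: 1.0\n")
    (main / "TransformOpenskyStates.py").write_text("# processor code goes here\n")

    # Write the .nar next to the main folder, not inside it, so the archive
    # does not try to include itself.
    names = build_nar(main, Path(tmp) / "TransformOpenskyStates-nar.nar")

print(names)
```

None of the printed entry names should start with "TransformOpenskyStates-nar/"; if they do, the main folder was zipped instead of its contents.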

 

Hope that helps.

 

 

 


4 REPLIES 4

Contributor

@SAMSAL  - Can you help out with this?

Super Guru

Hi @drewski7 ,

I had never tried this before, but I decided to give it a shot. I took a Python extension sample, ChunkDocument, and added it as a regular Python extension just to get all of its dependencies downloaded. After that, I created a folder structure similar to what you have:

my-nar.nar
+-- META-INF/
    +-- MANIFEST.MF
+-- NAR-INF/
    +-- bundled-dependencies/
        +-- dependency1
        +-- dependency2
        +-- etc.
+-- MyProcessor.py

Then I followed the following steps:

  1. I copied all the downloaded dependencies to NAR-INF/bundled-dependencies.
  2. I created a MANIFEST.MF under META-INF/ and copied in the same info you provided.
  3. I copied ChunkDocument.py to the root level, similar to where you have your custom TransformOpenskyStates.py, and renamed it to MFChunkDocument.py (I explain the rename below).
  4. I zipped the files and named the archive "nifi-ChunkDocument-nar.zip".
  5. I renamed it from .zip to .nar and copied it to NIFI-HOME/lib. I think you copied yours under NIFI-HOME/extensions, which should work as well.
  6. I deleted the Python extension that I had created earlier for the same processor under work/python/extension in order to download all the dependencies.
  7. When I launched NiFi, I was able to find the processor and set it up accordingly.
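Before copying a .nar into NiFi, its layout can be compared against the tree above. The following sketch opens the archive with Python's zipfile module and reports two problems: a missing META-INF/MANIFEST.MF at the root, and no .py file at the root. `check_nar_layout` and `_make_zip` are hypothetical helper names, and the two rules are taken only from the structure shown in this thread, so treat this as a sanity check rather than NiFi's own validation.

```python
import io
import zipfile


def check_nar_layout(nar) -> list[str]:
    """Return a list of layout problems; an empty list means none were found.

    `nar` may be a file path or a binary file-like object.
    """
    with zipfile.ZipFile(nar) as archive:
        names = archive.namelist()
    problems = []
    if "META-INF/MANIFEST.MF" not in names:
        problems.append("META-INF/MANIFEST.MF is missing from the archive root")
    # An entry at the archive root has no "/" in its name.
    if not any(n.endswith(".py") and "/" not in n for n in names):
        problems.append("no .py file found at the archive root")
    return problems


def _make_zip(entries):
    """Build an in-memory zip with empty entries, for demonstration only."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        for name in entries:
            archive.writestr(name, "")
    buffer.seek(0)
    return buffer


# Layout matching the tree above.
flat = _make_zip(["META-INF/MANIFEST.MF", "NAR-INF/bundled-dependencies/", "MyProcessor.py"])
# Same files, but nested under an extra top-level folder.
nested = _make_zip(["my-nar/META-INF/MANIFEST.MF", "my-nar/MyProcessor.py"])

flat_problems = check_nar_layout(flat)
nested_problems = check_nar_layout(nested)
print(flat_problems)
print(nested_problems)
```

The nested archive is the kind produced by zipping the parent folder instead of its contents, and it is flagged on both counts.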

The reason I renamed the file from ChunkDocument to MFChunkDocument is that there seems to be a bug. Please refer to my comment here.

I don't see anything wrong with what you did. However, you mentioned that you did not include anything under bundled-dependencies. Is this because you don't have any? If you do have dependencies, you should copy all of them there. If that is not the issue, I would look into the processor itself and, just for the sake of testing, add it as a Python extension (not as a NAR) to see if it works. If it doesn't, then there is something wrong with the processor itself. If you want, you can email me a copy of your .py file and I can try it on my end.

 

Hope that helps. If it does, please accept the solution.

Thanks

 

 

 

 

Contributor

Hi SAMSAL

 

Thanks for the recommendation. I tried it as a plain Python extension and it worked. However, when I build the NAR again, it still doesn't show up.

Can you try it with this .py file I am pasting below?

Thanks!

Filename - TransformOpenskyStates.py

from nifiapi.flowfiletransform import (
    FlowFileTransform,
    FlowFileTransformResult
)
from nifiapi.properties import ProcessContext
import json

FIELD_MAP = [
    "icao24", "callsign", "origin_country", "time_position", "last_contact",
    "longitude", "latitude", "baro_altitude", "on_ground", "velocity",
    "true_track", "vertical_rate", "sensors", "geo_altitude", "squawk",
    "spi", "position_source"
]

RETURN_SCHEMA = [
    "icao24", "callsign", "origin_country",
    "reporting_time", "time_position", "last_contact",
    "longitude", "latitude", "on_ground"
]


class TransformOpenskyStates(FlowFileTransform):

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''
        Transform the data returned by the OpenSky Network API.
        '''
        tags = ["opensky", "transform", "tutorial"]
        dependencies = []

    def __init__(self, **kwargs):
        super().__init__()

    def transform(
        self, context: ProcessContext, flow_file
    ) -> FlowFileTransformResult:
        '''
        Parameters:
            context (ProcessContext)
            flow_file

        Returns:
            FlowFileTransformResult
        '''
        contents = json.loads(flow_file.getContentsAsBytes())

        def sanitize_value(value):
            if isinstance(value, str):
                return value.strip()

            return value

        states = []
        for record in contents["states"]:
            record = dict(zip(FIELD_MAP, record))
            record["reporting_time"] = contents["time"]

            # Choose only fields listed in the RETURN_SCHEMA
            sanitized = {}
            for key, value in record.items():
                if key not in RETURN_SCHEMA:
                    continue

                sanitized[key] = sanitize_value(value)

            states.append(sanitized)

        return FlowFileTransformResult(
            "success",
            contents=json.dumps(states)
        )
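To exercise the transformation logic outside NiFi, as a way of separating processor bugs from packaging problems, the body of transform can be lifted into a plain function. This is only a sketch: `transform_states` is a hypothetical name, the sample payload is invented for illustration (it is not real OpenSky data), and the dict comprehension is a compact restatement of the loop above rather than the exact code posted.

```python
import json

FIELD_MAP = [
    "icao24", "callsign", "origin_country", "time_position", "last_contact",
    "longitude", "latitude", "baro_altitude", "on_ground", "velocity",
    "true_track", "vertical_rate", "sensors", "geo_altitude", "squawk",
    "spi", "position_source"
]

RETURN_SCHEMA = [
    "icao24", "callsign", "origin_country",
    "reporting_time", "time_position", "last_contact",
    "longitude", "latitude", "on_ground"
]


def transform_states(raw: bytes) -> str:
    """Apply the same mapping as the processor, without any NiFi classes."""
    contents = json.loads(raw)
    states = []
    for record in contents["states"]:
        record = dict(zip(FIELD_MAP, record))
        record["reporting_time"] = contents["time"]
        # Keep only RETURN_SCHEMA fields and strip whitespace from strings.
        states.append({
            key: value.strip() if isinstance(value, str) else value
            for key, value in record.items()
            if key in RETURN_SCHEMA
        })
    return json.dumps(states)


# Invented sample payload with one 17-element state vector.
sample = json.dumps({
    "time": 1728323942,
    "states": [
        ["abc123", "TEST123 ", "Testland", 1728323940, 1728323941,
         -73.5, 40.7, 1000.0, False, 200.0, 90.0, 0.0, None, 1100.0,
         "1234", False, 0]
    ],
}).encode("utf-8")

result = json.loads(transform_states(sample))
print(result)
```

If this function behaves as expected on representative input, the remaining suspects are the NAR packaging and the content of the incoming FlowFile.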

 
