Member since: 11-16-2015
Posts: 894
Kudos Received: 657
Solutions: 245
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 7570 | 02-22-2024 12:38 PM |
| | 1632 | 02-02-2023 07:07 AM |
| | 3529 | 12-07-2021 09:19 AM |
| | 4545 | 03-20-2020 12:34 PM |
| | 15486 | 01-27-2020 07:57 AM |
04-29-2025
12:37 PM
3 Kudos
With the addition of Python to Apache NiFi 2.0 and Cloudera Flow Management 4, developers have a new way to rapidly develop processors and can leverage Python libraries for FlowFile processing. But what about NiFi’s scripting capabilities using Jython? In this post I explore the history of Jython and Python in the Apache NiFi project, their capabilities, and the current state of both in Apache NiFi 2 and Cloudera Flow Management 4.

NiFi’s Scripting Capabilities

Apache NiFi 0.5.0 introduced the ExecuteScript processor, allowing developers to interact with the NiFi SDK using several JVM-based scripting languages such as Groovy, JavaScript, Lua, Clojure, and Jython, a JVM-based implementation of Python. Jython is compatible with the Python language (Python 2.x), so a script written in pure Python should work in ExecuteScript using Jython.

“Pure” Python means the entire code is written in the Python language itself, rather than in other languages that Python has bindings for, such as C. This distinction is important because many popular Python libraries, such as pandas and scikit-learn, include components written in C and are NOT pure Python. Even though you can write a Python program that imports pandas, for example, that program will not run in Jython. This limitation significantly narrows the scope of what you can accomplish with Jython in ExecuteScript.

A workaround is to use the ExecuteStreamCommand processor to run a Python script file with the actual Python interpreter. However, this method comes with several important constraints. The script can only receive FlowFile content via standard input (stdin) and must return FlowFile content via standard output (stdout). You must supply your own Python interpreter.
Access to the rest of the NiFi processor API, including reading FlowFile attributes, creating multiple FlowFiles, and performing multiple session commits, is not available via ExecuteStreamCommand, which can severely limit the operations you can perform on your FlowFiles. Because of these limitations, ExecuteStreamCommand should be your last resort for executing Python programs, used only when there is no other way to achieve the goal with Jython or native Python processors in Cloudera Flow Management 4.

Jython support in scripting components was deprecated in Apache NiFi 1.x and fully removed in NiFi 2.0. If you have Jython scripts in the scripting components in NiFi 1, you cannot migrate them as-is to Apache NiFi 2.x. Cloudera Flow Management 4, however, has retained Jython support, ensuring continued compatibility for existing flows.

Python in Apache NiFi 2.0

The generally available (GA) release of Apache NiFi 2.0 in November 2024 introduced a number of features not present in the NiFi 1.x line. Perhaps the most notable is the Python Processor Software Development Kit (SDK). The Python SDK allows developers to write processors in Python and leverage CPython libraries such as pandas and scikit-learn. Given Python’s popularity for Artificial Intelligence (AI) applications, the Python SDK enables users to create processors that use AI libraries for data processing.

NiFi 2.0 comes with a handful of processors written in Python, including ones for data parsing and for VectorDB integrations with systems like Chroma, Pinecone, and Qdrant, which are commonly used for Retrieval Augmented Generation (RAG) use cases involving document processing, storage, and semantic search.
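Returning to the ExecuteStreamCommand workaround discussed earlier: the stdin/stdout contract can be sketched as a small standalone script. This is a minimal illustration, not a recommended design. The function name `transform_content` and the upper-casing logic are hypothetical; the only behavior taken from the text above is that FlowFile content arrives on stdin and the new content leaves on stdout.

```python
import sys


def transform_content(text: str) -> str:
    """Hypothetical transformation: upper-case the FlowFile content."""
    return text.upper()


# ExecuteStreamCommand pipes the FlowFile content into the process, so the
# script only does work when stdin is a pipe rather than an interactive terminal.
if __name__ == "__main__" and not sys.stdin.isatty():
    incoming = sys.stdin.read()                    # FlowFile content in
    sys.stdout.write(transform_content(incoming))  # new FlowFile content out
```

Note what the script cannot see: no FlowFile attributes, no session, and no way to emit more than one FlowFile, which is exactly the limitation described above.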
Cloudera Flow Management 4 adds many Python processors that are not available in open-source Apache NiFi 2.0, including:

- Chunk/EmbedData: Allows the user to chunk unstructured data and then embed the data into vectors
- PromptBedrock: Provides integration with Amazon Bedrock
- LexicalQuery/InsertTo/VectorQueryMilvus: Provides integration to and from the Milvus vector database
- PromptChatGPT: Allows the user to send prompts to ChatGPT and process the results
- PartitionText/Csv/Docx/Pdf/Html: Offers a wider range of structured and unstructured document formats that can be partitioned for later chunking and embedding, increasing the quality and relevance of prompt responses

These processors rely on CPython libraries and cannot be implemented in Jython.

Jython and Python in Cloudera Flow Management 4

Because Jython is based on Python 2.x and is no longer actively maintained, the Apache NiFi community removed the Jython library from the NiFi ARchive (NAR) containing the scripting components. The upcoming GA release of Cloudera Flow Management 4 adds Jython back into the scripting NAR. This allows Cloudera customers with existing Jython scripts to continue running them without modification.

This is just one example of how Cloudera supports backward compatibility for NiFi flows. Many components removed from NiFi 2.0 are retained in Cloudera Flow Management 4, such as Hive, HBase, Atlas, Ranger, Couchbase 2, Cassandra 3, Kafka 2.6, and Kudu. Cloudera customers who rely on these features can therefore upgrade without being forced to make unnecessary changes to their flows.

The NiFi Python SDK

Although the Python SDK enables developers to implement processors in Python, it was designed for simplicity and does not allow access to the full NiFi Processor APIs.
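To make the shape of a Python SDK processor concrete, here is a minimal sketch of a FlowFileTransform-style processor. The class name `UppercaseContent`, its description, and the `content.case` attribute are hypothetical. The `except ImportError` branch defines stand-in classes that are NOT part of NiFi; they exist only so the sketch can be exercised on a plain Python interpreter, while inside NiFi 2.x the real `nifiapi` classes would be used.

```python
try:
    # Available when the processor runs inside NiFi 2.x.
    from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
except ImportError:
    # Hypothetical stand-ins for local experimentation outside NiFi.
    class FlowFileTransform:
        pass

    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents


class UppercaseContent(FlowFileTransform):
    class Java:
        # Tells NiFi which Java-side interface this Python class fulfils.
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Hypothetical example: upper-cases the FlowFile text content.'

    def __init__(self, **kwargs):
        pass

    def transform(self, context, flowfile):
        # One FlowFile in, one FlowFile out: read the content, return new content.
        text = flowfile.getContentsAsBytes().decode('utf-8')
        return FlowFileTransformResult(
            relationship='success',
            contents=text.upper(),
            attributes={'content.case': 'upper'},  # hypothetical attribute name
        )
```

The processor never touches a session object: it receives one FlowFile and returns a single result naming the relationship, the new content, and any attributes to add.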
There are limitations that must be considered when designing a processor:

- Python processors do not have access to the ProcessSession, so they cannot create multiple FlowFiles, perform multiple session commits or rollbacks, merge FlowFiles, etc.
- Only three types of processors are currently supported in the Python SDK: FlowFileSource, FlowFileTransform, and RecordTransform.
  - A FlowFileSource processor does not allow incoming connections and expects a single FlowFile as output. It can be used as a source processor, for example to generate sample data.
  - A FlowFileTransform processor receives a single incoming FlowFile and expects a single FlowFile as output. It is used when modifying the entire content of a FlowFile, such as converting binary image data from JPG to PNG.
  - A RecordTransform processor also receives a single incoming FlowFile and expects a single FlowFile as output, but it can process individual records in the incoming data via a configured RecordReader controller service. It is used to transform each record individually, such as changing the value of a field in each record.

These limitations affect users in several ways. First, only NiFi Processors can be written in Python, not Controller Services or Reporting Tasks. Next, a Python processor can work with at most one FlowFile per execution. This limits what a processor can do in terms of creating or cloning multiple FlowFiles, ingesting a batch of FlowFiles, and handling lifecycle events such as onPropertyModified().

When to use Jython or Python?

Choosing between Jython (via scripted components such as ExecuteScript) and the Python SDK for processors in Cloudera Flow Management 4 depends on your use case and requirements.
Here is a quick guide to help with decision making:

Use Jython when | Use Python when |
---|---|
You already have existing Jython scripts in your flows | You want to import a CPython library such as pandas |
You want to implement scripted controller services such as ScriptedRecordReader | You are more familiar with Python than other supported scripting languages |
You need control over multiple session transactions (i.e. commits/rollbacks) and/or FlowFiles | You only need to work with one FlowFile at a time |

Jython and Python capabilities in Cloudera Flow Management offer several ways to use the Python programming language and its libraries to rapidly develop NiFi components. Leveraging Jython in scripted components offers a rich set of features for developers who are more comfortable with the Python language, and using Python libraries to develop NiFi processors enables more integrations with external systems such as GenAI applications. Together, these give Cloudera Flow Management a powerful environment in which to enhance and enrich your flows.

Learn More

To explore the new capabilities of Cloudera Flow Management 4 and discover how it can transform your data pipelines with native Python support, watch our webinar, "The Five Things You Need To Know about Unlocking The Power of NiFi 2.0". For a full list of connectors in Cloudera Flow Management, visit the Cloudera Connectors Library.
04-07-2025
03:03 AM
Hello Sir, I have just set up a basic NiFi custom processor. When I try to build, one of the modules builds, but the NAR module fails with this error:

[ERROR] Rule 3: org.apache.maven.enforcer.rules.dependency.RequireReleaseDeps failed with message:
[ERROR] Dependencies outside of Apache NiFi must not use SNAPSHOT versions
[ERROR] com.example:nifi-my_custom_nifi_processor-nar:nar:1.0-SNAPSHOT
[ERROR]    com.example:nifi-my_custom_nifi_processor-processors:jar:1.0-SNAPSHOT <--- is not a release dependency
03-24-2025
09:53 PM
Thank you, @MattWho , for the additional information. I really appreciate you providing timely and clear solutions. You’re awesome!
02-03-2025
02:39 AM
Thanks mburgess! This was helpful.
10-04-2024
04:54 AM
@mjmoore
Have you been able to resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.
09-28-2024
07:03 PM
1 Kudo
Thanks to the second OP for identifying the root cause in the NiFi Jira. For people researching this today, the cause was the implicit (default) namespace declared on the root node: the 'xmlns' attribute on that element with no prefix after it. In the second poster's case, the XML started with:

<data xmlns="http://www.media-saturn.com/msx" xmlns: ...

The `/data/item//uniqueID` he was searching for belongs, more accurately, to the "http://www.media-saturn.com/msx" namespace, meaning he needed to specify that namespace as part of his XPath expression. The reason the pathless "//@uniqueType" search worked is that it searches all namespaces for that XPath expression!

I'm using NiFi 2.0.0 M4 today and I'm pleased to report that it appears to support the XPath 3.0/3.1 notation where the namespace can be specified inline with the query. It's not particularly elegant, but it works. You wrap the namespace URI in curly brackets and prefix it with the capital letter 'Q', namely:

Q{http://www.media-saturn.com/msx}<single-level selector>

To implement his expression, "/data/item//uniqueID[UniqueType='ProdID']/text()", which currently returns an empty string for key 'ProdID4', you would use:

/Q{http://www.media-saturn.com/msx}data/Q{http://www.media-saturn.com/msx}item//uniqueID[UniqueType='ProdID']/text()

I have a suspicion that the second namespace reference (on 'item' in this case) is not required, since once you are navigating down the 'data' path of the correct namespace, you're not likely to jump to another namespace. My research indicates that attributes do not seem to accept namespace referencing, but again, once you've successfully selected your path I suspect it becomes a moot point.

As an aside:
[1] It would be nice if the NiFi documentation specified the version of XPath implemented within the Processor.
[2] Even better if there were a drop-down within the Processor that allowed a developer to select the desired XPath version.
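The default-namespace effect described above can be reproduced outside NiFi with a short sketch using Python's standard library. This is an analogous illustration only, not NiFi's XPath evaluator: ElementTree supports a limited XPath subset and writes qualified names as `{uri}name` rather than the XPath 3.1 `Q{uri}name` form. The sample document, including the `uniqueType` attribute and its values, is hypothetical, since the original XML is not shown in full.

```python
import xml.etree.ElementTree as ET

NS = "http://www.media-saturn.com/msx"

# Hypothetical sample document with a default (unprefixed) namespace on the root.
SAMPLE = f"""<data xmlns="{NS}">
  <item>
    <uniqueID uniqueType="ProdID">12345</uniqueID>
    <uniqueID uniqueType="EAN">99999</uniqueID>
  </item>
</data>"""

root = ET.fromstring(SAMPLE)  # root is the <data> element

# Unqualified names do not match, because every element lives in the default namespace.
unqualified = root.findall("./item//uniqueID")

# Qualifying each element step with the namespace URI makes the path match.
qualified = root.findall(
    f"./{{{NS}}}item//{{{NS}}}uniqueID[@uniqueType='ProdID']"
)
prod_ids = [element.text for element in qualified]

print(len(unqualified), prod_ids)
```

In ElementTree each element step is qualified because it matches on the full `{uri}name` tag, while the unprefixed attribute is matched by its plain name.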
08-13-2024
09:28 PM
@mburgess -- I am reading from an Oracle table; I am not sure about the incoming FlowFile.
07-31-2024
03:23 AM
1 Kudo
@Adyant001, Welcome to the Cloudera Community. As this is an older post, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post.
04-26-2024
02:01 AM
1 Kudo
Hi @mburgess, I tried increasing the Entity Tracking Time Window to 8 hours (the time difference from PDT to UTC is 7 hours here), but I still see the same files getting listed rather than the latest ones from the source server as expected. I have attached the NiFi ListSFTP configuration I'm using.
03-20-2024
08:17 AM
Hi there @mburgess, thank you for your reply. Let me play around with your solution and see if I can get it working and write the output to a FlowFile 🤓