Member since: 02-01-2022
Posts: 281
Kudos Received: 103
Solutions: 60
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1120 | 05-15-2025 05:45 AM |
| | 4947 | 06-12-2024 06:43 AM |
| | 7917 | 04-12-2024 06:05 AM |
| | 5814 | 12-07-2023 04:50 AM |
| | 3201 | 12-05-2023 06:22 AM |
05-13-2026
06:08 AM
2 Kudos
We are all using AI to write code, but when it comes to Apache NiFi, the current landscape often resembles the Wild West. Whether you are generating synthetic data scripts or translating complex machine learning models, Large Language Models (LLMs) are incredible accelerators. However, if you ask an AI to write a native Apache NiFi 2.0 Python processor from scratch, there is a very high probability it will confidently hand you code that instantly breaks your canvas.

NiFi 2.0's Python API is relatively new, and most AI training data is heavily saturated with legacy NiFi 1.x ExecuteScript solutions (using Jython or Groovy). Even when an AI correctly identifies the 2.0 API, it frequently misconfigures the underlying Java-to-Python bridge, resulting in "ghost" processors with dashed lines and missing routing relationships.

In this post, I am going to share the exact methodology I used to leverage AI for writing custom NiFi processors safely, ensuring my dataflows operate seamlessly with my custom Python logic.

The Input: Example Fraud Python Script

This script's logic assumes that transactions originating from two specific cities, or those exceeding $10,000, constitute fraud. Traditionally, this fraud model is intended to be deployed on Cloudera Machine Learning (CML) in a Workbench session and invoked in NiFi via the InvokeHTTP processor. I have tested this architecture, and it works flawlessly. Unfortunately, this integration is often unavailable during local Kubernetes testing (which is the focus of this post) outside of the Cloudera Public Cloud. Therefore, this script serves as a bridge to ensure the same Python responses can be tested natively, allowing downstream test data to flow in non-CML-connected environments.

import cml.models_v1 as models
SUSPICIOUS_CITIES = {
"Lagos": {"lat": 6.5244, "lon": 3.3792},
"New Delhi": {"lat": 28.6139, "lon": 77.2090}
}
# 0.5 degrees (~55km) is the exact mathematical net needed to catch all of Steven's regional fraud
TOLERANCE = 0.5
# These 3 accounts have valid data that geographically overlaps with the fraud zones.
# We whitelist them from the location-based heuristic to ensure a pristine demo.
DEMO_SAFE_ACCOUNTS = []
def is_suspicious_location(lat: float, lon: float) -> str:
for city, coords in SUSPICIOUS_CITIES.items():
if (abs(lat - coords["lat"]) <= TOLERANCE) and (abs(lon - coords["lon"]) <= TOLERANCE):
return city
return None
@models.cml_model
def detect_fraud(args):
is_fraud = False
explanations = {}
# Rule 1: High Amount Threshold (>$10k is ALWAYS flagged)
if args["amount"] > 10000:
is_fraud = True
explanations["amount"] = f"Transaction amount ({args['amount']}) exceeds the 10,000 limit."
# Rule 2: Originates strictly around restricted geographies
# We skip this check if it's one of the overlapping good accounts
if args["account_id"] not in DEMO_SAFE_ACCOUNTS:
suspicious_city = is_suspicious_location(args["lat"], args["lon"])
if suspicious_city:
is_fraud = True
explanations["location"] = f"Transaction originated from a high-risk region near {suspicious_city}."
if is_fraud:
return {
"fraud_score": 0.99,
"risk_level": "HIGH",
"decision": "REVIEW",
"explanations": explanations
}
else:
return {
"fraud_score": 0.01,
"risk_level": "LOW",
"decision": "APPROVE",
"explanations": {"status": "all heuristic checks passed"}
        }

Rule 1: The AI Writes the Logic, You Own the Framework

The biggest mistake you can make is copying and pasting a complete Python processor generated by an AI directly into your /extensions directory. AI models often hallucinate complex, aspirational examples that do not function as expected in your specific environment. When an AI provides malformed custom processor code, NiFi will either fail to load the processor entirely or, worse, load it but refuse to display the success and failure relationships in the UI.

The Pro Move: Pin the AI within a strict, proven architectural skeleton for the NiFi wrapper. I am going to show you one right now! By "pin," I mean I essentially had to wrestle the AI and lock it down using my first processor example. I proved to the AI that my example processor worked, and together we confirmed the baseline GenericTransform processor framework functioned correctly. Finally, we moved forward with constructing the actual custom NiFi processor I needed. 💪

Rule 2: Prove the Skeleton First

Before you introduce a single line of AI-generated business logic, deploy a bare-minimum structural template to the canvas. If the skeleton doesn't load and route data, your complex logic will fail as well. This exercise also proves that you understand how to deliver and iterate versions of a processor for rapid testing in the NiFi UI.

Here is the exact GenericTransform framework I used. It does nothing but pass data through, but it proves the custom processor can compile and expose its relationships natively.

import json
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class GenericTransformTemplate(FlowFileTransform):
    # Mandatory: Registers the processor with the NiFi backend
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-BASE'
        description = 'Bare-minimum framework to test NiFi UI integration.'
        tags = ['template', 'framework']

    def __init__(self, **kwargs):
        # 'pass' is the safest initialization in many containerized environments
        pass

    def transform(self, context, flowfile):
        contents_str = flowfile.getContentsAsBytes().decode('utf-8')
        attributes = flowfile.getAttributes()
        # Route directly to success without modification
        return FlowFileTransformResult(
            relationship='success',
            attributes=attributes,
            contents=contents_str
        )

Test it: Drop this into your extensions folder. Wait 30 seconds. Drag it onto the canvas. Can you connect the success relationship to a LogAttribute processor? Yes? Now you are ready for the AI code.

Rule 3: Inject Python Logic Defensively

Once your skeleton is proven, prompt your AI to write strictly isolated Python changes within the confines of the processor framework. By this point, the AI should understand your exact architectural approach, making functional Python improvements relatively straightforward. When injecting new Python logic into your data pipeline, you must code defensively against edge cases:

The Array Trap: AI assumes FlowFiles contain a single JSON object. If your upstream generator creates an array of transactions, the AI's .get() dictionary methods will trigger fatal AttributeErrors. Always wrap your logic to handle both isinstance(payload, list) and single dictionaries.

Never Overwrite the Payload: AI scripts often return only the result of their computation. If you replace your FlowFile content with just the ML score, you lose your original transaction_id and break downstream routing. Always append the AI's output to the existing payload (e.g., payload["ai_response"] = result).

Trap Everything: Wrap the AI logic in a try/except block that catches failures, writes the error to an attribute (attributes['python_error'] = str(e)), and safely routes the FlowFile to failure instead of crashing the processor.

Anticipate Iteration: Expect to find more edge cases. Keep iterating, and you will get it to work.

Rule 4: Master the Hot-Reload Workflow

The NiFi 2.0 Python API features auto-reloading. You do not need to restart your pod or execute manual scripts to test new custom NiFi Python processor logic. If you are using a local mount (e.g., minikube mount ~/nifi-custom-processors:/extensions): Save your .py file. Wait 30 to 60 seconds. The background thread will detect the file change and recompile it.

The UI Catch: The NiFi web canvas aggressively caches UI elements. Refresh your browser and check the processor list for your new version tag to ensure the changes are reflected.

The Example: The Output and Working Custom NiFi Processor

import json
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class FraudModel(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.4-SNAPSHOT'
        description = 'Executes the CML fraud detection model natively in NiFi.'
        tags = ['fraud', 'detection', 'cml', 'replacement']

    def __init__(self, **kwargs):
        pass

    # ==========================================
    # CML MODEL LOGIC
    # ==========================================
    SUSPICIOUS_CITIES = {
        "Lagos": {"lat": 6.5244, "lon": 3.3792},
        "New Delhi": {"lat": 28.6139, "lon": 77.2090}
    }
    TOLERANCE = 0.5
    DEMO_SAFE_ACCOUNTS = []

    def is_suspicious_location(self, lat: float, lon: float) -> str:
        for city, coords in self.SUSPICIOUS_CITIES.items():
            if (abs(lat - coords["lat"]) <= self.TOLERANCE) and (abs(lon - coords["lon"]) <= self.TOLERANCE):
                return city
        return None

    def detect_fraud(self, args: dict) -> dict:
        is_fraud = False
        explanations = {}
        # Rule 1: High Amount Threshold
        if args.get("amount", 0) > 10000:
            is_fraud = True
            explanations["amount"] = f"Transaction amount ({args.get('amount')}) exceeds the 10,000 limit."
        # Rule 2: Originates strictly around restricted geographies
        if args.get("account_id") not in self.DEMO_SAFE_ACCOUNTS:
            suspicious_city = self.is_suspicious_location(args.get("lat", 0.0), args.get("lon", 0.0))
            if suspicious_city:
                is_fraud = True
                explanations["location"] = f"Transaction originated from a high-risk region near {suspicious_city}."
        if is_fraud:
            return {
                "fraud_score": 0.99,
                "risk_level": "HIGH",
                "decision": "REVIEW",
                "explanations": explanations
            }
        else:
            return {
                "fraud_score": 0.01,
                "risk_level": "LOW",
                "decision": "APPROVE",
                "explanations": {"status": "all heuristic checks passed"}
            }
    # ==========================================

    def transform(self, context, flowfile):
        contents_str = flowfile.getContentsAsBytes().decode('utf-8')
        attributes = flowfile.getAttributes()
        try:
            # Parse incoming JSON
            payload = json.loads(contents_str)
            # The upstream generator sometimes creates lists of transactions.
            # Handle both lists and single dictionaries safely.
            if isinstance(payload, list):
                for tx in payload:
                    tx["cml_response"] = self.detect_fraud(tx)
                enriched_data = payload
            else:
                payload["cml_response"] = self.detect_fraud(payload)
                enriched_data = payload
            return FlowFileTransformResult(
                relationship='success',
                attributes=attributes,
                contents=json.dumps(enriched_data)
            )
        except Exception as e:
            # If JSON parsing fails, route to failure and tag the error
            attributes['cml_error'] = str(e)
            return FlowFileTransformResult(
                relationship='failure',
                attributes=attributes,
                contents=contents_str
            )

The Verdict

AI is an incredible tool for writing the heavy-lifting logic inside NiFi 2.0 Python processors, but it is a terrible architect for the processor framework itself. By treating my example NiFi API wrapper as a rigid, protected skeleton and carefully injecting Python logic inside of it, I was able to create this processor at lightning speed.

How many times do you think it took me to get this Python processor code to work? The version is 4, so it took me 4 iterations from start to finish to complete the processor in this exercise. Now fire up your cluster, open up a Python script, and see if you can transform it into a custom NiFi processor!

Resources

Custom NiFi Processors with Cloudera Streaming Operators
NiFi2 Processor Playground
Cloudera Streaming Operators GitHub Repo
NiFi Python Developer's Guide

Appendix

NiFi 2.0 Custom Python Processor with Pandas

This is written as a complete, copy-paste-ready sample that any engineer can drop into a new environment for immediate testing. No changes to the K8s CR, mount, or pod are required to build this new Python processor in the Cloudera Streaming Operator footprint. Similar steps can be duplicated in any appropriate NiFi 2.0 context.

Objective

Create a new, self-contained native Python processor named PandasJSONTransformer:

Accepts JSON content in a FlowFile (e.g. output from TransactionGenerator).
Loads it into a Pandas DataFrame.
Uses lon/lat to determine distance from home (defined in script).
Outputs the transformed JSON on the success relationship.

Input Flow File:

[ {
"ts" : "2026-05-05 14:55:11",
"account_id" : "943",
"transaction_id" : "6a9b1242-4892-11f1-b035-3a8bcd2ccadb",
"amount" : 64,
"lat" : 44.3568905517,
"lon" : -0.6186160357,
"nearest_city" : "Lagos",
"nearest_country" : "Nigeria"
} ]
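Before building the processor around it, you can sanity-check the distance math on this sample record in a plain Python shell. This is a minimal sketch (the home coordinates and the flat Euclidean formula mirror the processor code below; nothing here is NiFi-specific):

import math

HOME_LAT, HOME_LON = 28.3181, -80.6660   # Merritt Island, FL (same as the processor)
tx = {"lat": 44.3568905517, "lon": -0.6186160357}

# Same flat Euclidean distance (in degrees) the processor computes
dist = math.sqrt((tx["lat"] - HOME_LAT) ** 2 + (tx["lon"] - HOME_LON) ** 2)
print(round(dist, 4))   # ~81.64 degrees for this sample record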
Step 1: Create the New Processor File

Navigate to the exact directory where TransactionGenerator.py lives:

cd ~/nifi-custom-processors # ← adjust only if your local path is different
Create the new file PandasJSONTransformer.py with the following code:

import json
import io
import pandas as pd
import numpy as np
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
class PandasJSONTransformer(FlowFileTransform):
class Java:
# Essential: Ensures success and failure relationships appear in NiFi
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
version = '1.0.7-FINAL'
description = 'An example processor using python pandas.'
tags = ['pandas', 'poc', 'geospatial']
dependencies = ['pandas', 'numpy'] # NiFi auto-installs these
def __init__(self, **kwargs):
# 'pass' is the safest initialization for this environment
pass
def transform(self, context, flowfile):
content_bytes = flowfile.getContentsAsBytes()
attributes = flowfile.getAttributes()
# Merritt Island, FL Coordinates
HOME_LAT, HOME_LON = 28.3181, -80.6660
try:
# Step 1: Handle the "Array Trap"
# Even for single records, we wrap in a list so Pandas creates a proper DataFrame row
raw_data = json.loads(content_bytes.decode('utf-8'))
if not isinstance(raw_data, list):
raw_data = [raw_data]
df = pd.DataFrame(raw_data)
# Step 2: Proof of Concept Math
if 'lat' in df.columns and 'lon' in df.columns:
df['lat'] = pd.to_numeric(df['lat'], errors='coerce')
df['lon'] = pd.to_numeric(df['lon'], errors='coerce')
# Calculate Euclidean distance from Merritt Island:
# dist = sqrt((lat1 - lat2)^2 + (lon1 - lon2)^2)
df['dist_from_home'] = np.sqrt(
(df['lat'] - HOME_LAT)**2 + (df['lon'] - HOME_LON)**2
)
# Add a simple flag to show Pandas touched the data
df['pandas_processed'] = True
# Step 3: Output Generation
output_json = df.to_json(orient='records', indent=None)
return FlowFileTransformResult(
relationship='success',
contents=output_json.encode('utf-8'),
attributes={
**attributes,
'pandas.transformed': 'true',
'pandas.version': pd.__version__
}
)
except Exception as e:
# Rule 3: Defensive failure routing
return FlowFileTransformResult(
relationship='failure',
contents=content_bytes,
attributes={**attributes, 'pandas.error': str(e)}
)
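If you want to exercise the transform logic without a NiFi runtime, you can stub out the nifiapi module and feed the class a fake FlowFile. This is a hypothetical local harness (the stub module and FakeFlowFile are my own test scaffolding, not part of the NiFi API); it assumes pandas and numpy are installed locally and that you run it from the folder containing PandasJSONTransformer.py:

import json
import sys
import types

# Stub the nifiapi module so the processor file can be imported outside NiFi
fft = types.ModuleType("nifiapi.flowfiletransform")

class FlowFileTransform:
    def __init__(self, **kwargs):
        pass

class FlowFileTransformResult:
    def __init__(self, relationship, contents=None, attributes=None):
        self.relationship = relationship
        self.contents = contents
        self.attributes = attributes

fft.FlowFileTransform = FlowFileTransform
fft.FlowFileTransformResult = FlowFileTransformResult
sys.modules["nifiapi"] = types.ModuleType("nifiapi")
sys.modules["nifiapi.flowfiletransform"] = fft

from PandasJSONTransformer import PandasJSONTransformer

class FakeFlowFile:
    # Mimics the two FlowFile methods the processor actually calls
    def __init__(self, payload):
        self._bytes = json.dumps(payload).encode("utf-8")
    def getContentsAsBytes(self):
        return self._bytes
    def getAttributes(self):
        return {"filename": "test.json"}

proc = PandasJSONTransformer()
result = proc.transform(None, FakeFlowFile([{"lat": 44.3568905517, "lon": -0.6186160357, "amount": 64}]))
print(result.relationship)               # success
print(result.contents.decode("utf-8"))   # includes dist_from_home and pandas_processed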
Step 2: Deploy & Activate Ensure the minikube mount is still running: minikube mount ~/nifi-custom-processors:/extensions --uid 10001 --gid 10001 NiFi 2.0 will automatically detect new/updated .py files in the extensions directory (usually within 10–30 seconds). When testing python changes, increment the version in the code (1.0.1) and re-save the file after each code change — this forces a clean reload. If you are impatient like me you may be refreshing the page to notice new processors. Step 3: Verification in NiFi UI Open NiFi canvas. Drag a new processor and search for PandasJSONTransformer. The new processor should appear with the exact description and version from the code. Simple test flow: TransactionGenerator → PandasJSONTransformer. Flow Definition File. Run the flow. Check output flowfile for the new columns dist_from_home and pandas_processed. Be patient on first processor attempt after dragging it to the canvas. The processor will indicate dependencies are downloading when it is first introduced to the canvas. The processor must complete this dependency state before allowing you to route Success/Failure. Step 4: Hand-Off Framework for Any Other Environment To replicate this exact processor in a different NiFi 2.0 environment: Place PandasJSONTransformer.py in the Python extensions path. Complete the Deployment Steps 1–3 above. Verify pandas are installed by NiFi. Confirm flowfile output is as expected. Output Flow File: [ {
"ts" : "2026-05-05 15:10:13",
"account_id" : "487",
"transaction_id" : "xxx84324584-4894-11f1-b035-3a8bcd2ccadb",
"amount" : 39,
"lat" : 48.4010217027,
"lon" : 4.7099962916,
"dist_from_home" : 87.7062397261,
"pandas_processed" : true
} ]
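As a quick cross-check, the dist_from_home value above is reproducible by hand with the same formula (a two-liner; the coordinates come straight from the output record):

import math

# lat/lon from the output record above, home = (28.3181, -80.6660)
print(math.sqrt((48.4010217027 - 28.3181) ** 2 + (4.7099962916 - -80.6660) ** 2))
# -> 87.7062397261 (matches dist_from_home)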
Troubleshooting

# Check NiFi pod logs for processor loading
kubectl logs -n cld-streaming mynifi-0 | grep -i pandas
# check pod for python extensions
kubectl exec -n cfm-streaming mynifi-0 -- ls -la /opt/nifi/nifi-current/python/extensions
04-14-2026
09:24 AM
If you are following along with Cloudera Streaming Operators, I have just posted this Developer Blog here on the Cloudera community: https://community.cloudera.com/t5/Developer-Blogs/Building-a-Fully-Local-RAG-with-Cloudera-Streaming-Operators/ba-p/413825
04-14-2026
09:20 AM
I added another blog post showing how to make custom processors with Cloudera Streaming Operators: https://cldr-steven-matison.github.io/blog/Custom-Processors-With-Cloudera-Streaming-Operators/ And the GitHub repo: https://github.com/cldr-steven-matison/NiFi2-Processor-Playground
04-13-2026
08:25 AM
2 Kudos
In this guide, I show you how I build a fully local RAG (Retrieval-Augmented Generation) system running locally on minikube with Cloudera Streaming Operators. I will show how to orchestrate Kafka, NiFi, and Qdrant to power semantic search and Q&A over documents. The purpose is a foundation to develop use cases around RAG that will actually work on my local hardware.

Let's build: StreamToVLLM — a local RAG setup that turns your Cloudera Operator deployed cluster into a real-time, streaming-aware knowledge base. No cloud APIs. No data leaving your machine. Just pure Cloudera Streaming Operators (Kafka + NiFi) + vLLM inference + Qdrant vector search. Perfect for this GPU (RTX 4060 8 GB VRAM) setup — it comfortably runs Qwen2.5-3B-Instruct while NiFi ingests documents in real time via Kafka.

We already have the Cloudera Streaming Operators stack, GPU-Accelerated Kubernetes: Setting up NVIDIA on Minikube, and some example Deploying vLLM with Qwen Llama on Minikube working from previous sessions — now let's wire it all together into a complete local RAG pipeline.

Prerequisites

You should have:

Minikube running with GPU passthrough (RTX 4060 confirmed)
Cloudera Streaming Operators (CSM + CSA + CFM) installed in namespaces cld-streaming and cfm-streaming
NiFi UI accessible at https://mynifi-web.mynifi.cfm-streaming.svc.cluster.local/nifi/

Quick GPU double-check (do this first!):

kubectl get nodes -o jsonpath='{.items[*].status.allocatable.nvidia\.com/gpu}'
# Should return: 1
# Run the official NVIDIA test pod
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
name: gpu-test
spec:
restartPolicy: Never
containers:
- name: cuda-test
image: nvcr.io/nvidia/k8s/cuda-sample:vectoradd-cuda12.5.0
resources:
limits:
nvidia.com/gpu: 1
EOF
kubectl logs gpu-test -f
Expected output: Test PASSED. Notice the response:

[Vector addition of 50000 elements]
Copy input data from the host memory to the CUDA device
CUDA kernel launch with 196 blocks of 256 threads
Copy output data from the CUDA device to the host memory
Test PASSED
Done

Pro Tip! Keep watch nvidia-smi running in another terminal — you'll see your 4060 light up during inference.

Step 1: Deploy vLLM Qwen Inference Server

We use the Qwen model now, but you could use Llama here too. Save as vllm-qwen.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
name: vllm-server
spec:
replicas: 1
selector:
matchLabels:
app: vllm-server
template:
metadata:
labels:
app: vllm-server
spec:
containers:
- name: vllm-server
image: vllm/vllm-openai:latest
env:
- name: HF_TOKEN
valueFrom:
secretKeyRef:
name: hf-token
key: HF_TOKEN
resources:
limits:
nvidia.com/gpu: 1
args:
- "Qwen/Qwen2.5-3B-Instruct"
- "--quantization"
- "bitsandbytes"
- "--load-format"
- "bitsandbytes"
- "--gpu-memory-utilization"
- "0.80" # Fits comfortably in 6.92 GiB
- "--max-model-len"
- "2048" # 2k is a solid sweet spot for 3B models
- "--enforce-eager"
volumeMounts:
- name: shm
mountPath: /dev/shm
volumes:
- name: shm
emptyDir:
medium: Memory
sizeLimit: "2Gi"
---
apiVersion: v1
kind: Service
metadata:
name: vllm-service
namespace: default
spec:
selector:
app: vllm-server
ports:
- protocol: TCP
port: 8000
targetPort: 8000
type: ClusterIP
Apply the Qwen YAML:

kubectl apply -f vllm-qwen.yaml
kubectl get pods -w
kubectl port-forward svc/vllm-service 8000:8000

Test with curl:

curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "Qwen/Qwen2.5-3B-Instruct",
"messages": [{"role": "user", "content": "Hello! Tell me a short joke."}],
"temperature": 0.7
}'
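If you would rather script this check than use curl, here is a minimal sketch with the requests library (it assumes the port-forward above is active; the /v1/chat/completions endpoint and payload shape are vLLM's standard OpenAI-compatible API):

import requests

resp = requests.post(
    "http://localhost:8000/v1/chat/completions",
    json={
        "model": "Qwen/Qwen2.5-3B-Instruct",
        "messages": [{"role": "user", "content": "Hello! Tell me a short joke."}],
        "temperature": 0.7,
    },
    timeout=120,  # the first request can be slow while the model warms up
)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])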
Notice the response: {"id":"chatcmpl-bf9eab9a4c952c2d","object":"chat.completion","created":1774452848,"model":"Qwen/Qwen2.5-3B-Instruct","choices":[{"index":0,"message":{"role":"assistant","content":"Of course! Here's a quick one for you:\n\nWhy don't scientists trust atoms?\n\nBecause they make up everything!","refusal":null,"annotations":null,"audio":null,"function_call":null,"tool_calls":[],"reasoning":null},"logprobs":null,"finish_reason":"stop","stop_reason":null,"token_ids":null}],"service_tier":null,"system_fingerprint":null,"usage":{"prompt_tokens":37,"total_tokens":62,"completion_tokens":25,"prompt_tokens_details":null},"prompt_logprobs":null,"prompt_token_ids":null,"kv_transfer_params":null} Warning! If your curl command crashes the port forward, your vllm-server is not ready yet. Watch the vllm-server logs until you see Application startup complete. Ready for RAG! Your GPU pod is now the brain of the system. Step 2: Deploy Qdrant Vector DBPermalink Save as qdrant-deployment.yaml: apiVersion: apps/v1
kind: Deployment
metadata:
name: qdrant
spec:
replicas: 1
selector:
matchLabels:
app: qdrant
template:
metadata:
labels:
app: qdrant
spec:
containers:
- name: qdrant
image: qdrant/qdrant:latest
ports:
- containerPort: 6333
- containerPort: 6334
volumeMounts:
- name: qdrant-data
mountPath: /qdrant/storage
volumes:
- name: qdrant-data
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: qdrant
spec:
selector:
app: qdrant
ports:
- name: http
port: 6333
targetPort: 6333
- name: grpc
port: 6334
targetPort: 6334
type: ClusterIP
Apply the Qdrant YAML:

kubectl apply -f qdrant-deployment.yaml
kubectl port-forward svc/qdrant 6333:6333

Let's use curl to test and to create our first sample collection to use later:

curl -X PUT "http://localhost:6333/collections/my-rag-collection" \
-H "Content-Type: application/json" \
-d '{"vectors": {"size": 768, "distance": "Cosine"}}'
Notice the response: {"result":true,"status":"ok","time":0.098496962} Pro Tip! With port-forward on visit http://localhost:6333/dashboard and have a look at Qdrant. Step 3: Lightweight Embedding ServerPermalink Save as embedding-server.yaml: apiVersion: apps/v1
kind: Deployment
metadata:
name: embedding-server
spec:
replicas: 1
selector:
matchLabels:
app: embedding-server
template:
metadata:
labels:
app: embedding-server
spec:
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: tei-container
image: ghcr.io/huggingface/text-embeddings-inference:cpu-1.5
# This is the magic part: pass the HF token explicitly to the router binary
command: ["/bin/sh", "-c"]
args:
- |
text-embeddings-router \
--model-id nomic-ai/nomic-embed-text-v1 \
--port 80 \
--hf-api-token "[hf-token]"
ports:
- containerPort: 80
resources:
limits:
memory: "2Gi"
cpu: "2"
volumeMounts:
- name: model-cache
mountPath: /data
volumes:
- name: model-cache
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: embedding-server-service
spec:
selector:
app: embedding-server
ports:
- protocol: TCP
port: 80
targetPort: 80
type: ClusterIP

I had some difficulties in getting the HF model and token secret into the mix during pod creation. The working setup I ended up with was to download the model locally, mount that during pod creation, and explicitly use the hf-token string. Supporting commands:

mkdir -p /mnt/c/hf-models/nomic-embed
cd /mnt/c/hf-models/nomic-embed
python3 -c "from huggingface_hub import snapshot_download; snapshot_download(repo_id='nomic-ai/nomic-embed-text-v1', local_dir='/mnt/c/hf-models/nomic-embed', token='[hf-token]')"
minikube mount /mnt/c/hf-models/nomic-embed:/mnt/c/hf-models/nomic-embed

Apply the Embedding Server YAML:

kubectl apply -f embedding-server.yaml
kubectl port-forward svc/embedding-server-service 8080:80

Test with curl:

curl -X POST http://localhost:8080/embed -d '{"inputs":"The streaming pipeline is finally complete."}' -H 'Content-Type: application/json'
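To confirm the embedding dimensionality matches the Qdrant collection (768), here is a small Python sketch (assumes the 8080 port-forward; /embed is TEI's standard endpoint, as the curl above shows):

import requests

vec = requests.post(
    "http://localhost:8080/embed",
    json={"inputs": "The streaming pipeline is finally complete."},
).json()[0]
print(len(vec))   # expect 768, matching the my-rag-collection vector size
print(vec[:3])    # first few dimensions of the embedding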
Notice the response from the curl test:

[[0.04619594,-0.0090487795, ..]]

Step 4: Document Ingestion with NiFi

If vLLM is the brain, Apache NiFi is the nervous system. We need to get data from our sources, publish that data to Kafka, then consume it, chunk it, turn it into vectors, and store it in Qdrant — all within NiFi on Kubernetes. To make this easy, I've exported the complete NiFi flow as a JSON file: StreamToVLLM.json. You can download it and import it directly into your NiFi UI by dragging a new Process Group onto the canvas and uploading the flow definition file. NiFi Flow Definition File. The fully operational flows are here: NiFi Templates.

StreamTovLLM NiFi Flow

The flow processes each document through a "Retrieve-then-Generate" loop:

ConsumeKafka_2_6: Ingests raw text from the new_documents topic using the #{Kafka Broker Endpoint} parameter.
SplitText: Chunks the incoming data into 20-line segments to ensure the context remains efficient for the 3B model.
ExtractText: Places the original content into the ${content} attribute for ReplaceText (Qdrant).
ReplaceText (Format for Embedding): Wraps the text chunk into the JSON format required by the embedding server: {"inputs": "$1"}.
InvokeHTTP (Embed): Calls the embedding-service to generate a 768-dimension vector.
EvaluateJsonPath: Extracts the resulting vector from the JSON response into a FlowFile attribute named vector_data.
ReplaceText (Qdrant): Formats the body required for the Qdrant upsert.
InvokeHTTP (Qdrant Upsert): Upserts the original chunk and its embedding into Qdrant so the system "learns" the document for future queries.

IngestToStream NiFi Flow

This flow is used to route incoming data sources to our StreamTovLLM Kafka topic new_documents.

GenerateFlowFile: Sends a simple description of StreamToVLLM to Kafka.
InvokeHttp: Sends the entire markdown file for RAG with Cloudera Streaming Operators (this blog post).
PublishKafka_2_6: Publishes our raw content to the new_documents topic.

Careful! The scheduling for both top processors is 1 day. Do not keep the ingest running. You only need to ingest one example or the other. If you do ingest too much, delete the collection and remake it to start over.

Start the StreamTovLLM flow. Next, send 1 flowfile (Run Once) to PublishKafka in IngestToStream — the document will now stream through NiFi, land in Qdrant, and become usable as context in calls to our vLLM service!

Step 5: Query Time — Ask Questions!

What better way to test than to ask our model What is StreamTovLLM? Remember, on the first inquiry the model does not know anything about StreamTovLLM, as expected:

=== ANSWER ===
I apologize, but there seems to be an error in the term you've provided ("streamtovllm"). It appears to be a misspelling or incorrect combination of words. The correct term might be Streamlit (a popular open-source platform for building user interfaces for machine learning applications) or llama (which could refer to Llama.cpp, a C++ library for machine learning). Could you please clarify or provide more context about what "streamtovllm" is supposed to be?
Now, after we have executed our NiFi flow, test with curl:

curl -X POST "http://localhost:6333/collections/my-rag-collection/points/scroll" -H "Content-Type: application/json" -d '{"limit": 1, "with_payload": true}'
Notice the response: {"result":{"points":[{"id":"ee6a5070-0add-4e43-b218-9d64eeab3053","payload":{"text":"StreamToVLLM is a specialized data engineering framework. nIt connects Apache NiFi to vLLM inference servers. nThe system uses Qdrant as a vector database to store technical blog content. nThis allows for local RAG (Retrieval-Augmented Generation) on Windows WSL2 machines. nThe main goal of the project is to demonstrate high-performance streaming AI nusing Cloudera Streaming Operators and dedicated GPU hardware.","source":"kafka-stream","timestamp":"Wed Mar 25 15:24:32 GMT 2026"}}],"next_page_offset":null},"status":"ok","time":0.000567914} Now, lets go a bit further and build a simple python script to ask the same question. Save as query-rag.py: import requests
def ask(question):
    # 1. Embed
    emb = requests.post("http://localhost:8080/embed", json={"inputs": question}).json()[0]
    # 2. Search Qdrant
    search = requests.post("http://localhost:6333/collections/my-rag-collection/points/search",
                           json={"vector": emb, "limit": 1, "with_payload": True}).json()
    results = search.get("result", [])
    raw_text = results[0]["payload"].get("text", "") if results else "No context."
    # --- THE ESCAPE HATCH ---
    # If the text starts with '[', it's a vector. We don't want it.
    if str(raw_text).startswith("["):
        print("[!] Warning: Found vector trash in text field. Ignoring it.")
        context = "Reference data is currently being re-indexed."
    else:
        # Force the context to be tiny (500 chars) to guarantee we stay under 4096 tokens
        context = str(raw_text)[:500]
    # 3. vLLM Call
    payload = {
        "model": "Qwen/Qwen2.5-3B-Instruct",
        "messages": [
            {"role": "system", "content": "Briefly answer using this context."},
            {"role": "user", "content": f"Context: {context}\n\nQuestion: {question}"}
        ],
        "max_tokens": 100
    }
    resp = requests.post("http://localhost:8000/v1/chat/completions", json=payload)
    if resp.status_code == 200:
        print("\n=== ANSWER ===")
        print(resp.json()["choices"][0]["message"]["content"])
    else:
        print(f"Error: {resp.text}")

ask("What is StreamToVLLM?")
Notice the well-informed response:

python3 query-rag.py
=== ANSWER ===
StreamToVLLM is a specialized data engineering framework that connects Apache NiFi with vLLM inference servers, enabling local retrieval and augmentation generation on Windows WSL2 machines using Qdrant for storing technical blog content. Its primary goal is to showcase high-performance streaming AI using Cloudera Streaming Operators and dedicated GPU hardware.

Boom — instant, context-aware answers from your live streaming documents.

Terminal Commands For This Session

# Look at logs
kubectl logs gpu-test -f
kubectl logs -l app=vllm-server --tail 300
# Chained port forward
kubectl port-forward svc/qdrant 6333:6333 &
kubectl port-forward svc/vllm-service 8000:8000 &
kubectl port-forward svc/embedding-server-service 8080:80 &
# Delete the collection
curl -X DELETE "http://localhost:6333/collections/my-rag-collection"
# Recreate it fresh
curl -X PUT "http://localhost:6333/collections/my-rag-collection" \
-H "Content-Type: application/json" \
-d '{"vectors": {"size": 768, "distance": "Cosine"}}'
# Delete YAMLS
kubectl delete -f vllm-qwen.yaml
kubectl delete -f qdrant-deployment.yaml
kubectl delete -f embedding-server.yaml

YAMLs! You can find all the YAML for working with Cloudera Streaming Operators in my GitHub Repo.

The "StreamToVLLM" Takeaway

This project isn't about building a final product; it's about establishing a foundational dev pipeline that actually works on local hardware. By wiring these components together on Kubernetes, we've brought a local GPU online for functional development with Cloudera Streaming Operators.

Foundational RAG Plumbing: We've built the "boring" but essential infrastructure — moving data from a stream, through a vectorizer, and into a searchable brain — all within a single Minikube cluster.

The Power of Context: By chunking and embedding our own data, we've moved the LLM from "guessing" to "referencing." We aren't just asking Qwen to be smart; we're giving it an open-book exam using the specific context we've provided.

Unlocking the Sandbox: The real win here isn't the chatbot — it's the capability. Now that the pipeline exists, you can swap models, change embedding strategies, or pipe in entirely different data sources (Git, Jira, Slack) to see how they interact with local inference.

Zero-Cost Iteration: Because this is 100% local, we can break things, re-index the collection, and run 1,000 test queries as we iterate.

Resources & Further Reading

Qwen2.5-3B-Instruct (Hugging Face)
nomic-embed-text-v1 (Hugging Face)
Text Embeddings Inference (TEI)
Qdrant Vector Database
Qdrant GitHub Repository
Cloudera Streaming Operators
GPU-Accelerated Kubernetes: Setting up NVIDIA on Minikube
Deploying vLLM with Qwen Llama on Minikube

RAG with Cloudera Streaming Operators

Please reach out to schedule a discussion if you would like a deeper dive, hands-on experience, or demos of the integration between these components.
03-31-2026
04:49 AM
I created a repo of the assets used in the blog above. You can check it out here: https://github.com/cldr-steven-matison/ClouderaStreamingOperators
03-10-2026
10:12 PM
1 Kudo
Setting up a new MacBook for Data Engineering is quite a chore. In this guide, we will take you from a fresh macOS installation to running the full suite of Cloudera Streaming Operators (CFM, CSA, CSM) on a local Minikube cluster.
Please note that we are utilizing the Evaluation examples for these services. This is a critical step for any engineer; mastering the foundational configurations is essential before moving into complex, secure, and high-availability production architectures.
Let’s get started!
My MacBook Setup
First, we need the “Swiss Army Knife” of macOS: homebrew. We will use it to install our core CLI tools, container runtimes, and local Kubernetes environment.
# Install Homebrew (if not already installed)
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
# Install Core Tools
brew install git docker python minikube kubernetes-cli k9s helm
# Start Docker Desktop (or Colima)
open /Applications/Docker.app
# Spin up Minikube with enough resources for the full stack
minikube start --cpus 4 --memory 12288
Pro Tip! Ensure you have sufficient resources in Docker, and then use 'minikube start' to allocate CPU and memory as shown above.
Some Helm and Kubectl Setup
Let’s get started with kubectl by creating a namespace and a Docker secret we will use with each operator. We will log in now and pull these Cloudera Streaming Operators from the Cloudera Helm Registry during installs later.
# 1. Create the namespace
kubectl create namespace cld-streaming
# 2. Add Cloudera License Credentials
kubectl create secret docker-registry cloudera-creds \
--docker-server=container.repository.cloudera.com \
--docker-username=<CLOUDERA_LICENSE_USER> \
--docker-password=<CLOUDERA_LICENSE_PASSWORD> \
-n cld-streaming
helm repo add jetstack https://charts.jetstack.io
helm install cert-manager jetstack/cert-manager --version v1.16.3 --namespace cert-manager --create-namespace --set installCRDs=true
helm registry login container.repository.cloudera.com
Pro Tip! If you are working with helm over time across different clusters, remember you may need to log in again.
Install Cloudera Streams Messaging Operator(CSM 1.6)
Next, we will stand up our messaging backbone: Kafka. The CSM 1.6 operator makes it incredibly simple to deploy a Kafka cluster for local development. First, let’s install the Strimzi Kafka Operator from Cloudera:
# Install Cloudera Kafka Strimzi Operator
helm install strimzi-cluster-operator --namespace cld-streaming --set 'image.imagePullSecrets[0].name=cloudera-creds' --set-file clouderaLicense.fileContent=./license.txt --set watchAnyNamespace=true oci://container.repository.cloudera.com/cloudera-helm/csm-operator/strimzi-kafka-operator --version 1.6.0-b99
Create the kafka-eval.yaml as follows:
kafka-eval.yaml
apiVersion: kafka.strimzi.io/v1
kind: Kafka
metadata:
name: my-cluster
annotations:
strimzi.io/node-pools: enabled
strimzi.io/kraft: enabled
spec:
kafka:
version: 4.1.1.1.6
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
entityOperator:
topicOperator: {}
userOperator: {}
Create the kafka-nodepool.yaml as follows:
kafka-nodepool.yaml
apiVersion: kafka.strimzi.io/v1
kind: KafkaNodePool
metadata:
name: combined
labels:
strimzi.io/cluster: my-cluster
spec:
replicas: 3
roles:
- controller
- broker
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
kraftMetadata: shared
deleteClaim: false
---
apiVersion: kafka.strimzi.io/v1
kind: KafkaNodePool
metadata:
name: broker-only
labels:
strimzi.io/cluster: my-cluster
spec:
replicas: 3
roles:
- broker
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
kraftMetadata: shared
deleteClaim: false
Execute the following kubectl apply command to create the Kafka cluster:
kubectl apply --filename kafka-eval.yaml,kafka-nodepool.yaml --namespace cld-streaming
Notice the response:
kafka.kafka.strimzi.io/my-cluster created
kafkanodepool.kafka.strimzi.io/combined created
kafkanodepool.kafka.strimzi.io/broker-only created
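Once the brokers report Ready, you can smoke-test the cluster with a short Python producer/consumer. This is a minimal sketch using the kafka-python package (the smoke-test topic and this approach are my own scaffolding, not part of the Cloudera docs); run it from a pod inside the cluster so the bootstrap service DNS and advertised listeners resolve correctly:

from kafka import KafkaConsumer, KafkaProducer

BOOTSTRAP = "my-cluster-kafka-bootstrap.cld-streaming.svc:9092"

# Produce one test record (assumes the smoke-test topic exists or auto-creation is enabled)
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
producer.send("smoke-test", b"hello from minikube")
producer.flush()

# Read it back from the beginning of the topic
consumer = KafkaConsumer(
    "smoke-test",
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop polling after 10s of silence
)
for record in consumer:
    print(record.value)
    break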
Deploy Schema Registry (CSM 1.6)
To manage our data serialization and ensure compatibility across the stack, we deploy the Cloudera Schema Registry. This allows us to define a schema for data that NiFi, Kafka, Flink, and SSB can easily decipher.
Create the sr-values.yaml file as follows:
sr-values.yaml
tls:
enabled: false
authentication:
oauth:
enabled: false
authorization:
simple:
enabled: false
database:
type: in-memory
service:
type: NodePort
Execute the following helm command to install Schema Registry:
# Install Schema Registry from the Cloudera OCI Registry
helm install schema-registry \
--namespace cld-streaming \
--version 1.6.0-b99 \
--values sr-values.yaml \
--set "image.imagePullSecrets[0].name=cloudera-creds" \
oci://container.repository.cloudera.com/cloudera-helm/csm-operator/schema-registry
Notice the following output:
Pulled: container.repository.cloudera.com/cloudera-helm/csm-operator/schema-registry:1.6.0-b99
Digest: sha256:6e7d57bf42ddefc3f216b977d91fd606caf78a456c6aa2989002ad4387fb5a6d
NAME: schema-registry
LAST DEPLOYED: Mon Mar 9 13:15:09 2026
NAMESPACE: cld-streaming
STATUS: deployed
REVISION: 1
DESCRIPTION: Install complete
TEST SUITE: None
NOTES:
Thank you for installing schema-registry.
Your release is named schema-registry.
WARNING: You configured Schema Registry to use an in-memory database. This setup is only suitable for testing and evaluation. Cloudera does not recommend this configuration for production use. All schemas will be lost when Pods restart.
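Once the pod is up, a quick health check against the registry's REST API from Python (this assumes kubectl port-forward svc/schema-registry-service 9090:9090 -n cld-streaming; the /api/v1/schemaregistry/schemas path comes from the open-source Schema Registry API and is an assumption to verify against your build's docs):

import requests

# List registered schemas; an empty result is expected on a fresh install
resp = requests.get("http://localhost:9090/api/v1/schemaregistry/schemas")
print(resp.status_code)  # expect 200
print(resp.json())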
Deploy Kafka Surveyor (CSM 1.6)
A new standout in the CSM Operator 1.6 is Kafka Surveyor. This provides advanced observability into your Kafka environment. It is a game-changer for monitoring broker health and topic metrics in a Kubernetes-native way.
Before we can create the surveyor YAML, we need to find our Kafka bootstrapServers:
kubectl get kafka my-cluster -n cld-streaming -o jsonpath='{.status.listeners[?(@.name=="plain")].bootstrapServers}'
Grab the following from the output:
my-cluster-kafka-bootstrap.cld-streaming.svc:9092
and use it to create the kafka-surveyor.yaml as follows:
kafka-surveyor.yaml
clusterConfigs:
clusters:
- clusterName: my-cluster
tags:
- csm1.6
bootstrapServers: my-cluster-kafka-bootstrap.cld-streaming.svc:9092
adminOperationTimeout: PT1M
authorization:
enabled: false
commonClientConfig:
security.protocol: PLAINTEXT
surveyorConfig:
surveyor:
authentication:
enabled: false
tlsConfigs:
enabled: false
Execute the following helm command to install Cloudera Surveyor:
helm install cloudera-surveyor \
--namespace cld-streaming \
--version 1.6.0-b99 \
--values kafka-surveyor.yaml \
--set image.imagePullSecrets=cloudera-creds \
--set-file clouderaLicense.fileContent=./license.txt \
oci://container.repository.cloudera.com/cloudera-helm/csm-operator/surveyor
Notice the following output:
Pulled: container.repository.cloudera.com/cloudera-helm/csm-operator/surveyor:1.6.0-b99
Digest: sha256:7e12c2b2ab8aca0351296f5c986d64d55a92b66e7ac6b599ccbd5b3fa9c13167
NAME: cloudera-surveyor
LAST DEPLOYED: Mon Mar 9 13:18:14 2026
NAMESPACE: cld-streaming
STATUS: deployed
REVISION: 1
DESCRIPTION: Install complete
TEST SUITE: None
NOTES:
Thank you for installing surveyor.
Your release is named cloudera-surveyor.
Deploy SQL Stream Builder (CSA 1.5)
To get started with SQL Stream Builder first we need to install the Cloudera Streaming Analytics Operator:
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
kubectl wait -n cert-manager --for=condition=Available deployment --all
helm install csa-operator --namespace cld-streaming \
--version 1.5.0-b275 \
--set 'flink-kubernetes-operator.imagePullSecrets[0].name=cloudera-creds' \
--set 'ssb.sse.image.imagePullSecrets[0].name=cloudera-creds' \
--set 'ssb.sqlRunner.image.imagePullSecrets[0].name=cloudera-creds' \
--set 'ssb.mve.image.imagePullSecrets[0].name=cloudera-creds' \
--set-file flink-kubernetes-operator.clouderaLicense.fileContent=./license.txt \
oci://container.repository.cloudera.com/cloudera-helm/csa-operator/csa-operator
Notice the following output:
Pulled: container.repository.cloudera.com/cloudera-helm/csa-operator/csa-operator:1.5.0-b275
Digest: sha256:2ee057584ec167a9d70cabbe9a4f0aa8cd93ddf74e73da3c2617a829a87e593c
NAME: csa-operator
LAST DEPLOYED: Mon Mar 9 14:43:32 2026
NAMESPACE: cld-streaming
STATUS: deployed
REVISION: 1
DESCRIPTION: Install complete
TEST SUITE: None
Deploy NiFi (CFM 3.0)
Next, we will deploy NiFi using the standard evaluation spec. This gives us a fully functional NiFi instance without the overhead of complex external persistence.
First, install the CFM 3.0 NiFi Operator:
helm install cfm-operator oci://container.repository.cloudera.com/cloudera-helm/cfm-operator/cfm-operator \
--namespace cld-streaming \
--version 3.0.0-b126 \
--set installCRDs=true \
--set image.repository=container.repository.cloudera.com/cloudera/cfm-operator \
--set image.tag=3.0.0-b126 \
--set "image.imagePullSecrets[0].name=cloudera-creds" \
--set "imagePullSecrets={cloudera-creds}" \
--set "authProxy.image.repository=container.repository.cloudera.com/cloudera_thirdparty/hardened/kube-rbac-proxy" \
--set "authProxy.image.tag=0.19.0-r3-202503182126" \
--set licenseSecret=cfm-operator-license \
--set-file clouderaLicense.fileContent=./license.txt
Create the nifi-eval.yaml as follows:
nifi-eval.yaml
apiVersion: cfm.cloudera.com/v1alpha1
kind: Nifi
metadata:
name: mynifi
namespace: cld-streaming
spec:
replicas: 1
nifiVersion: "1.28.1"
image:
repository: container.repository.cloudera.com/cloudera/cfm-nifi-k8s
tag: 3.0.0-b126-nifi_1.28.1.2.3.17.0-9
pullSecret: cloudera-creds
tiniImage:
repository: container.repository.cloudera.com/cloudera/cfm-tini
tag: 3.0.0-b126
pullSecret: cloudera-creds
hostName: mynifi.localhost
uiConnection:
type: Ingress
configOverride:
nifiProperties:
upsert:
nifi.cluster.leader.election.implementation: "KubernetesLeaderElectionManager"
authorizers: |
<authorizers>
<authorizer>
<identifier>single-user-authorizer</identifier>
<class>org.apache.nifi.authorization.single.user.SingleUserAuthorizer</class>
</authorizer>
</authorizers>
loginIdentityProviders: |
<loginIdentityProviders>
<provider>
<identifier>single-user-provider</identifier>
<class>org.apache.nifi.authentication.single.user.SingleUserLoginIdentityProvider</class>
<property name="Username">admin</property>
<property name="Password">$2b$10$GRa8g9Z5rBENXPFNHFBosev9XmY6CSk0SdcBi5sQMRX92KD73asGG</property>
</provider>
</loginIdentityProviders>
stateManagement:
clusterProvider:
id: kubernetes-provider
class: org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider
Execute the following kubectl apply command to create the nifi cluster:
kubectl apply -f nifi-eval.yaml -n cld-streaming
Final Verification
The best way to see the fruits of your labor is via k9s.
# Check the deployment progress
k9s
Be Patient! It will take quite a bit of time for the entire stack to be online and running.
Behold: The full Cloudera Streaming stack running harmoniously in your local minikube cluster.
You can also expose all the services with minikube like this:
minikube service list -n cld-streaming
┌───────────────┬─────────────────────────────────────────────────┬──────────────────┬─────┐
│ NAMESPACE │ NAME │ TARGET PORT │ URL │
├───────────────┼─────────────────────────────────────────────────┼──────────────────┼─────┤
│ cld-streaming │ cfm-operator-controller-manager-metrics-service │ No node port │ │
│ cld-streaming │ cfm-operator-webhook-service │ No node port │ │
│ cld-streaming │ cloudera-surveyor-service │ http/8080 │ │
│ cld-streaming │ flink-operator-webhook-service │ No node port │ │
│ cld-streaming │ my-cluster-kafka-bootstrap │ No node port │ │
│ cld-streaming │ my-cluster-kafka-brokers │ No node port │ │
│ cld-streaming │ mynifi │ No node port │ │
│ cld-streaming │ mynifi-web │ No node port │ │
│ cld-streaming │ schema-registry-service │ application/9090 │ │
│ cld-streaming │ ssb-mve │ No node port │ │
│ cld-streaming │ ssb-postgresql │ No node port │ │
│ cld-streaming │ ssb-sse │ No node port │ │
└───────────────┴─────────────────────────────────────────────────┴──────────────────┴─────┘
Pro Tip! Once your pods are running, use minikube service to expose the UI endpoints.
# Check out the service UIs:
minikube service cloudera-surveyor-service --namespace cld-streaming
minikube service schema-registry-service --namespace cld-streaming
minikube service ssb-sse --namespace cld-streaming
minikube service mynifi --namespace cld-streaming
This setup is your “sandbox” for building end-to-end streaming architectures with the Cloudera Streaming Operators.
Resources
Cloudera Streams Messaging (CSM) 1.6 Docs
Cloudera Streaming Analytics 1.5 Docs
Cloudera Flow Management (CFM) 3.0 Docs
06-04-2025
06:27 AM
Excellent article @zzeng 👍 !!
05-15-2025
05:45 AM
1 Kudo
@brajs Yes, it is possible to make custom flow analysis rules. This is still tech preview in NiFi 2.0, so documentation is limited. I would recommend taking a look at some existing rules, tearing them down to source code, modifying them to suit, and then building and packaging your custom rules. Once you have a new rules NAR file, just deliver it to the NiFi /lib location and it should be available to use.
02-20-2025
06:31 AM
I just know from many years of setting up Ranger within Ambari with UserSync. I doubt current CDP docs will strictly call it out.
02-20-2025
06:00 AM
@drewski7 Yes, UserSync is required. Please see docs: https://docs.cloudera.com/cdp-private-cloud-base/7.3.1/security-ranger-user-management/topics/security-ranger-configure-adv-usersync-freeipa.html You can use the version selector at the top of the page to make sure it matches your version.