Member since: 02-01-2022
Posts: 288
Kudos Received: 103
Solutions: 60

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 1256 | 05-15-2025 05:45 AM |
|  | 5276 | 06-12-2024 06:43 AM |
|  | 8292 | 04-12-2024 06:05 AM |
|  | 6104 | 12-07-2023 04:50 AM |
|  | 3405 | 12-05-2023 06:22 AM |
06-18-2026
07:07 AM
@red888 How are you deploying NiFi on k8s now? I am using our operators, and deploying NiFi with SSL is super easy!!
06-11-2026
03:56 AM
Excellent article! I thought the MCP was a great thing, but being able to take it out and use the NiFi API directly is choice! 🍵 Once some of these new ideas become the new normal, NiFi and AI will just get easier and easier!
06-09-2026
08:37 AM
If you are running NiFi, Kafka, or Flink based applications and workloads in Kubernetes, you know that visibility is everything. You can build the most complex data pipelines in the world, but without eyes on your throughput, queues, and other streaming metrics, you’re essentially flying blind.

Welcome to the ultimate page for Kubernetes-native observability. In this series, we walk through the exact steps to wire up the entire Cloudera Streaming Operators architecture (NiFi, Kafka, and Flink) into a unified Prometheus and Grafana stack. By the end of this journey, you won’t just have basic health checks. You will have a single pane of glass that correlates NiFi’s data flow metrics with Kafka’s topic throughput and Flink’s stream processing metrics.

Prerequisites
This lesson assumes you have already:
- Completed the deployment of the Cloudera Streaming Operators.
- Completed the setup of the minikube branch of the Streams Processing Hands-on Lab. The NiFi flow should be running, the topics txn1, txn2, and txn_fraud should exist, and SQL Stream Builder (SSB) jobs should be running with operational polling.
- Cloned the latest Cloudera Streaming Operators GitHub repo into your local ~/ path.

Warning! Some of the exercises include new helm install commands, so be prepared to run helm uninstall as needed. Uninstalling and reinstalling is a good way to reset your stage. If you are using AI to execute against this plan, you can instead use helm upgrade or kubectl apply patches to reach the desired outcome(s).

Prometheus Install
Before diving into the specific operators, you need to install the central monitoring stack. We will use the community Prometheus Operator (kube-prometheus-stack). Make sure your Kubernetes environment is ready, then run the following commands to install the Prometheus Operator and Grafana into the cld-streaming namespace.

1. Add the Helm repo:
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts

2. Install the kube-prometheus-stack. This configuration enables proxy access and makes Prometheus the default Grafana data source. It also configures the operator to watch for PodMonitors and ServiceMonitors across all namespaces ({}):
helm install prometheus prometheus-community/kube-prometheus-stack \
--namespace cld-streaming --create-namespace \
--set grafana.sidecar.datasources.defaultDatasourceEnabled=false \
--set 'grafana.additionalDataSources[0].name=Prometheus' \
--set 'grafana.additionalDataSources[0].type=prometheus' \
--set 'grafana.additionalDataSources[0].url=http://prometheus-kube-prometheus-prometheus.cld-streaming.svc.cluster.local:9090' \
--set 'grafana.additionalDataSources[0].access=proxy' \
--set 'grafana.additionalDataSources[0].isDefault=true' \
--set prometheus.prometheusSpec.serviceMonitorSelectorNilUsesHelmValues=false \
--set prometheus.prometheusSpec.podMonitorSelectorNilUsesHelmValues=false \
--set-json 'prometheus.prometheusSpec.serviceMonitorNamespaceSelector={}' \
--set-json 'prometheus.prometheusSpec.podMonitorNamespaceSelector={}'
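The last four flags matter because the Prometheus Operator treats an empty label selector differently from an unset (null) one. Per the Prometheus Operator API reference, an empty monitor selector matches every ServiceMonitor/PodMonitor, while a null one matches none. An empty namespace selector matches all namespaces, while a null one limits discovery to Prometheus's own namespace. Setting the `*NilUsesHelmValues` flags to false stops the chart from injecting its default release-label selector. The Python sketch below models those rules with plain dictionaries. It is a simplified illustration: `select_monitors` and the sample monitors are hypothetical, not operator code, and only `matchLabels` (not `matchExpressions`) is modelled.

```python
from typing import Dict, List, Optional

Labels = Dict[str, str]


def matches(selector: Labels, labels: Labels) -> bool:
    """matchLabels semantics: every pair must be present; an empty selector {} matches all."""
    return all(labels.get(key) == value for key, value in selector.items())


def select_monitors(
    monitors: List[dict],
    prometheus_namespace: str,
    monitor_selector: Optional[Labels],
    namespace_selector: Optional[Labels],
    namespace_labels: Dict[str, Labels],
) -> List[str]:
    """Names of the monitors a Prometheus with these selectors would discover."""
    picked = []
    for monitor in monitors:
        # A null (None) monitor selector matches no monitors at all.
        if monitor_selector is None or not matches(monitor_selector, monitor["labels"]):
            continue
        namespace = monitor["namespace"]
        if namespace_selector is None:
            # A null namespace selector restricts discovery to Prometheus's own namespace.
            if namespace != prometheus_namespace:
                continue
        elif not matches(namespace_selector, namespace_labels.get(namespace, {})):
            continue
        picked.append(monitor["name"])
    return picked


# Hypothetical monitors, loosely modelled on the ones created later in this series.
MONITORS = [
    {"name": "strimzi-pod-monitor", "namespace": "cld-streaming", "labels": {"release": "prometheus"}},
    {"name": "nifi-service-monitor", "namespace": "cfm-streaming", "labels": {"release": "prometheus"}},
    {"name": "team-monitor", "namespace": "cld-streaming", "labels": {}},
]
```

With both selectors set to `{}` (what the install above aims for), `select_monitors(MONITORS, "cld-streaming", {}, {}, {})` returns all three names. Compare a release-label selector combined with a null namespace selector: the cross-namespace NiFi monitor and the unlabelled one drop out.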
Exposing the Prometheus and Grafana UIs
Grab the URLs and keep the tunnels alive in separate terminals.
Tab 1: Prometheus UI
minikube service prometheus-kube-prometheus-prometheus -n cld-streaming --url
Tab 2: Grafana UI
minikube service prometheus-grafana -n cld-streaming --url
You can use this command to get the Grafana admin password:
kubectl get secret --namespace cld-streaming prometheus-grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo
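The `jsonpath ... | base64 --decode` pipeline is needed because Kubernetes returns Secret values under `.data`, base64-encoded. If you script your setup in Python instead, a minimal equivalent is sketched below. It assumes `kubectl` is on your PATH and pointed at the minikube cluster. The secret name and the `admin-password` key are the ones used in the command above.

```python
import base64
import json
import subprocess


def decode_secret_field(secret_json: str, key: str) -> str:
    """Decode one field from the JSON printed by `kubectl get secret -o json`."""
    secret = json.loads(secret_json)
    # Secret values come back under .data, base64-encoded.
    return base64.b64decode(secret["data"][key]).decode("utf-8")


def grafana_admin_password(namespace: str = "cld-streaming") -> str:
    """Shell out to kubectl and return the Grafana admin password."""
    out = subprocess.run(
        ["kubectl", "get", "secret", "prometheus-grafana", "-n", namespace, "-o", "json"],
        check=True,
        capture_output=True,
        text=True,
    ).stdout
    return decode_secret_field(out, "admin-password")
```

Calling `grafana_admin_password()` prints the same value as the shell one-liner. Only `decode_secret_field` is pure, which keeps it easy to test without a cluster.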
The Cloudera Streaming Operators Integration Series
Monitoring Cloudera Streams Messaging (CSM) with Prometheus
Monitoring Cloudera Flow Management (CFM) with Prometheus
Monitoring Cloudera Streaming Analytics (CSA) with Prometheus

1. Monitoring Cloudera Streams Messaging (CSM) with Prometheus
Apache Kafka is the undeniable backbone of modern real-time data, but monitoring its internal health on Kubernetes can often feel like trying to pick a lock. The Strimzi-powered Cloudera Streams Messaging (CSM) Operator effortlessly spins up your brokers. However, the critical metrics you need to keep things running smoothly, such as byte throughput and under-replicated partitions, are trapped deep inside the JVM. Because Prometheus doesn’t natively speak JMX, we can’t just open a port and call it a day.

In Part 1 of this series, we crack open that black box around Kafka. We walk step by step through injecting a custom JMX Prometheus Exporter into your CSM cluster, which translates those buried JVM metrics into Prometheus format. We then deploy a specialized PodMonitor so Prometheus and Grafana can pick them up.

The Metrics ConfigMap
First, we need to define how Kafka’s JMX metrics are converted into Prometheus format. Create kafka-metrics-config.yaml:
kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-metrics
  labels:
    app: strimzi
data:
  kafka-metrics-config.yaml: |
    # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
    lowercaseOutputName: true
    rules:
    # Special cases and very specific rules
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        broker: "$4:$5"
    - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_tls_info
      type: GAUGE
      labels:
        cipher: "$2"
        protocol: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_software
      type: GAUGE
      labels:
        clientSoftwareName: "$2"
        clientSoftwareVersion: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total):"
      name: kafka_server_$1_$4
      type: COUNTER
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total)
      name: kafka_server_$1_$4
      type: COUNTER
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    # Some percent metrics use MeanRate attribute
    # Ex) kafka.server<type=(KafkaRequestHandlerPool), name=(RequestHandlerAvgIdlePercent)><>MeanRate
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    # Generic gauges for percents
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
      labels:
        "$4": "$5"
    # Generic per-second counters with 0-2 key/value pairs
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
    # Generic gauges with 0-2 key/value pairs
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
    # Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
    # Note that these are missing the '_sum' metric!
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
        quantile: "0.$8"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        quantile: "0.$6"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        quantile: "0.$4"
    # KRaft overall related metrics
    # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
    - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-metrics><>(current-state): (.+)"
      name: kafka_server_raftmetrics_$1
      value: 1
      type: UNTYPED
      labels:
        $1: "$2"
    - pattern: "kafka.server<type=raft-metrics><>(.+):"
      name: kafka_server_raftmetrics_$1
      type: GAUGE
    # KRaft "low level" channels related metrics
    # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftchannelmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
      name: kafka_server_raftchannelmetrics_$1
      type: GAUGE
    # Broker metrics related to fetching metadata topic records in KRaft mode
    - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
      name: kafka_server_brokermetadatametrics_$1
      type: GAUGE
Apply the yaml: kubectl apply -f kafka-metrics-config.yaml -n cld-streaming
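To see how these rules produce the metric names queried later in this guide (for example `kafka_server_brokertopicmetrics_messagesin_total`), here is a simplified Python emulation of three of the rules above. It is a sketch under stated assumptions, not the jmx_exporter implementation. It applies rules in file order with first-match-wins, treats patterns as unanchored, substitutes `$N` capture groups, and lowercases names as `lowercaseOutputName: true` does. The exporter's real bean-string construction and name sanitizing are more involved. The sample bean strings in the test are illustrative.

```python
import re
from typing import Dict, List, Optional, Tuple

# A hand-copied subset of the rules in kafka-metrics-config.yaml, kept in file order.
RULES: List[dict] = [
    {   # Generic per-second counters with 1 key/value pair
        "pattern": r"kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count",
        "name": "kafka_$1_$2_$3_total",
        "labels": {"$4": "$5"},
    },
    {   # Generic per-second counters with 0 key/value pairs
        "pattern": r"kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count",
        "name": "kafka_$1_$2_$3_total",
        "labels": {},
    },
    {   # Generic gauges with 0 key/value pairs
        "pattern": r"kafka.(\w+)<type=(.+), name=(.+)><>Value",
        "name": "kafka_$1_$2_$3",
        "labels": {},
    },
]


def expand(template: str, match: "re.Match[str]") -> str:
    """Replace $1..$9 in a rule template with the matching capture group."""
    return re.sub(r"\$(\d)", lambda g: match.group(int(g.group(1))), template)


def translate(bean_line: str) -> Optional[Tuple[str, Dict[str, str], float]]:
    """Turn 'domain<props><>Attr: value' into (metric name, labels, value)."""
    bean, _, raw_value = bean_line.rpartition(": ")
    for rule in RULES:
        match = re.search(rule["pattern"], bean)  # patterns are not anchored
        if match is None:
            continue  # first matching rule wins, so rule order matters
        name = expand(rule["name"], match).lower()  # lowercaseOutputName: true
        name = re.sub(r"[^a-z0-9_:]", "_", name)    # keep the name Prometheus-safe
        labels = {expand(k, match): expand(v, match) for k, v in rule["labels"].items()}
        return name, labels, float(raw_value)
    return None
```

The ordering is why the specific rules sit above the generic ones in the real file. For example, the last rule here would also match a bean with extra key/value pairs, but it would fold them into the name instead of turning them into labels.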
The Kafka Cluster Config
Create kafka-nodepool.yaml:
apiVersion: kafka.strimzi.io/v1
kind: KafkaNodePool
metadata:
  name: combined
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - controller
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        kraftMetadata: shared
        deleteClaim: false
Apply the yaml: kubectl apply -f kafka-nodepool.yaml -n cld-streaming
Create kafka-eval-prometheus.yaml:
apiVersion: kafka.strimzi.io/v1
kind: Kafka
metadata:
  name: my-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 4.1.1.1.6
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yaml
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
  entityOperator:
    topicOperator: {}
    userOperator: {}
Apply the yaml: kubectl apply -f kafka-eval-prometheus.yaml -n cld-streaming
Discovery with PodMonitor
Now we tell Prometheus to go find our brokers. Create the PodMonitor strimzi-pod-monitor.yaml:
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: strimzi-pod-monitor
  namespace: cld-streaming
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      strimzi.io/cluster: my-cluster
      strimzi.io/kind: Kafka
  namespaceSelector:
    matchNames:
      - cld-streaming
  podMetricsEndpoints:
    - path: /metrics
      targetPort: 9404
      interval: 30s
      relabelings:
        # Map Strimzi pod labels (strimzi.io/...) to top-level metric labels the dashboard expects
        - action: labelmap
          regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
          replacement: $1
        - action: labelmap
          regex: __meta_kubernetes_pod_label_(.+)
          replacement: $1
        # Standard K8s labels the dashboard variables use
        - sourceLabels: [__meta_kubernetes_namespace]
          targetLabel: namespace
        - sourceLabels: [__meta_kubernetes_pod_name]
          targetLabel: kubernetes_pod_name
        - sourceLabels: [__meta_kubernetes_pod_name]
          targetLabel: pod_name
        - sourceLabels: [__meta_kubernetes_pod_node_name]
          targetLabel: node_name
Apply the yaml: kubectl apply -f strimzi-pod-monitor.yaml -n cld-streaming
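The relabelings block is what turns a Kubernetes pod label such as `strimzi.io/cluster` into a metric label such as `strimzi_io_cluster`. Kubernetes service discovery first exposes each pod label as `__meta_kubernetes_pod_label_<sanitized name>`. The `labelmap` action then copies every label whose name fully matches the regex to the name built from `replacement`. The Python sketch below mimics those steps for one pod. It is a simplified model: it includes only the few `__meta_*` labels this PodMonitor uses, and it ignores the operator's own default relabelings. The pod and node names in the test are illustrative.

```python
import re
from typing import Dict

Labels = Dict[str, str]


def sanitize(name: str) -> str:
    """Kubernetes SD turns characters invalid in label names (., /, -) into underscores."""
    return re.sub(r"[^a-zA-Z0-9_]", "_", name)


def discovered_labels(pod: str, namespace: str, node: str, pod_labels: Labels) -> Labels:
    """A small subset of the __meta_* labels Prometheus attaches to a pod target."""
    labels = {
        "__meta_kubernetes_pod_name": pod,
        "__meta_kubernetes_namespace": namespace,
        "__meta_kubernetes_pod_node_name": node,
    }
    for key, value in pod_labels.items():
        labels["__meta_kubernetes_pod_label_" + sanitize(key)] = value
    return labels


def labelmap(labels: Labels, regex: str, replacement: str) -> Labels:
    """Copy each label whose *name* fully matches regex to the name built from replacement."""
    pattern = re.compile(regex)
    out = dict(labels)
    for name, value in labels.items():
        match = pattern.fullmatch(name)  # relabel regexes are anchored at both ends
        if match:
            new_name = re.sub(r"\$(\d)", lambda g: match.group(int(g.group(1))), replacement)
            out[new_name] = value
    return out


def relabel_strimzi_target(labels: Labels) -> Labels:
    """Apply this PodMonitor's relabelings in order, then drop internal __ labels."""
    labels = labelmap(labels, r"__meta_kubernetes_pod_label_(strimzi_io_.+)", "$1")
    labels = labelmap(labels, r"__meta_kubernetes_pod_label_(.+)", "$1")
    # Default 'replace' action: copy the source label's value into targetLabel.
    for source, target in [
        ("__meta_kubernetes_namespace", "namespace"),
        ("__meta_kubernetes_pod_name", "kubernetes_pod_name"),
        ("__meta_kubernetes_pod_name", "pod_name"),
        ("__meta_kubernetes_pod_node_name", "node_name"),
    ]:
        labels[target] = labels.get(source, "")
    # Labels starting with "__" are removed once relabeling is finished.
    return {k: v for k, v in labels.items() if not k.startswith("__")}
```

In this model the first labelmap is redundant with the second, because the broader regex also matches the Strimzi labels and produces the same names. It does no harm, and it documents which labels the dashboard relies on.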
Querying Kafka Metrics in Prometheus UI
By now the Prometheus UI should be exposed via minikube service, and strimzi-pod-monitor should show 3/3 targets UP.
Verification: go to Status -> Targets and look for strimzi-pod-monitor. It should be UP.
Now you can start exploring live metrics from your CSM Operator Kafka cluster in real time. The JMX Prometheus Exporter exposes each broker’s metrics on port 9404, where Prometheus scrapes them. Your Kafka brokers are named my-cluster-combined-* because of the combined KafkaNodePool. In the Prometheus UI, switch to the Graph tab and paste in the queries below.

Sample Query 1: Topic Messages In Per Second (Confirmed Throughput)
sum(rate(kafka_server_brokertopicmetrics_messagesin_total{topic=~"txn1|txn2|txn_fraud"}[5m])) by (pod, topic)
This query aggregates messages ingested per second, grouped by broker pod and topic. Watch it spike when your producers or NiFi flows push data into the txn topics. It is excellent for spotting sudden drops or imbalances across brokers.

Sample Query 2: Topic Bytes In Per Second (Throughput in Bytes)
sum(rate(kafka_server_brokertopicmetrics_bytesin_total[5m])) by (topic)
This query shows the per-second incoming byte rate for each topic, averaged over a 5-minute window. It gives you a clear picture of the actual data volume flowing into txn1, txn2, and especially txn_fraud. Because rate() averages over the window, the graph is much smoother and more useful for monitoring real-world throughput than the raw, ever-increasing counter.

Quick Tips for This Setup
Break the results down by broker pod when needed:
sum(rate(kafka_server_brokertopicmetrics_bytesin_total{namespace="cld-streaming"}[5m])) by (topic, pod)
Add namespace filtering for cleaner results:
sum(rate(kafka_server_brokertopicmetrics_bytesin_total{namespace="cld-streaming"}[5m])) by (topic)
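The rate() function used in these queries turns an ever-increasing counter into a per-second rate. The Python sketch below shows the core idea under simplifying assumptions. It takes (timestamp, value) samples from one series inside the window and treats any drop in value as a counter reset. It then divides the total increase by the time between the first and last sample. Prometheus's real implementation also extrapolates toward the window edges, so its numbers will differ slightly. `sum_by` mimics the `sum(...) by (topic)` aggregation. The sample values in the test are made up.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, Tuple

Sample = Tuple[float, float]  # (unix timestamp in seconds, counter value)


def simple_rate(samples: Sequence[Sample]) -> Optional[float]:
    """Per-second increase of one counter series inside a window (no extrapolation)."""
    if len(samples) < 2:
        return None  # Prometheus also returns no result for fewer than two samples
    increase = 0.0
    for (_, prev), (_, cur) in zip(samples, samples[1:]):
        # Counters only go up; a drop means the broker restarted and the counter reset to 0.
        increase += cur - prev if cur >= prev else cur
    return increase / (samples[-1][0] - samples[0][0])


def sum_by(series: List[Tuple[Dict[str, str], float]], label: str) -> Dict[str, float]:
    """Rough equivalent of PromQL `sum(...) by (label)`: add values sharing a label value."""
    totals: Dict[str, float] = defaultdict(float)
    for labels, value in series:
        totals[labels.get(label, "")] += value
    return dict(totals)
```

The reset handling is why rate() (and not a simple difference of the raw counter) is the right tool after a broker restart. Without it, the counter would drop back toward zero and show up as a large negative spike.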
If any query returns no data, make sure you are actively producing messages to the topics. Then restart Prometheus to force a fresh scrape:
kubectl rollout restart statefulset prometheus-prometheus-kube-prometheus-prometheus -n cld-streaming

Run these sample queries while your NiFi flow is actively sending data to txn1, txn2, and txn_fraud. You should see clear, live throughput numbers appear in the Prometheus graphs. This gives you immediate visibility into both message rate and data volume, which is perfect for evaluating how well your CSM Operator-deployed Kafka cluster is handling the workload.

Visualizing CSM Kafka with Grafana Dashboards
With Prometheus feeding live data, Grafana turns those raw metrics into professional dashboards. However, “no data” is a common issue at this stage. It usually means Prometheus is not yet scraping the Kafka brokers, or the dashboard variables don’t match your labels.

Open Grafana (minikube service prometheus-grafana -n cld-streaming --url). Log in as admin with the password from the secret (see Exposing the Prometheus and Grafana UIs above).

Verify the Prometheus Data Source
Go to Configuration → Data Sources. The “Prometheus” source should point to the URL set during the Helm install: http://prometheus-kube-prometheus-prometheus.cld-streaming.svc.cluster.local:9090. Click Save & Test. It must say “Data source is working”. (Note: the Test button is at the bottom of the data source edit page.)

Import the Cloudera CSM Kafka Dashboard
Download the JSON:
curl -O https://raw.githubusercontent.com/cldr-steven-matison/ClouderaStreamingOperators/refs/heads/main/csm-kafka-dashboard.json
In Grafana, go to Dashboards → New → Import.
Click Upload JSON file and select the downloaded file.
On the next screen, set Datasource to your Prometheus data source and click Import.
Boom. You now have the new Cloudera CSM Kafka Dashboard in Grafana.

Summary
With the JMX exporter successfully injected and the PodMonitor active, you have cleared the first major hurdle in building an end-to-end observability pipeline.
We didn’t just flip a switch. We built a robust, Kubernetes-native discovery mechanism that respects the Strimzi-based operator’s strict validation rules while still providing deep, granular visibility into broker performance. By bridging the gap between Kafka’s internal JMX metrics and Prometheus, you now have the observability needed to monitor everything from message rates to partition health. Whether you are troubleshooting high CPU usage on a specific broker or watching for under-replicated partitions during a scaling event, you now have the raw data required to maintain a healthy cluster.

This setup serves as the foundation for the rest of the Cloudera Streaming Operators stack. Now that your event backbone (Kafka) is visible, you are ready to plug in your ingestion (NiFi) and processing (Flink) engines. Together they give you that elusive “single pane of glass” view across the entire data lifecycle in Kubernetes.

2. Monitoring Cloudera Flow Management (CFM) with Prometheus
In the previous guide on monitoring Cloudera Streams Messaging (CSM), we added visibility into your Kafka cluster. But data pipelines don’t start at the broker; they often start with NiFi. When you run NiFi via the Cloudera Flow Management (CFM) Operator, securing the cluster with Single User Auth puts the APIs into strict lockdown. This makes scraping native metrics a bit of a Kubernetes challenge.

In this post, we wire up a secure CFM NiFi 2.x cluster to the Prometheus + Grafana stack. We bypass web authentication safely using mTLS and ultimately bridge our cross-namespace metrics into a single pane of glass.

The NiFi Cluster Config (The CR)
In NiFi 2.x, Prometheus metrics are built natively into the application, so unlike Kafka we don’t need an external JMX exporter. However, we do need to tell the CFM Operator to disable standard authentication on the metrics endpoint.
Update your Nifi custom resource (nifi-cluster.yaml) with the configOverride block:
apiVersion: cfm.cloudera.com/v1alpha1
kind: Nifi
metadata:
  name: mynifi
  namespace: cfm-streaming
spec:
  replicas: 1
  nifiVersion: "2.6.0"
  security:
    initialAdminIdentity: "admin"
    nodeCertGen:
      issuerRef:
        name: cfm-operator-ca-issuer-signed
        kind: ClusterIssuer
    singleUserAuth:
      enabled: true
      credentialsSecretName: "nifi-admin-creds"
  configOverride:
    nifiProperties:
      upsert:
        nifi.cluster.leader.election.implementation: "KubernetesLeaderElectionManager"
        # Disable standard auth for the prometheus endpoint
        nifi.web.prometheus.metrics.authenticated: "false"
Apply this configuration and allow the NiFi pods to perform a rolling restart if necessary.

The mTLS VIP Bypass (Finding the Cert)
Because we have singleUserAuth enabled, NiFi will fiercely defend its endpoints, even with the property override above. It expects a login token, so it throws 401 Unauthorized errors at Prometheus. To get around the web login completely, we use client certificates (mTLS). The CFM Operator automatically generates a highly privileged certificate to talk to NiFi securely, and we are going to borrow that certificate for Prometheus.

Run this command to find the operator’s user certificate:
kubectl get secrets -n cfm-streaming | grep kubernetes.io/tls
Look for mynifi-cfm-operator-user-cert. This is our golden ticket, which we will take with us into the NiFi ServiceMonitor below.

Discovery with ServiceMonitor
Now we tell Prometheus to scrape NiFi, handing it the certificate so it can breeze past the 401 Unauthorized responses. We also use a relabelings block to point the scrape address at the mynifi-web service hostname. That makes the Host header match what NiFi’s Jetty server expects and prevents a 400 Bad Request error. Save this as nifi-service-monitor.yaml:
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: nifi-service-monitor
  namespace: cfm-streaming
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: server
      app.kubernetes.io/instance: mynifi
  namespaceSelector:
    matchNames:
      - cfm-streaming
  endpoints:
    - port: https
      path: /nifi-api/flow/metrics/prometheus
      interval: 30s
      scheme: https
      tlsConfig:
        insecureSkipVerify: true
        serverName: mynifi-web.cfm-streaming.svc.cluster.local
        # Explicit CA mapping fixes the "none configured" error
        ca:
          secret:
            name: mynifi-cfm-operator-user-cert
            key: ca.crt
        # The mTLS Bypass Client Certs
        cert:
          secret:
            name: mynifi-cfm-operator-user-cert
            key: tls.crt
        keySecret:
          name: mynifi-cfm-operator-user-cert
          key: tls.key
      relabelings:
        - targetLabel: __address__
          replacement: mynifi-web.cfm-streaming.svc.cluster.local:8443
kubectl apply -f nifi-service-monitor.yaml -n cfm-streaming
Wait about 30 seconds. In your Prometheus UI under Status -> Targets, nifi-service-monitor should now show as 1/1 UP.

Querying NiFi Metrics in Prometheus UI
Now that Prometheus has a secure, authenticated channel to NiFi, let’s look at the data. Open the Prometheus UI Graph tab and test these queries:

Sample Query 1: Total Bytes Queued
sum(nifi_amount_bytes_queued{namespace="cfm-streaming"})
This is great for setting up alerts if a downstream system (like Kafka) goes offline and backpressure builds up.

Sample Query 2: Total Items Queued
sum(nifi_amount_items_queued{namespace="cfm-streaming"})
Sample Query 3: Active Threads
sum(nifi_active_threads{namespace="cfm-streaming"})
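To check the mTLS bypass by hand, before (or instead of) waiting on Prometheus, the Python sketch below mirrors the ServiceMonitor’s tlsConfig. It presents the client certificate from mynifi-cfm-operator-user-cert and, like `insecureSkipVerify: true`, skips server verification. It then sums a metric across all series, which is what `sum(nifi_amount_items_queued)` does in PromQL. Assumptions: you have extracted the certificate and key into local files, for example with `kubectl get secret mynifi-cfm-operator-user-cert -n cfm-streaming -o jsonpath='{.data.tls\.crt}' | base64 --decode > tls.crt` (and likewise for `tls.key`). You can also reach the mynifi-web service, for example from a pod inside the cluster. The exposition-format parser is deliberately simplified (no escaping edge cases or timestamps), and the sample text in the test is made up.

```python
import re
import ssl
import urllib.request
from typing import Dict, Iterator, Tuple

METRICS_URL = "https://mynifi-web.cfm-streaming.svc.cluster.local:8443/nifi-api/flow/metrics/prometheus"

LINE_RE = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)")
LABEL_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"')


def fetch_metrics(url: str, cert_file: str, key_file: str) -> str:
    """GET the metrics page, authenticating with a client certificate (mTLS)."""
    ctx = ssl.create_default_context()
    # Mirror `insecureSkipVerify: true`: do not verify the server's certificate.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    # Present the operator's client certificate so NiFi authorizes the request.
    ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    with urllib.request.urlopen(url, context=ctx, timeout=10) as resp:
        return resp.read().decode("utf-8")


def parse_samples(text: str) -> Iterator[Tuple[str, Dict[str, str], float]]:
    """Yield (name, labels, value) per sample line, skipping # HELP / # TYPE comments."""
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        match = LINE_RE.match(line)
        if match:
            name, raw_labels, value = match.groups()
            yield name, dict(LABEL_RE.findall(raw_labels or "")), float(value)


def sum_metric(text: str, metric: str) -> float:
    """Like PromQL sum(metric): add up every series with exactly this name."""
    return sum(value for name, _, value in parse_samples(text) if name == metric)
```

For example, `sum_metric(fetch_metrics(METRICS_URL, "tls.crt", "tls.key"), "nifi_amount_items_queued")` should match Sample Query 2. If it raises an HTTP 401 instead, NiFi is not accepting the client certificate.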
Visualizing CFM NiFi with Grafana Dashboards
With Prometheus pulling the data, let’s load up a community-built dashboard.
Import the Dashboard
Open Grafana and navigate to Dashboards -> New -> Import.
In the “Import via grafana.com” box, type 15822 (or 12375) and click Load.
Select your Prometheus data source at the bottom and click Import.
Boom. You now have full JVM stats, FlowFile queue tracking, and throughput metrics.

Summary
By leveraging the CFM Operator’s native mynifi-cfm-operator-user-cert, you have engineered an mTLS bridge that bypasses NiFi’s strict Single User Auth lockdown. We didn’t just find a workaround for the “401 Unauthorized” errors. We built a secure, automated discovery path that lets Prometheus scrape sensitive metrics without compromising the security of your data orchestration layer.

This configuration solves the “networking puzzle” of NiFi 2.x observability. By aligning your ServiceMonitor with NiFi’s strict SNI and Host header requirements, you’ve ensured that your monitoring stack stays as resilient as the pipelines it tracks. You now have the declarative tools to move beyond basic health checks and into deep, cross-namespace correlation.

With this piece of the puzzle in place, you can finally realize the “Master Plan”: a single pane of glass where NiFi’s outbound data rates line up with Kafka’s inbound throughput. You no longer have to guess where a bottleneck resides. You have the real-time telemetry required to prove exactly how data is moving through your entire Cloudera Streaming architecture.

3. Monitoring Cloudera Streaming Analytics (CSA) with Prometheus
If you followed our previous guides on monitoring Cloudera Streams Messaging (CSM) and Cloudera Flow Management (CFM), you now have visibility into your data ingestion (NiFi) and event streaming (Kafka).
But what about monitoring the stream processing jobs (Flink) in Cloudera Streaming Analytics (CSA)? When you run Flink and SQL Stream Builder (SSB) via the CSA Operator, Flink jobs spin up dynamically on Kubernetes. These dynamically generated TaskManager pods don’t explicitly declare metric ports in their Kubernetes spec. As a result, standard Prometheus PodMonitors silently drop the targets, which turns job metric discovery into a bit of Kubernetes spaghetti.

In this third and final post of the series, we wire up our CSA Flink jobs to our existing Prometheus + Grafana stack. We use a Headless Service to sidestep strict pod-spec validation natively. That completes plugging our CFM NiFi Operator, CSA Flink Operator, and CSM Kafka Operator into the Prometheus and Grafana stack.

Create the Prometheus Values File
Create csa-prometheus-values.yaml in the root of your repo. It configures Flink’s Prometheus reporter to expose metrics on port 9249 for scraping.
# csa-prometheus-values.yaml
# Enables native PrometheusReporter for ALL SQL Stream Builder (SSB) jobs
ssb:
  flinkConfiguration:
    flink-conf.yaml: |
      metrics.reporters: prom
      metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
      metrics.reporter.prom.port: "9249"
      taskmanager.network.detailed-metrics: "true"
      # Optional: cleaner metric labels for Grafana dashboards
      metrics.scope.jm: "flink.jobmanager.<host>"
      metrics.scope.tm: "flink.taskmanager.<host>.<tm_id>"
      metrics.scope.job: "flink.job.<job_id>.<job_name>"
Helm Install Command
Run this exact command:
helm install csa-operator \
oci://container.repository.cloudera.com/cloudera-helm/csa-operator/csa-operator \
--namespace cld-streaming \
--create-namespace \
--version 1.5.0-b275 \
--values ./csa-prometheus-values.yaml \
--set 'flink-kubernetes-operator.imagePullSecrets[0].name=cloudera-creds' \
--set 'ssb.sse.image.imagePullSecrets[0].name=cloudera-creds' \
--set 'ssb.sqlRunner.image.imagePullSecrets[0].name=cloudera-creds' \
--set 'ssb.mve.image.imagePullSecrets[0].name=cloudera-creds' \
--set 'ssb.database.imagePullSecrets[0].name=cloudera-creds' \
--set 'ssb.flink.image.imagePullSecrets[0].name=cloudera-creds' \
--set-file flink-kubernetes-operator.clouderaLicense.fileContent=./license.txt

Verify the Install
# 1. Helm release
helm list -n cld-streaming
# 2. All pods running
kubectl get pods -n cld-streaming
# 3. Confirm Prometheus config was applied
helm get values csa-operator -n cld-streaming | grep -A 20 "flink-conf.yaml"
Discovery with Headless Service & ServiceMonitor
Because Flink Native Kubernetes does not explicitly declare port 9249 in its dynamic pod specs, standard PodMonitors will drop the targets. Instead, we bridge the gap using a Headless Service and a ServiceMonitor.
A. Create the Headless Service (csa-flink-service.yaml):
apiVersion: v1
kind: Service
metadata:
  name: csa-flink-metrics-service
  namespace: cld-streaming
  labels:
    app: csa-flink-metrics
spec:
  clusterIP: None # Makes it a headless service
  selector:
    # This automatically captures ALL Flink pods (JobManagers & TaskManagers)
    type: flink-native-kubernetes
  ports:
    - name: prom-metrics
      port: 9249
      targetPort: 9249
B. Create the ServiceMonitor (csa-flink-service-monitor.yaml):
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: csa-flink-metrics-monitor
  namespace: cld-streaming
  labels:
    release: prometheus # Must match your Prometheus Operator release label
spec:
  selector:
    matchLabels:
      app: csa-flink-metrics
  namespaceSelector:
    matchNames:
      - cld-streaming
  endpoints:
    - port: prom-metrics
      interval: 15s
      scrapeTimeout: 10s
      relabelings:
        # Extracts labels so Grafana dashboards automatically map deployments
        - sourceLabels: [__meta_kubernetes_pod_label_app]
          targetLabel: flink_deployment
        - sourceLabels: [__meta_kubernetes_pod_label_component]
          targetLabel: component
        - sourceLabels: [__meta_kubernetes_pod_name]
          targetLabel: pod
        - sourceLabels: [__meta_kubernetes_namespace]
          targetLabel: namespace
C. Apply both files:
kubectl apply -f csa-flink-service.yaml -n cld-streaming
kubectl apply -f csa-flink-service-monitor.yaml -n cld-streaming
Wait ~30 seconds, then check the Prometheus UI (Status -> Targets). You should see your JobManagers and TaskManagers listed as UP under serviceMonitor/cld-streaming/csa-flink-metrics-monitor/0.

Test Prometheus Metrics
Open the SSB UI:
minikube service ssb-sse --namespace cld-streaming
Run any SQL job in SQL Stream Builder, then verify that metrics are exposed directly from a pod:
# Replace with your actual taskmanager pod name
kubectl exec -it ssb-session-admin-taskmanager-1-3 -n cld-streaming -- \
curl -s http://localhost:9249/metrics | head -20
You should see flink_ metrics.

Querying SSB / Flink Metrics in Prometheus UI
Sample Query 1: JVM CPU Load
flink_taskmanager_Status_JVM_CPU_Load{namespace="cld-streaming"}
Sample Query 2: Job Uptime
flink_jobmanager_job_uptime{namespace="cld-streaming"}
Sample Query 3: Records In Per Second
sum(flink_taskmanager_job_task_operator_numRecordsInPerSecond{namespace="cld-streaming"}) by (job_name)
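End-to-End Pipeline (NiFi → SSB → Kafka)
PromQL’s or operator only adds right-hand series whose label sets are not already present on the left. Each sum() below returns a single series with an empty label set, so simply chaining them with or shows only the first stage that has data. Tagging each stage with label_replace keeps all three series. Keep in mind they use different units: bytes/s for NiFi and Kafka, records/s for Flink.
label_replace(sum(rate(nifi_bytes_sent{namespace="cfm-streaming"}[5m])), "stage", "nifi", "", "")
or
label_replace(sum(flink_taskmanager_job_task_operator_numRecordsInPerSecond{namespace="cld-streaming"}), "stage", "flink", "", "")
or
label_replace(sum(rate(kafka_server_brokertopicmetrics_bytesin_total{namespace="cld-streaming"}[5m])), "stage", "kafka", "", "")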
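If you would rather compare the three stages outside Grafana, Prometheus’s HTTP API endpoint `/api/v1/query` evaluates a PromQL expression and returns JSON. The Python sketch below runs each stage query separately, which sidesteps the `or` label-set issue entirely, and parses the instant-vector result. Assumptions: Prometheus is reachable at the URL printed by `minikube service prometheus-kube-prometheus-prometheus -n cld-streaming --url`. `PROM_URL` below is a hypothetical placeholder for it, and the stage keys are just display names. An empty list for a stage means “no data”.

```python
import json
import urllib.parse
import urllib.request
from typing import Dict, List

PROM_URL = "http://127.0.0.1:9090"  # hypothetical placeholder: use your minikube service URL

STAGE_QUERIES = {
    "nifi_bytes_per_s": 'sum(rate(nifi_bytes_sent{namespace="cfm-streaming"}[5m]))',
    "flink_records_per_s": 'sum(flink_taskmanager_job_task_operator_numRecordsInPerSecond{namespace="cld-streaming"})',
    "kafka_bytes_per_s": 'sum(rate(kafka_server_brokertopicmetrics_bytesin_total{namespace="cld-streaming"}[5m]))',
}


def query_url(base: str, promql: str) -> str:
    """Build an instant-query URL with the PromQL expression URL-encoded."""
    return f"{base.rstrip('/')}/api/v1/query?" + urllib.parse.urlencode({"query": promql})


def parse_vector(body: str) -> List[float]:
    """Extract sample values from an instant-vector response body."""
    payload = json.loads(body)
    if payload.get("status") != "success":
        raise RuntimeError(payload.get("error", "query failed"))
    # Each result looks like {"metric": {...}, "value": [<unix time>, "<value as string>"]}.
    return [float(item["value"][1]) for item in payload["data"]["result"]]


def stage_snapshot(base: str = PROM_URL) -> Dict[str, List[float]]:
    """Run every stage query once and collect the current values."""
    snapshot = {}
    for stage, promql in STAGE_QUERIES.items():
        with urllib.request.urlopen(query_url(base, promql), timeout=10) as resp:
            snapshot[stage] = parse_vector(resp.read().decode("utf-8"))
    return snapshot
```

Running `stage_snapshot()` while the NiFi flow is sending data should return one value per stage. An empty list points at the stage whose scrape or query needs attention.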
Visualizing in Grafana
Import the Cloudera CSA Flink Dashboard
Download the CSA Flink Dashboard JSON:
curl -O https://raw.githubusercontent.com/cldr-steven-matison/ClouderaStreamingOperators/refs/heads/main/csa-flink-dashboard.json
In Grafana, go to Dashboards → New → Import.
Click Upload JSON file and select the downloaded file.
On the next screen, set Datasource to your Prometheus data source and click Import.
Boom. You now have the new Cloudera CSA Flink Dashboard in Grafana.

Summary
With this final piece in place, you have built a complete, end-to-end observability pipeline across your entire Cloudera Streaming Operators architecture. By bridging CFM (NiFi) for ingestion, CSM (Kafka) for event streaming, and CSA (SQL Stream Builder / Flink) for real-time processing, you now have a unified view of your data’s lifecycle within a single Prometheus and Grafana stack.

In this guide, we implemented a Headless Service and a ServiceMonitor to work around the strict pod-spec limitations of Flink Native Kubernetes. This ensures that every dynamically provisioned JobManager and TaskManager is automatically discovered and scraped by Prometheus, eliminating the silent “0 targets” discovery failures during setup.

You can now reliably run complex PromQL queries in Prometheus across namespaces and correlate behavior across entirely different engines. You might be tracking backpressure in NiFi, monitoring consumer lag in Kafka, or measuring checkpoint durations and records per second in Flink. Either way, you finally have the single pane of glass required to confidently debug, tune, scale, and monitor your streaming data pipelines.

End-to-End CSO Dashboard with Grafana
Now all of our operator-based metrics are flowing and all of the operator dashboards are set up. We also have a good understanding of how Prometheus and Grafana queries work, so we can easily build a new Fraud Dashboard in Grafana.
Download the CSO Fraud Detection Dashboard JSON and import it into Grafana.

Summary: Observability in Kubernetes Achieved
By wiring CFM (NiFi), CSM (Kafka), and CSA (Flink/SSB) metrics to Prometheus, you have built complete, end-to-end observability across the Cloudera Streaming Operators. We didn’t just flip a switch to turn on metrics. We built a robust, Kubernetes-native solution that respects strict SNI headers, leverages mTLS for secure API scraping, and uses headless services to work around dynamic pod-spec limitations. Best of all, your entire monitoring configuration remains declarative and fully Git-trackable.

You can now reliably run complex PromQL queries across namespaces and correlate behavior across entirely different engines. When you can overlay NiFi’s outbound byte rate directly on top of Kafka’s inbound throughput on the same Grafana dashboard, you no longer have to guess where a bottleneck resides. You have the telemetry to prove it.

Resources
- Cloudera Streams Messaging (CSM) 1.6 Docs
- Cloudera Streaming Analytics (CSA) 1.5 Docs
- Cloudera Flow Management (CFM) 3.0 Docs
- Cloudera Streaming Operators GitHub Repo
- Cloudera Streaming Operators Blog
05-28-2026
06:15 AM
@AlokKumar I absolutely love this question. YES, it is possible! I recently built an API with NiFi and guess what, no auth!! 😥 It is just a web API handling form posts, so it does nothing that requires auth. It does, however, respond with appropriate error codes if something goes wrong unexpectedly. I can see you are thinking in terms of needing to add an authentication layer, which I agree is required. Two solutions:
1. Provide an auth mechanism in front of NiFi at the load balancer.
2. Build the auth check into the NiFi API flow itself.
For the latter, NiFi can do anything, right? There are many ways to do this. For example, after HandleHttpRequest you could check an external system for a valid user/pass, token, etc. Your specific requirements would dictate the logic further. An invalid auth would return an appropriate HandleHttpResponse with a 4xx error code. One thing I would recommend is accounting for timeouts or slow clients. If a client is waiting on an external auth check, that call-out could take too long for the API connection. Make the NiFi flow account for that scenario as well, so the client timeout is handled cleanly. If this is a major concern, I would investigate the first solution.
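For the second option, the call-out could live in a custom Python processor placed after HandleHttpRequest. Below is a minimal sketch of just the decision logic. It assumes a hypothetical external validation endpoint (AUTH_URL) that returns 200 for a valid bearer token.

HandleHttpRequest adds request headers as http.headers.* attributes, so the token would come from http.headers.Authorization. The resulting code could be written to an attribute such as auth.status (a hypothetical name) and referenced from HandleHttpResponse's HTTP Status Code property. The short timeout is the key design choice: it keeps a slow auth service from holding the client connection open.

```python
import socket
import urllib.error
import urllib.request

# Hypothetical external auth service: replace with your own validation endpoint
AUTH_URL = "https://auth.example.internal/validate"


def auth_status(authorization: str, auth_url: str = AUTH_URL, timeout_s: float = 2.0) -> int:
    """Map an Authorization header value to the status code HandleHttpResponse should return."""
    if not authorization or not authorization.startswith("Bearer "):
        return 401  # missing or unsupported credentials: reject without calling out
    request = urllib.request.Request(auth_url, headers={"Authorization": authorization})
    try:
        with urllib.request.urlopen(request, timeout=timeout_s) as resp:
            return 200 if resp.status == 200 else 403  # treat anything but 200 as denied
    except urllib.error.HTTPError as err:
        # The auth service answered with an error status
        return 403 if err.code in (401, 403) else 502
    except urllib.error.URLError as err:
        # Connect-phase timeouts arrive wrapped in URLError
        if isinstance(err.reason, (socket.timeout, TimeoutError)):
            return 504
        return 503  # auth service unreachable
    except (socket.timeout, TimeoutError):
        return 504  # read timed out: fail fast instead of stalling the client
```

In the flow, a 200 would continue to your business logic. Any other code would go straight to HandleHttpResponse.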
05-28-2026
05:57 AM
@zzzz77 In your bigger-machine environment, are you adjusting the flow to tune performance? E.g., do you increase concurrency, adjust the active thread pool, etc., to make sure you are getting the most possible use out of the cores? This is where you should start. You should be able to get a lot more active threads going in the larger environment before needing to worry about disk contention. You may want to bump up the RAM (heap min/max), but I would do this methodically. If it's 8 GB, go to 16 and see the results, then 32, and compare all three. 32 should be as high as you need to go, but I have seen higher. NiFi does a good job of memory management above the min/max. Ideally you would want the NiFi repository disks mounted separately (see docs). Your dev baseline likely has no dedicated disks, though, so I suspect you will see improvements using all 32+ cores vs 8 even with "slow" disks... This example is NiFi on k8s, but it shows how to crank up the CPU: https://stevenmatison.com/blog/Max-CPU-with-NiFi-on-Minikube/
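The methodical heap test above can be written down as a small plan. Below is a minimal sketch that assumes a standard NiFi bootstrap.conf where java.arg.2 and java.arg.3 carry -Xms and -Xmx. Check your own file, since those indexes are an assumption.

The sketch doubles the heap from a starting size up to a ceiling and keeps min and max equal. It also scales the dev thread pool size (for example, the Maximum Timer Driven Thread Count setting) by the ratio of cores. That scaling is only a naive starting point; your flow's actual bottleneck decides the final numbers.

```python
def heap_steps(start_gb: int = 8, ceiling_gb: int = 32) -> list:
    """Double the heap from start_gb up to ceiling_gb, e.g. 8 -> 16 -> 32."""
    steps = []
    size = start_gb
    while size <= ceiling_gb:
        steps.append(size)
        size *= 2
    return steps


def bootstrap_heap_lines(heap_gb: int) -> list:
    """bootstrap.conf lines for one test run (java.arg.2/3 indexes are assumed)."""
    return [f"java.arg.2=-Xms{heap_gb}g", f"java.arg.3=-Xmx{heap_gb}g"]


def scaled_thread_pool(dev_threads: int, dev_cores: int, prod_cores: int) -> int:
    """Naive starting point: scale the dev thread pool by the ratio of cores."""
    return max(1, dev_threads * prod_cores // dev_cores)
```

Usage: for each size in heap_steps(), apply bootstrap_heap_lines(size), restart NiFi, run the same flow, and record throughput before moving to the next size.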
05-28-2026
05:40 AM
Here is another way... Instead of basic auth (user/pass), you could use Kerberos to authenticate the request programmatically. This removes the need for hardcoded credentials. Using Python (requests-kerberos):
import requests
from requests_kerberos import HTTPKerberosAuth
knox_url = "https://<knox-host>:8443/gateway/knoxsso/api/v1/token"
# This uses your existing kinit session (SPNEGO via the Kerberos ticket cache)
# verify=False skips TLS certificate checks; point verify at your CA bundle outside of testing
response = requests.get(knox_url, auth=HTTPKerberosAuth(), verify=False)
if response.status_code == 200:
    token_data = response.json()
    print(f"Your Token: {token_data['access_token']}")
Set up a Kerberos keytab for your service account, and use a script (Python or Java) to hit the Knox Token API using SPNEGO. This is the enterprise-standard way to automate Knox token generation without the Web UI or manual password entry. There are quite a few other alternatives here as well: Java, NiFi, etc.
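For the keytab approach, here is a minimal sketch that assumes MIT Kerberos kinit is on the PATH. The keytab path and principal are hypothetical placeholders. The sketch refreshes the ticket cache non-interactively with kinit -kt. It then makes the same SPNEGO call to the Knox token endpoint as the example above. requests and requests-kerberos are imported inside the function, so the helper functions work even where those packages are not installed.

```python
import subprocess

# Hypothetical values: replace with your service account's keytab and principal
KEYTAB_PATH = "/etc/security/keytabs/svc-knox.keytab"
PRINCIPAL = "svc-knox@EXAMPLE.COM"


def kinit_command(keytab_path: str, principal: str) -> list:
    """MIT Kerberos: -k authenticates from a keytab, -t names the keytab file."""
    return ["kinit", "-kt", keytab_path, principal]


def extract_access_token(token_json: dict) -> str:
    """Pull the token out of the Knox token API response body."""
    token = token_json.get("access_token")
    if not token:
        raise ValueError("Knox response did not contain an access_token")
    return token


def fetch_knox_token(knox_url: str, keytab_path: str = KEYTAB_PATH,
                     principal: str = PRINCIPAL, ca_bundle=True) -> str:
    """Refresh the Kerberos ticket from the keytab, then request a Knox token via SPNEGO."""
    # No password prompt: kinit reads the key from the keytab file
    subprocess.run(kinit_command(keytab_path, principal), check=True)
    import requests
    from requests_kerberos import HTTPKerberosAuth
    # ca_bundle may be a path to your CA certificates, or True for the system defaults
    response = requests.get(knox_url, auth=HTTPKerberosAuth(), verify=ca_bundle, timeout=30)
    response.raise_for_status()
    return extract_access_token(response.json())
```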
05-20-2026
04:54 AM
@NovSeek I believe the sample above is just demonstrating the use of an API call. You should be able to achieve the same outcome programmatically against the CM/Knox API in any other manner (e.g., not curl to a text file).
05-13-2026
06:08 AM
2 Kudos
We are all using AI to write code, but when it comes to Apache NiFi, the current landscape often resembles the Wild West. Whether you are generating synthetic data scripts or translating complex machine learning models, Large Language Models (LLMs) are incredible accelerators. However, if you ask an AI to write a native Apache NiFi 2.0 Python processor from scratch, there is a very high probability it will confidently hand you code that instantly breaks your canvas.
NiFi 2.0’s Python API is relatively new, and most AI training data is heavily saturated with legacy NiFi 1.x ExecuteScript solutions (using Jython or Groovy). Even when an AI correctly identifies the 2.0 API, it frequently misconfigures the underlying Java-to-Python bridge. The result is “ghost” processors with dashed lines and missing routing relationships.
In this post, I am going to share the exact methodology I used to leverage AI for writing custom NiFi processors safely, ensuring my dataflows operate seamlessly with my custom Python logic.
The Input: Example Fraud Python Script
This script’s logic assumes that transactions exceeding $10,000, or those originating near two specific cities, constitute fraud. Traditionally, this fraud model is intended to be deployed on Cloudera Machine Learning (CML) in a Workbench session and invoked in NiFi via the InvokeHTTP processor. I have tested this architecture, and it works flawlessly. Unfortunately, this integration is often unavailable outside of the Cloudera Public Cloud, for example during local Kubernetes testing, which is the focus of this post. This script therefore serves as a bridge. It lets you test the same Python responses natively, so downstream test data can flow in environments that are not connected to CML.
import cml.models_v1 as models
SUSPICIOUS_CITIES = {
    "Lagos": {"lat": 6.5244, "lon": 3.3792},
    "New Delhi": {"lat": 28.6139, "lon": 77.2090}
}
# A 0.45-degree (~50 km) net is enough to catch all of Steven's regional fraud; 0.5 is used here
TOLERANCE = 0.5
# Accounts whose valid data geographically overlaps with the fraud zones go here.
# They are whitelisted from the location-based heuristic to keep the demo clean (empty in this version).
DEMO_SAFE_ACCOUNTS = []
def is_suspicious_location(lat: float, lon: float) -> str:
    for city, coords in SUSPICIOUS_CITIES.items():
        if (abs(lat - coords["lat"]) <= TOLERANCE) and (abs(lon - coords["lon"]) <= TOLERANCE):
            return city
    return None
@models.cml_model
def detect_fraud(args):
    is_fraud = False
    explanations = {}
    # Rule 1: High Amount Threshold (>$10k is ALWAYS flagged)
    if args["amount"] > 10000:
        is_fraud = True
        explanations["amount"] = f"Transaction amount ({args['amount']}) exceeds the 10,000 limit."
    # Rule 2: Originates strictly around restricted geographies
    # We skip this check if it's one of the overlapping good accounts
    if args["account_id"] not in DEMO_SAFE_ACCOUNTS:
        suspicious_city = is_suspicious_location(args["lat"], args["lon"])
        if suspicious_city:
            is_fraud = True
            explanations["location"] = f"Transaction originated from a high-risk region near {suspicious_city}."
    if is_fraud:
        return {
            "fraud_score": 0.99,
            "risk_level": "HIGH",
            "decision": "REVIEW",
            "explanations": explanations
        }
    else:
        return {
            "fraud_score": 0.01,
            "risk_level": "LOW",
            "decision": "APPROVE",
            "explanations": {"status": "all heuristic checks passed"}
        }
Rule 1: The AI Writes the Logic, You Own the Framework
The biggest mistake you can make is copying and pasting a complete Python processor generated by an AI directly into your /extensions directory. AI models often hallucinate complex, aspirational examples that do not function as expected in your specific environment. When an AI provides malformed custom processor code, NiFi will either fail to load the processor entirely or, worse, load it but refuse to display the success and failure relationships in the UI.
The Pro Move: Pin the AI within a strict, proven architectural skeleton for the NiFi wrapper. I am going to show you one right now! By “pin,” I mean that I essentially had to wrestle the AI and lock it down using my first processor example. I proved to the AI that my example processor worked, and together we confirmed that the baseline GenericTransform framework functioned correctly. Only then did we move forward with constructing the actual custom NiFi processor I needed. 💪
Rule 2: Prove the Skeleton First
Before you introduce a single line of AI-generated business logic, deploy a bare-minimum structural template to the canvas. If the skeleton doesn’t load and route data, your complex logic will fail as well. This exercise also proves that you understand how to deliver and iterate versions of a processor for rapid testing in the NiFi UI.
Here is the exact GenericTransform framework I used. It does nothing but pass data through, but it proves that the custom processor can compile and expose its relationships natively.
import json
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
class GenericTransformTemplate(FlowFileTransform):
    # Mandatory: Registers the processor with the NiFi backend
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.1-BASE'
        description = 'Bare-minimum framework to test NiFi UI integration.'
        tags = ['template', 'framework']
    def __init__(self, **kwargs):
        # 'pass' is the safest initialization in many containerized environments
        pass
    def transform(self, context, flowfile):
        contents_str = flowfile.getContentsAsBytes().decode('utf-8')
        attributes = flowfile.getAttributes()
        # Route directly to success without modification
        return FlowFileTransformResult(
            relationship='success',
            attributes=attributes,
            contents=contents_str
        )
Test it: Drop this into your extensions folder. Wait 30 seconds. Drag it onto the canvas. Can you connect the success relationship to a LogAttribute processor? Yes? Now you are ready for the AI code.
Rule 3: Inject Python Logic Defensively
Once your skeleton is proven, prompt your AI to write strictly isolated Python changes within the confines of the processor framework. By this point, the AI should understand your exact architectural approach, making functional Python improvements relatively straightforward. When injecting new Python logic into your data pipeline, you must code defensively against edge cases:
The Array Trap: AI assumes FlowFiles contain a single JSON object. If your upstream generator creates an array of transactions, the AI’s .get() dictionary calls will raise fatal AttributeErrors. Always wrap your logic to handle both lists (isinstance(payload, list)) and single dictionaries.
Never Overwrite the Payload: AI scripts often return only the result of their computation. If you replace your FlowFile content with just the ML score, you lose your original transaction_id and break downstream routing. Always append the AI’s output to the existing payload (e.g., payload["ai_response"] = result).
Trap Everything: Wrap the AI logic in a try/except block that catches failures, writes the error to an attribute (attributes['python_error'] = str(e)), and safely routes the FlowFile to failure instead of crashing the processor.
Anticipate Iteration: Expect to find more edge cases. Keep iterating, and you will get it to work.
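You can exercise the defensive rules above outside NiFi before they go into transform(). Below is a minimal sketch in plain Python with no nifiapi import, so it runs anywhere. enrich_contents is a hypothetical helper. It returns the relationship name, the extra attributes, and the new contents that transform() would pass to FlowFileTransformResult. The score argument stands in for whatever AI-generated logic you are injecting.

```python
import json
from typing import Callable, Dict, Tuple


def enrich_contents(contents: str, score: Callable[[dict], dict],
                    result_key: str = "ai_response") -> Tuple[str, Dict[str, str], str]:
    """Return (relationship, extra_attributes, contents) following the defensive rules."""
    try:
        payload = json.loads(contents)
        # The Array Trap: accept either a list of records or a single record
        records = payload if isinstance(payload, list) else [payload]
        for record in records:
            if not isinstance(record, dict):
                raise TypeError(f"expected a JSON object, got {type(record).__name__}")
            # Never overwrite the payload: add the result next to the original fields
            record[result_key] = score(record)
        return "success", {}, json.dumps(payload)
    except Exception as e:
        # Trap everything: keep the original contents and record why it failed
        return "failure", {"python_error": str(e)}, contents
```

Once the helper behaves as expected on sample FlowFile content, the same body can move into transform() unchanged.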
Rule 4: Master the Hot-Reload Workflow
The NiFi 2.0 Python API supports auto-reloading. You do not need to restart your pod or run manual scripts to test new custom NiFi Python processor logic. If you are using a local mount (e.g., minikube mount ~/nifi-custom-processors:/extensions):
Save your .py file.
Wait 30 to 60 seconds. The background thread will detect the file change and recompile it.
The UI Catch: The NiFi web canvas aggressively caches UI elements. Refresh your browser and check the processor list for your new version tag to confirm the changes are reflected.
The Example: The Output and Working Custom NiFi Processor
import json
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
class FraudModel(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.4-SNAPSHOT'
        description = 'Executes the CML fraud detection model natively in NiFi.'
        tags = ['fraud', 'detection', 'cml', 'replacement']
    def __init__(self, **kwargs):
        pass
    # ==========================================
    # CML MODEL LOGIC
    # ==========================================
    SUSPICIOUS_CITIES = {
        "Lagos": {"lat": 6.5244, "lon": 3.3792},
        "New Delhi": {"lat": 28.6139, "lon": 77.2090}
    }
    TOLERANCE = 0.5
    DEMO_SAFE_ACCOUNTS = []
    def is_suspicious_location(self, lat: float, lon: float) -> str:
        for city, coords in self.SUSPICIOUS_CITIES.items():
            if (abs(lat - coords["lat"]) <= self.TOLERANCE) and (abs(lon - coords["lon"]) <= self.TOLERANCE):
                return city
        return None
    def detect_fraud(self, args: dict) -> dict:
        is_fraud = False
        explanations = {}
        # Rule 1: High Amount Threshold
        if args.get("amount", 0) > 10000:
            is_fraud = True
            explanations["amount"] = f"Transaction amount ({args.get('amount')}) exceeds the 10,000 limit."
        # Rule 2: Originates strictly around restricted geographies
        if args.get("account_id") not in self.DEMO_SAFE_ACCOUNTS:
            suspicious_city = self.is_suspicious_location(args.get("lat", 0.0), args.get("lon", 0.0))
            if suspicious_city:
                is_fraud = True
                explanations["location"] = f"Transaction originated from a high-risk region near {suspicious_city}."
        if is_fraud:
            return {
                "fraud_score": 0.99,
                "risk_level": "HIGH",
                "decision": "REVIEW",
                "explanations": explanations
            }
        else:
            return {
                "fraud_score": 0.01,
                "risk_level": "LOW",
                "decision": "APPROVE",
                "explanations": {"status": "all heuristic checks passed"}
            }
    # ==========================================
    def transform(self, context, flowfile):
        contents_str = flowfile.getContentsAsBytes().decode('utf-8')
        attributes = flowfile.getAttributes()
        try:
            # Parse incoming JSON
            payload = json.loads(contents_str)
            # The upstream generator sometimes creates lists of transactions.
            # Handle both lists and single dictionaries safely.
            if isinstance(payload, list):
                for tx in payload:
                    tx["cml_response"] = self.detect_fraud(tx)
                enriched_data = payload
            else:
                payload["cml_response"] = self.detect_fraud(payload)
                enriched_data = payload
            return FlowFileTransformResult(
                relationship='success',
                attributes=attributes,
                contents=json.dumps(enriched_data)
            )
        except Exception as e:
            # If parsing or scoring fails, route to failure and tag the error
            attributes['cml_error'] = str(e)
            return FlowFileTransformResult(
                relationship='failure',
                attributes=attributes,
                contents=contents_str
            )
The Verdict
AI is an incredible tool for writing the heavy-lifting logic inside NiFi 2.0 Python processors, but it is a terrible architect for the processor framework itself. I treated my example NiFi API wrapper as a rigid, protected skeleton and carefully injected Python logic inside of it. That approach let me create this processor at lightning speed. How many times do you think it took me to get this Python processor code to work?
The version is 0.0.4, so it took me four iterations from start to finish to complete the processor in this exercise. Now fire up your cluster, open up a Python script, and see if you can transform it into a custom NiFi processor!
Resources
Custom NiFi Processors with Cloudera Streaming Operators
NiFi2 Processor Playground
Cloudera Streaming Operators GitHub Repo
NiFi Python Developer’s Guide
Appendix
NiFi 2.0 Custom Python Processor with Pandas
This is written as a complete, copy-paste-ready sample that any engineer can drop into a new environment for immediate testing. No changes to the K8s CR, mount, or pod are required to build this new Python processor in the Cloudera Streaming Operator footprint. Similar steps can be repeated in any appropriate NiFi 2.0 context.
Objective
Create a new, self-contained native Python processor named PandasJSONTransformer that:
Accepts JSON content in a FlowFile (e.g., output from TransactionGenerator).
Loads it into a Pandas DataFrame.
Uses lat/lon to compute the distance from home (a fixed point defined in the script).
Outputs the transformed JSON on the success relationship.
Input Flow File:
[ {
"ts" : "2026-05-05 14:55:11",
"account_id" : "943",
"transaction_id" : "6a9b1242-4892-11f1-b035-3a8bcd2ccadb",
"amount" : 64,
"lat" : 44.3568905517,
"lon" : -0.6186160357,
"nearest_city" : "Lagos",
"nearest_country" : "Nigeria"
} ]
Step 1: Create the New Processor File
Navigate to the exact directory where TransactionGenerator.py lives:
cd ~/nifi-custom-processors # ← adjust only if your local path is different
Create the new file PandasJSONTransformer.py with the following code:
import json
import pandas as pd
import numpy as np
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
class PandasJSONTransformer(FlowFileTransform):
    class Java:
        # Essential: Ensures success and failure relationships appear in NiFi
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '1.0.7-FINAL'
        description = 'An example processor using python pandas.'
        tags = ['pandas', 'poc', 'geospatial']
        dependencies = ['pandas', 'numpy']  # NiFi auto-installs these
    def __init__(self, **kwargs):
        # 'pass' is the safest initialization for this environment
        pass
    def transform(self, context, flowfile):
        content_bytes = flowfile.getContentsAsBytes()
        attributes = flowfile.getAttributes()
        # Merritt Island, FL Coordinates
        HOME_LAT, HOME_LON = 28.3181, -80.6660
        try:
            # Step 1: Handle the "Array Trap"
            # Even for single records, we wrap in a list so Pandas creates a proper DataFrame row
            raw_data = json.loads(content_bytes.decode('utf-8'))
            if not isinstance(raw_data, list):
                raw_data = [raw_data]
            df = pd.DataFrame(raw_data)
            # Step 2: Proof of Concept Math
            if 'lat' in df.columns and 'lon' in df.columns:
                df['lat'] = pd.to_numeric(df['lat'], errors='coerce')
                df['lon'] = pd.to_numeric(df['lon'], errors='coerce')
                # Euclidean distance from Merritt Island, measured in degrees (not km):
                # dist = sqrt((lat1 - lat2)^2 + (lon1 - lon2)^2)
                df['dist_from_home'] = np.sqrt(
                    (df['lat'] - HOME_LAT)**2 + (df['lon'] - HOME_LON)**2
                )
            # Add a simple flag to show Pandas touched the data
            df['pandas_processed'] = True
            # Step 3: Output Generation
            output_json = df.to_json(orient='records', indent=None)
            return FlowFileTransformResult(
                relationship='success',
                contents=output_json.encode('utf-8'),
                attributes={
                    **attributes,
                    'pandas.transformed': 'true',
                    'pandas.version': pd.__version__
                }
            )
        except Exception as e:
            # Rule 3: Defensive failure routing
            return FlowFileTransformResult(
                relationship='failure',
                contents=content_bytes,
                attributes={**attributes, 'pandas.error': str(e)}
            )
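The dist_from_home column above is a plain Euclidean distance in degrees. That works as a proof of concept, but it is not a physical distance. The minimal sketch below reproduces the processor's degree-based formula so it can be checked against the sample output. As a swapped-in alternative, it also computes the great-circle (haversine) distance in kilometers, assuming a mean Earth radius of 6371 km. It is plain Python; in the processor, the same formula could be vectorized over the DataFrame columns with np.radians, np.sin, np.cos, and np.arcsin.

```python
import math

# Merritt Island, FL (same home point as the processor)
HOME_LAT, HOME_LON = 28.3181, -80.6660
EARTH_RADIUS_KM = 6371.0  # mean Earth radius (assumption for the haversine version)


def euclidean_deg(lat: float, lon: float) -> float:
    """Same formula as the processor: straight-line distance in degrees."""
    return math.sqrt((lat - HOME_LAT) ** 2 + (lon - HOME_LON) ** 2)


def haversine_km(lat: float, lon: float) -> float:
    """Great-circle distance from home in kilometers."""
    phi1, phi2 = math.radians(HOME_LAT), math.radians(lat)
    dphi = math.radians(lat - HOME_LAT)
    dlmb = math.radians(lon - HOME_LON)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))
```

For the sample output record (lat 48.4010217027, lon 4.7099962916), euclidean_deg returns the same 87.706... value the processor wrote to dist_from_home.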
Step 2: Deploy & Activate
Ensure the minikube mount is still running:
minikube mount ~/nifi-custom-processors:/extensions --uid 10001 --gid 10001
NiFi 2.0 will automatically detect new or updated .py files in the extensions directory (usually within 10 to 30 seconds). When testing Python changes, increment the version in the code (e.g., 1.0.1 to 1.0.2) and re-save the file after each change. This forces a clean reload. If you are impatient like me, you may find yourself refreshing the page to look for new processors.
Step 3: Verification in NiFi UI
Open the NiFi canvas.
Drag a new processor onto the canvas and search for PandasJSONTransformer. The new processor should appear with the exact description and version from the code.
Build a simple test flow: TransactionGenerator → PandasJSONTransformer (Flow Definition File).
Run the flow.
Check the output FlowFile for the new fields dist_from_home and pandas_processed.
Be patient on the first attempt after dragging the processor onto the canvas. When it is first introduced, the processor will indicate that its dependencies are downloading. It must finish this dependency step before it lets you route success/failure.
Step 4: Hand-Off Framework for Any Other Environment
To replicate this exact processor in a different NiFi 2.0 environment:
Place PandasJSONTransformer.py in the Python extensions path.
Complete Steps 1 to 3 above.
Verify that NiFi has installed pandas.
Confirm the FlowFile output is as expected.
Output Flow File:
[ {
"ts" : "2026-05-05 15:10:13",
"account_id" : "487",
"transaction_id" : "xxx84324584-4894-11f1-b035-3a8bcd2ccadb",
"amount" : 39,
"lat" : 48.4010217027,
"lon" : 4.7099962916,
"dist_from_home" : 87.7062397261,
"pandas_processed" : true
} ]
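Step 2 recommends incrementing the processor version before each re-save so the reload is easy to spot in the UI. Below is a minimal sketch of a hypothetical helper that bumps the patch number on the version = '...' line of a processor file. It keeps any suffix such as -FINAL or -SNAPSHOT. It assumes the version is written as MAJOR.MINOR.PATCH inside single or double quotes, as in the processors in this post.

```python
import re
from pathlib import Path

# Matches e.g.  version = '1.0.7-FINAL'  (the closing quote must match the opening one)
VERSION_RE = re.compile(r"""(\bversion\s*=\s*)(['"])(\d+)\.(\d+)\.(\d+)([^'"]*)\2""")


def bump_patch(source: str) -> str:
    """Return the source with the first version string's patch number incremented."""
    def repl(m):
        prefix, quote, major, minor, patch, suffix = m.groups()
        return f"{prefix}{quote}{major}.{minor}.{int(patch) + 1}{suffix}{quote}"

    new_source, count = VERSION_RE.subn(repl, source, count=1)
    if count == 0:
        raise ValueError("no version = 'X.Y.Z' line found")
    return new_source


def bump_file(path: str) -> None:
    """Rewrite the processor file in place so NiFi picks up a new version on reload."""
    p = Path(path)
    p.write_text(bump_patch(p.read_text(encoding="utf-8")), encoding="utf-8")
```

Usage: bump_file("PandasJSONTransformer.py") before each save-and-test cycle, then look for the new version tag in the processor list.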
Troubleshooting
# Adjust -n to the namespace your NiFi pod runs in
# Check NiFi pod logs for processor loading
kubectl logs -n cld-streaming mynifi-0 | grep -i pandas
# Check the pod for Python extensions
kubectl exec -n cfm-streaming mynifi-0 -- ls -la /opt/nifi/nifi-current/python/extensions
04-14-2026
09:24 AM
If you are following along with Cloudera Streaming Operators, I have just posted this Developer Blog here on the Cloudera community: https://community.cloudera.com/t5/Developer-Blogs/Building-a-Fully-Local-RAG-with-Cloudera-Streaming-Operators/ba-p/413825
04-14-2026
09:20 AM
I added another blog post showing how to make custom processors with Cloudera Streaming Operators: https://cldr-steven-matison.github.io/blog/Custom-Processors-With-Cloudera-Streaming-Operators/ And the github: https://github.com/cldr-steven-matison/NiFi2-Processor-Playground