Observability with Cloudera Streaming Operators: A 3-Part Series

If you run NiFi-, Kafka-, or Flink-based applications and workloads in Kubernetes, you know that visibility is everything. You can build the most complex data pipelines in the world, but without eyes on your throughput, queues, and other streaming metrics, you’re essentially flying blind.

Welcome to the ultimate page for Kubernetes-native observability. In this series, we walk through the exact steps to wire up the entire Cloudera Streaming Operator architecture (NiFi, Kafka, and Flink) into a unified Prometheus and Grafana stack.

By the end of this journey, you won’t just have basic health checks; you will have a single pane of glass correlating NiFi’s data flow metrics perfectly with Kafka’s topic throughput and Flink’s stream processing metrics.


Prerequisites

This lesson assumes you have already:

  1. Deployed the Cloudera Streaming Operators.
  2. Completed setup of the minikube branch of the Streams Processing Hands on Lab: the NiFi flow is running, the topics txn1, txn2, and txn_fraud exist, and the SQL Stream Builder jobs are running with operational polling.
  3. Cloned the latest Cloudera Streaming Operators GitHub repo into your home directory (~/).

Warning! Some of the exercises include new helm install commands. Be prepared to run helm uninstall as needed; installing and uninstalling is good practice for resetting your environment. However, if you are using AI to execute this plan, you can use helm upgrade or kubectl apply patches to reach the desired outcome(s).

Prometheus Install

Before diving into the specific operators, you need to install the central monitoring stack. We will be using the community Prometheus Operator. Ensure your Kubernetes environment is ready, and run the following commands to install the Prometheus Operator and Grafana into the cld-streaming namespace.

1. Add the Helm Repo:

helm repo add prometheus-community https://prometheus-community.github.io/helm-charts

2. Install the kube-prometheus-stack chart: This specific configuration enables proxy access, sets up the default data source, and configures the Operator to watch for PodMonitors and ServiceMonitors across all namespaces (the empty {} namespace selectors).

helm install prometheus prometheus-community/kube-prometheus-stack \
  --namespace cld-streaming --create-namespace \
  --set grafana.sidecar.datasources.defaultDatasourceEnabled=false \
  --set 'grafana.additionalDataSources[0].name=Prometheus' \
  --set 'grafana.additionalDataSources[0].type=prometheus' \
  --set 'grafana.additionalDataSources[0].url=http://prometheus-kube-prometheus-prometheus.cld-streaming.svc.cluster.local:9090' \
  --set 'grafana.additionalDataSources[0].access=proxy' \
  --set 'grafana.additionalDataSources[0].isDefault=true' \
  --set prometheus.prometheusSpec.serviceMonitorSelectorNilUsesHelmValues=false \
  --set prometheus.prometheusSpec.podMonitorSelectorNilUsesHelmValues=false \
  --set-json 'prometheus.prometheusSpec.serviceMonitorNamespaceSelector={}' \
  --set-json 'prometheus.prometheusSpec.podMonitorNamespaceSelector={}'

Exposing the Prometheus and Grafana UIs

Grab the URLs and keep the tunnels alive in separate terminals.

Tab 1: Prometheus UI

minikube service prometheus-kube-prometheus-prometheus -n cld-streaming --url

Tab 2: Grafana UI

minikube service prometheus-grafana -n cld-streaming --url

You can use this command to get the admin password:

kubectl get secret --namespace cld-streaming prometheus-grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo

The Cloudera Streaming Operators Integration Series

  1. Monitoring Cloudera Streams Messaging (CSM) with Prometheus
  2. Monitoring Cloudera Flow Management (CFM) with Prometheus
  3. Monitoring Cloudera Streaming Analytics (CSA) with Prometheus

1. Monitoring Cloudera Streams Messaging (CSM) with Prometheus

Apache Kafka is the undeniable backbone of modern real-time data, but monitoring its internal health on Kubernetes can often feel like trying to pick a lock. While the Strimzi-powered Cloudera Streams Messaging (CSM) Operator effortlessly spins up your brokers, the critical metrics you need to keep things running smoothly—like byte throughput and under-replicated partitions—are trapped deep inside the JVM. Because Prometheus doesn’t natively speak JMX, we can’t just open a port and call it a day.

In Part 1 of this series, we are going to crack open that black box around Kafka. We will walk step-by-step through injecting a custom JMX Prometheus Exporter into your CSM cluster and deploying a specialized PodMonitor to translate those buried JVM metrics into crystal-clear results in Prometheus and Grafana.


The Metrics ConfigMap

First, we need to define how Kafka’s JMX metrics are converted into Prometheus format. Create kafka-metrics-config.yaml:

kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-metrics
  labels:
    app: strimzi
data:
  kafka-metrics-config.yaml: |
    # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
    lowercaseOutputName: true
    rules:
    # Special cases and very specific rules
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        broker: "$4:$5"
    - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_tls_info
      type: GAUGE
      labels:
        cipher: "$2"
        protocol: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_software
      type: GAUGE
      labels:
        clientSoftwareName: "$2"
        clientSoftwareVersion: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total):"
      name: kafka_server_$1_$4
      type: COUNTER
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total)
      name: kafka_server_$1_$4
      type: COUNTER
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    # Some percent metrics use MeanRate attribute
    # Ex) kafka.server<type=(KafkaRequestHandlerPool), name=(RequestHandlerAvgIdlePercent)><>MeanRate
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    # Generic gauges for percents
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
      labels:
        "$4": "$5"
    # Generic per-second counters with 0-2 key/value pairs
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
    # Generic gauges with 0-2 key/value pairs
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
    # Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
    # Note that these are missing the '_sum' metric!
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
        quantile: "0.$8"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        quantile: "0.$6"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        quantile: "0.$4"
    # KRaft overall related metrics
    # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
    - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-metrics><>(current-state): (.+)"
      name: kafka_server_raftmetrics_$1
      value: 1
      type: UNTYPED
      labels:
        $1: "$2"
    - pattern: "kafka.server<type=raft-metrics><>(.+):"
      name: kafka_server_raftmetrics_$1
      type: GAUGE
    # KRaft "low level" channels related metrics
    # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftchannelmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
      name: kafka_server_raftchannelmetrics_$1
      type: GAUGE
    # Broker metrics related to fetching metadata topic records in KRaft mode
    - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
      name: kafka_server_brokermetadatametrics_$1
      type: GAUGE

Apply the yaml:

kubectl apply -f kafka-metrics-config.yaml -n cld-streaming
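
To get a feel for what these rules do, here is a minimal Python sketch that mimics how a rule turns a JMX bean string into a Prometheus metric name. It is a simplified model, not the exporter's own code: the helper names (expand, apply_rules) and the sample bean strings are assumptions made for illustration, and only two of the rules from the ConfigMap are included.

```python
import re

# Two rules copied from the ConfigMap, in the same order (first match wins).
RULES = [
    {
        "pattern": r"kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count",
        "name": "kafka_$1_$2_$3_total",
        "labels": {"$4": "$5"},
    },
    {
        "pattern": r"kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count",
        "name": "kafka_$1_$2_$3_total",
        "labels": {},
    },
]


def expand(template, match):
    """Replace $1, $2, ... with the corresponding capture groups."""
    return re.sub(r"\$(\d+)", lambda m: match.group(int(m.group(1))), template)


def apply_rules(bean, rules=RULES, lowercase_output_name=True):
    """Return (metric_name, labels) for the first matching rule, else None."""
    for rule in rules:
        match = re.search(rule["pattern"], bean)  # patterns are not anchored
        if match is None:
            continue
        name = expand(rule["name"], match)
        if lowercase_output_name:  # mirrors lowercaseOutputName: true
            name = name.lower()
        labels = {expand(k, match): expand(v, match)
                  for k, v in rule["labels"].items()}
        return name, labels
    return None


# Hypothetical bean strings in the "domain<properties><>attribute: value" shape.
per_topic = "kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=txn1><>Count: 1234"
all_topics = "kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count: 98765"

print(apply_rules(per_topic))
print(apply_rules(all_topics))
```

The first bean resolves to kafka_server_brokertopicmetrics_messagesin_total with a topic label, and the second to kafka_server_brokertopicmetrics_bytesin_total with no labels. Those are the metric names used in the sample queries later in this part.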

The Kafka Cluster Config

Create the kafka-nodepool.yaml:

apiVersion: kafka.strimzi.io/v1
kind: KafkaNodePool
metadata:
  name: combined
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - controller
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        kraftMetadata: shared
        deleteClaim: false

Apply the yaml:

kubectl apply -f kafka-nodepool.yaml -n cld-streaming

Create the kafka-eval-prometheus.yaml:

apiVersion: kafka.strimzi.io/v1
kind: Kafka
metadata:
  name: my-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 4.1.1.1.6
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yaml
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
  entityOperator:
    topicOperator: {}
    userOperator: {}

Apply the yaml:

kubectl apply -f kafka-eval-prometheus.yaml -n cld-streaming

Discovery with PodMonitor

Now we tell Prometheus to go find our brokers. Create the PodMonitor in strimzi-pod-monitor.yaml:

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: strimzi-pod-monitor
  namespace: cld-streaming
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      strimzi.io/cluster: my-cluster
      strimzi.io/kind: Kafka
  namespaceSelector:
    matchNames:
      - cld-streaming
  podMetricsEndpoints:
    - path: /metrics
      targetPort: 9404
      interval: 30s
      relabelings:
        # Map Strimzi pod labels (strimzi.io/...) to top-level metric labels the dashboard expects
        - action: labelmap
          regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
          replacement: $1
        - action: labelmap
          regex: __meta_kubernetes_pod_label_(.+)
          replacement: $1
        # Standard K8s labels the dashboard variables use
        - sourceLabels: [__meta_kubernetes_namespace]
          targetLabel: namespace
        - sourceLabels: [__meta_kubernetes_pod_name]
          targetLabel: kubernetes_pod_name
        - sourceLabels: [__meta_kubernetes_pod_name]
          targetLabel: pod_name
        - sourceLabels: [__meta_kubernetes_pod_node_name]
          targetLabel: node_name

Apply the yaml:

kubectl apply -f strimzi-pod-monitor.yaml -n cld-streaming
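
The labelmap action in this PodMonitor can be hard to picture, so here is a small Python sketch of the idea. It is a simplified model rather than Prometheus code: the function name labelmap and the sample discovery labels are assumptions. It relies on two documented behaviors: relabel regexes are fully anchored, and labels that start with a double underscore are dropped once relabeling finishes.

```python
import re


def labelmap(labels, regex, replacement="$1"):
    """Copy every label whose NAME fully matches regex to a name built from replacement."""
    pattern = re.compile(regex)
    result = dict(labels)
    for name, value in labels.items():
        match = pattern.fullmatch(name)
        if match:
            new_name = re.sub(r"\$(\d+)",
                              lambda m: match.group(int(m.group(1))),
                              replacement)
            result[new_name] = value
    return result


# Discovery labels roughly as Prometheus presents them for a broker pod
# (sample values; the pod label strimzi.io/cluster is sanitized to strimzi_io_cluster).
discovered = {
    "__meta_kubernetes_namespace": "cld-streaming",
    "__meta_kubernetes_pod_name": "my-cluster-combined-0",
    "__meta_kubernetes_pod_label_strimzi_io_cluster": "my-cluster",
    "__meta_kubernetes_pod_label_strimzi_io_kind": "Kafka",
}

mapped = labelmap(discovered, r"__meta_kubernetes_pod_label_(.+)")

# Labels starting with "__" are removed after relabeling completes.
final = {k: v for k, v in mapped.items() if not k.startswith("__")}
print(final)
```

Only strimzi_io_cluster and strimzi_io_kind survive in this sample, which is why the PodMonitor also copies the namespace and pod name into regular labels explicitly.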

Querying Kafka Metrics in Prometheus UI

You should now have the Prometheus UI exposed via minikube service, and strimzi-pod-monitor should show 3/3 targets UP.

  • Verification: Go to Status -> Targets. Look for strimzi-pod-monitor. It should be UP.
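
If you would rather script this check, Prometheus also serves target health over its HTTP API at /api/v1/targets. The Python sketch below only parses a response body; the sample payload is made up and trimmed to the fields used here, and the scrape pool name is an assumption based on the naming the Prometheus Operator uses for monitors.

```python
import json
from collections import Counter

# Trimmed, made-up sample of a /api/v1/targets response body.
SAMPLE = """
{"status": "success",
 "data": {"activeTargets": [
   {"scrapePool": "podMonitor/cld-streaming/strimzi-pod-monitor/0",
    "health": "up", "labels": {"pod": "my-cluster-combined-0"}},
   {"scrapePool": "podMonitor/cld-streaming/strimzi-pod-monitor/0",
    "health": "up", "labels": {"pod": "my-cluster-combined-1"}},
   {"scrapePool": "podMonitor/cld-streaming/strimzi-pod-monitor/0",
    "health": "up", "labels": {"pod": "my-cluster-combined-2"}}
 ]}}
"""


def summarize_targets(body):
    """Return {scrape_pool: (up_count, total_count)} from a targets response."""
    targets = json.loads(body)["data"]["activeTargets"]
    up, total = Counter(), Counter()
    for target in targets:
        total[target["scrapePool"]] += 1
        if target["health"] == "up":
            up[target["scrapePool"]] += 1
    return {pool: (up[pool], total[pool]) for pool in total}


for pool, (up_count, total_count) in summarize_targets(SAMPLE).items():
    print(f"{pool}: {up_count}/{total_count} UP")
```

To run it against a live cluster, fetch the body from the Prometheus URL printed by minikube service and pass it to summarize_targets.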

Now you can start exploring live metrics from your CSM Operator Kafka cluster in real time.

The JMX Prometheus Exporter is successfully scraping your brokers on port 9404. Your Kafka brokers are named my-cluster-combined-* due to the combined KafkaNodePool.

In the Prometheus UI switch to the Graph tab, and paste in the queries below.

Sample Query 1: Topic Messages In Per Second (Confirmed Throughput)

sum(rate(kafka_server_brokertopicmetrics_messagesin_total{topic=~"txn1|txn2|txn_fraud"}[5m])) by (pod, topic)

This query aggregates messages ingested per second, grouped by broker pod and topic. Watch it spike when your producers or NiFi flows push data into the txn topics. Excellent for spotting sudden drops or imbalances across brokers.

Sample Query 2: Topic Bytes In Per Second (Throughput in Bytes)

sum(rate(kafka_server_brokertopicmetrics_bytesin_total[5m])) by (topic)

This query shows the incoming byte rate per topic over a 5-minute window. It gives you a clear picture of actual data volume flowing into txn1, txn2, and especially txn_fraud. Because it uses rate(), the graph is much smoother and more useful for monitoring real-world throughput.
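
For intuition about what rate() is doing with these counters, here is a minimal Python sketch. It is deliberately simplified: the real function also extrapolates to the edges of the window, which this version ignores. The function name simple_rate and the sample values are assumptions.

```python
def simple_rate(samples):
    """Per-second increase of a counter across the given samples.

    samples: list of (timestamp_seconds, counter_value), oldest first.
    A drop in value is treated as a counter reset (for example a broker
    restart), so the new value is counted as growth from zero.
    """
    if len(samples) < 2:
        return None  # at least two points are needed to compute a rate
    increase = 0.0
    for (_, prev), (_, curr) in zip(samples, samples[1:]):
        increase += (curr - prev) if curr >= prev else curr
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed


# Four scrapes, 30 seconds apart, of a bytes-in counter.
steady = [(0, 1000), (30, 1600), (60, 2200), (90, 2800)]
print(simple_rate(steady))

# The same idea when the counter resets between the second and third scrape.
with_reset = [(0, 100), (30, 160), (60, 60)]
print(simple_rate(with_reset))
```

Because the result is an average over the whole window, a short burst is spread out across it. A shorter window than [5m] reacts faster but produces a noisier graph.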

Quick Tips for This Setup

  • Break results down by broker pod when needed:

    sum(rate(kafka_server_brokertopicmetrics_bytesin_total{namespace="cld-streaming"}[5m])) by (topic, pod)
    
  • Add namespace filtering for cleaner results:

    sum(rate(kafka_server_brokertopicmetrics_bytesin_total{namespace="cld-streaming"}[5m])) by (topic)
    
  • If any query returns no data, make sure you are actively producing messages to the topics. Then restart Prometheus to force a fresh scrape:

    kubectl rollout restart statefulset prometheus-prometheus-kube-prometheus-prometheus -n cld-streaming

Run these sample queries while your NiFi flow is actively sending data to txn1, txn2, and txn_fraud. You should now see clear, live throughput numbers appearing in the Prometheus graphs.

This gives you immediate visibility into both message rate and data volume, which is exactly what you need to evaluate how well the Kafka cluster deployed by your CSM Operator is handling the workload.


Visualizing CSM Kafka with Grafana Dashboards

With Prometheus feeding live data, Grafana turns those raw metrics into professional dashboards. However, “no data” is a common issue at this stage — usually because Prometheus is not yet scraping the Kafka brokers or the dashboard variables don’t match your labels.

Open Grafana (minikube service prometheus-grafana -n cld-streaming). Log in with admin and the password from the secret (see Exposing the Prometheus and Grafana UIs above).

Verify the Prometheus Data Source
Go to Configuration → Data Sources.

  • The “Prometheus” source should point to the URL set during the Helm install: http://prometheus-kube-prometheus-prometheus.cld-streaming.svc.cluster.local:9090.
  • Click Save & Test. It must say “Data source is working”.
    (Note: The “Test” button is at the bottom of the datasource edit page.)

Import the Cloudera CSM Kafka Dashboard

  1. Download the JSON:
    curl -O https://raw.githubusercontent.com/cldr-steven-matison/ClouderaStreamingOperators/refs/heads/main/csm-kafka-dashboard.json
  2. In Grafana, go to Dashboards → New → Import
  3. Click Upload JSON file and select the downloaded file.
  4. On the next screen:
    • Datasource → select your Prometheus data source
    • Click Import

Boom. You now have the new Cloudera CSM Kafka Dashboard in Grafana:

Cloudera_CSM_Kafka_Dashboard.png


Summary

With the JMX exporter successfully injected and the PodMonitor active, you have cleared the first major hurdle in building an end-to-end observability pipeline. We didn’t just flip a switch; we architected a robust, Kubernetes-native discovery mechanism that respects the Strimzi-based Operator’s strict validation rules while still providing deep, granular visibility into broker performance.

By bridging the gap between Kafka’s internal JMX metrics and Prometheus, you now have the observability needed to monitor everything from message rates to partition health. Whether you are troubleshooting high CPU usage on a specific broker or watching for under-replicated partitions during a scaling event, you now have the raw data required to maintain a healthy cluster.

This setup serves as the foundation for the rest of the Cloudera Streaming Operator stack. Now that your event backbone (Kafka) is visible, you are ready to plug in your ingestion (NiFi) and processing (Flink) engines to achieve that elusive “single pane of glass” view across the entire data lifecycle in Kubernetes.



2. Monitoring Cloudera Flow Management (CFM) with Prometheus

In the previous guide on monitoring Cloudera Streams Messaging (CSM) we added visibility into your Kafka cluster. Data pipelines don’t start at the broker—they often start with NiFi.

When running NiFi via the Cloudera Flow Management (CFM) Operator, securing the cluster with Single User Auth puts the APIs into a strict lockdown. This makes scraping native metrics a bit of a Kubernetes challenge.

In this post, we’re going to wire up a secure CFM NiFi 2.x cluster to the Prometheus + Grafana stack, bypassing web authentication safely using mTLS, and ultimately bridging our cross-namespace metrics into a single pane of glass.


The NiFi Cluster Config (The CR)

In NiFi 2.x, Prometheus metrics are built natively into the application, so we don’t need an external JMX exporter as we did for Kafka. However, we do need to tell the CFM Operator to disable standard authentication on the metrics endpoint.

Update your Nifi Custom Resource (nifi-cluster.yaml) with the configOverride block:

apiVersion: cfm.cloudera.com/v1alpha1
kind: Nifi
metadata:
  name: mynifi
  namespace: cfm-streaming
spec:
  replicas: 1
  nifiVersion: "2.6.0"
  security:
    initialAdminIdentity: "admin"
    nodeCertGen:
      issuerRef:
        name: cfm-operator-ca-issuer-signed
        kind: ClusterIssuer
    singleUserAuth:
      enabled: true
      credentialsSecretName: "nifi-admin-creds"
  configOverride:
    nifiProperties:
      upsert:
        nifi.cluster.leader.election.implementation: "KubernetesLeaderElectionManager"
        # Disable standard auth for the prometheus endpoint
        nifi.web.prometheus.metrics.authenticated: "false"

Apply this configuration and allow the NiFi pods to perform a rolling restart if necessary.


The mTLS VIP Bypass (Finding the Cert)

Because singleUserAuth is enabled, NiFi will fiercely defend its endpoints, even with the property override above, and throw 401 Unauthorized errors at Prometheus. NiFi expects a login token.

To get around the web login completely, we use Client Certificates (mTLS). The CFM Operator automatically generates a highly privileged cert to talk to NiFi securely. We are going to borrow that cert for Prometheus.

Run this command to find the Operator’s user certificate:

kubectl get secrets -n cfm-streaming | grep kubernetes.io/tls

Look for mynifi-cfm-operator-user-cert. This is our golden ticket which we will take with us below into our NiFi ServiceMonitor.


Discovery with ServiceMonitor

Now we tell Prometheus to scrape NiFi, handing it the certificate so it can breeze past the 401 Unauthorized screens. We also use a relabelings block to ensure the Host header perfectly matches what NiFi’s Jetty server expects (preventing a 400 Bad Request error).

Save this as nifi-service-monitor.yaml:

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: nifi-service-monitor
  namespace: cfm-streaming
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: server
      app.kubernetes.io/instance: mynifi
  namespaceSelector:
    matchNames:
      - cfm-streaming
  endpoints:
  - port: https
    path: /nifi-api/flow/metrics/prometheus
    interval: 30s
    scheme: https
    tlsConfig:
      insecureSkipVerify: true
      serverName: mynifi-web.cfm-streaming.svc.cluster.local
      # Explicit CA mapping fixes the "none configured" error
      ca:
        secret:
          name: mynifi-cfm-operator-user-cert
          key: ca.crt
      # The mTLS Bypass Client Certs
      cert:
        secret:
          name: mynifi-cfm-operator-user-cert
          key: tls.crt
      keySecret:
        name: mynifi-cfm-operator-user-cert
        key: tls.key
    relabelings:
      - targetLabel: __address__
        replacement: mynifi-web.cfm-streaming.svc.cluster.local:8443

kubectl apply -f nifi-service-monitor.yaml -n cfm-streaming

(Wait about 30 seconds. In your Prometheus UI under Status -> Targets, nifi-service-monitor should now show as 1/1 UP).
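
The relabelings entry in this ServiceMonitor has no sourceLabels and no action, so it falls back to the default replace action with the default regex (.*). The Python sketch below models why that always rewrites __address__. It is a simplified model, not Prometheus code: the function name relabel_replace, the pod IP, and the pod name are made-up assumptions.

```python
import re


def relabel_replace(labels, target_label, replacement="$1",
                    source_labels=(), separator=";", regex="(.*)"):
    """Simplified model of the default `replace` relabel action."""
    # Source label values are joined with the separator; none gives "".
    joined = separator.join(labels.get(name, "") for name in source_labels)
    match = re.fullmatch(regex, joined)  # relabel regexes are fully anchored
    if match is None:
        return dict(labels)  # no match: labels are left untouched
    value = re.sub(r"\$(\d+)",
                   lambda m: match.group(int(m.group(1))),
                   replacement)
    result = dict(labels)
    result[target_label] = value
    return result


# Hypothetical discovered target: Prometheus would normally dial the pod IP.
discovered = {
    "__address__": "10.244.0.37:8443",
    "__meta_kubernetes_pod_name": "mynifi-0",
}

# No source labels: "(.*)" matches the empty string, so the literal wins.
rewritten = relabel_replace(
    discovered,
    target_label="__address__",
    replacement="mynifi-web.cfm-streaming.svc.cluster.local:8443",
)
print(rewritten["__address__"])

# With a source label, the default "$1" copies its value to the target label.
with_pod = relabel_replace(discovered, target_label="pod",
                           source_labels=["__meta_kubernetes_pod_name"])
print(with_pod["pod"])
```

Rewriting __address__ means Prometheus connects to the service DNS name instead of the pod IP, which is what keeps the Host header in line with what NiFi expects.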


Querying NiFi Metrics in Prometheus UI

Now that Prometheus has a secure, authenticated channel to NiFi, let’s look at the data. Open the Prometheus UI Graph tab and test these queries:

Sample Query 1: Total Bytes Queued

sum(nifi_amount_bytes_queued{namespace="cfm-streaming"})

Great for setting up alerts if a downstream system (like Kafka) goes offline and backpressure builds up.

Sample Query 2: Total Items Queued

sum(nifi_amount_items_queued{namespace="cfm-streaming"})

Sample Query 3: Active Threads

sum(nifi_active_threads{namespace="cfm-streaming"})
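
Each of these queries simply adds up every series of one metric. The Python sketch below does the same thing directly on scraped text, which can be handy for checking raw endpoint output by hand. It is a simplified parser that assumes no trailing timestamps on sample lines; the function name sum_metric, the label names, and the sample values are illustrative assumptions, not actual NiFi output.

```python
def sum_metric(exposition_text, metric_name):
    """Sum every sample of metric_name in Prometheus text-format output."""
    total = 0.0
    for line in exposition_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        series, _, value = line.rpartition(" ")  # the value is the last field
        name = series.split("{", 1)[0]  # drop the {label="..."} part
        if name == metric_name:
            total += float(value)
    return total


# Made-up sample in the Prometheus text exposition format.
SCRAPE = """
# HELP nifi_amount_bytes_queued Total number of bytes queued
# TYPE nifi_amount_bytes_queued gauge
nifi_amount_bytes_queued{component_name="to kafka",component_type="Connection"} 2048.0
nifi_amount_bytes_queued{component_name="from http",component_type="Connection"} 512.0
nifi_amount_items_queued{component_name="to kafka",component_type="Connection"} 4.0
"""

print(sum_metric(SCRAPE, "nifi_amount_bytes_queued"))
print(sum_metric(SCRAPE, "nifi_amount_items_queued"))
```

In PromQL, adding a by (...) clause to sum() keeps the breakdown per label instead of collapsing everything into one number.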

Visualizing CFM NiFi with Grafana Dashboards

With Prometheus pulling the data, let’s load up a beautiful community-built dashboard.

Step 1: Import the Dashboard

  1. Open Grafana and navigate to Dashboards -> New -> Import.
  2. In the “Import via grafana.com” box, type 15822 (or 12375) and click Load.
  3. Select your Prometheus data source at the bottom and click Import.

Boom. You now have full JVM stats, FlowFile queue tracking, and throughput metrics.

Cloudera_CFM_NiFi_Dashboard.png


Summary

By leveraging the CFM Operator’s native mynifi-cfm-operator-user-cert, you have successfully engineered an mTLS bridge that bypasses NiFi’s strict Single User Auth lockdown. We didn’t just find a workaround for the “401 Unauthorized” errors; we architected a secure, automated discovery path that allows Prometheus to scrape sensitive metrics without compromising the security of your data orchestration layer.

This configuration effectively solves the “networking puzzle” of NiFi 2.x observability. By aligning your ServiceMonitor with NiFi’s strict SNI and Host header requirements, you’ve ensured that your monitoring stack remains as resilient as the pipelines it tracks. You now have the declarative tools to move beyond basic health checks and into deep, cross-namespace correlation.

With this piece of the puzzle in place, you can finally realize the “Master Plan”: a single pane of glass where you can watch NiFi’s outbound data rates flow in perfect synchronization with Kafka’s inbound throughput. You no longer have to guess where a bottleneck resides; you have the real-time telemetry required to prove exactly how data is moving through your entire Cloudera Streaming architecture.


3. Monitoring Cloudera Streaming Analytics (CSA) with Prometheus

If you followed our previous guides on monitoring Cloudera Streams Messaging (CSM) and Cloudera Flow Management (CFM), you now have visibility into your data ingestion (NiFi) and event streaming (Kafka). But what about monitoring the stream processing jobs (Flink) in Cloudera Streaming Analytics (CSA)?

When running Flink and SQL Stream Builder (SSB) via the CSA Operator, Flink jobs spin up dynamically on Kubernetes. Because these dynamically generated TaskManager pods don’t explicitly declare metric ports in their Kubernetes spec, standard Prometheus PodMonitors will silently drop the targets, which turns job metric discovery into a bit of Kubernetes spaghetti.

In this third and final post of the series, we’re going to wire up our CSA Flink jobs to our existing Prometheus + Grafana stack. By using a Headless Service to work around the missing port declarations, we will finish plugging our CFM NiFi Operator, CSA Flink Operator, and CSM Kafka Operator into the Prometheus and Grafana stack for monitoring.


Create the Prometheus Values File

Create this file in the root of your repo. This forces Flink to open port 9249 for metrics scraping.

csa-prometheus-values.yaml

# csa-prometheus-values.yaml
# Enables native PrometheusReporter for ALL SQL Stream Builder (SSB) jobs

ssb:
  flinkConfiguration:
    flink-conf.yaml: |
      metrics.reporters: prom
      metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
      metrics.reporter.prom.port: "9249"
      taskmanager.network.detailed-metrics: "true"

      # Optional: cleaner metric labels for Grafana dashboards
      metrics.scope.jm: "flink.jobmanager.<host>"
      metrics.scope.tm: "flink.taskmanager.<host>.<tm_id>"
      metrics.scope.job: "flink.job.<job_id>.<job_name>"

Helm Install Command

Run this exact command:

helm install csa-operator \
  oci://container.repository.cloudera.com/cloudera-helm/csa-operator/csa-operator \
  --namespace cld-streaming \
  --create-namespace \
  --version 1.5.0-b275 \
  --values ./csa-prometheus-values.yaml \
  --set 'flink-kubernetes-operator.imagePullSecrets[0].name=cloudera-creds' \
  --set 'ssb.sse.image.imagePullSecrets[0].name=cloudera-creds' \
  --set 'ssb.sqlRunner.image.imagePullSecrets[0].name=cloudera-creds' \
  --set 'ssb.mve.image.imagePullSecrets[0].name=cloudera-creds' \
  --set 'ssb.database.imagePullSecrets[0].name=cloudera-creds' \
  --set 'ssb.flink.image.imagePullSecrets[0].name=cloudera-creds' \
  --set-file flink-kubernetes-operator.clouderaLicense.fileContent=./license.txt

Verify the Install

# 1. Helm release
helm list -n cld-streaming

# 2. All pods running
kubectl get pods -n cld-streaming

# 3. Confirm Prometheus config was applied
helm get values csa-operator -n cld-streaming | grep -A 20 "flink-conf.yaml"

Discovery with Headless Service & ServiceMonitor

Because Flink Native Kubernetes does not explicitly declare port 9249 in its dynamic pod specs, standard PodMonitors will drop the targets. Instead, we bridge the gap using a Headless Service and a ServiceMonitor.

A. Create the Headless Service (csa-flink-service.yaml)

apiVersion: v1
kind: Service
metadata:
  name: csa-flink-metrics-service
  namespace: cld-streaming
  labels:
    app: csa-flink-metrics
spec:
  clusterIP: None  # Makes it a headless service
  selector:
    # This automatically captures ALL Flink pods (JobManagers & TaskManagers)
    type: flink-native-kubernetes
  ports:
    - name: prom-metrics
      port: 9249
      targetPort: 9249

B. Create the ServiceMonitor (csa-flink-service-monitor.yaml)

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: csa-flink-metrics-monitor
  namespace: cld-streaming
  labels:
    release: prometheus # Must match your Prometheus Operator release label
spec:
  selector:
    matchLabels:
      app: csa-flink-metrics
  namespaceSelector:
    matchNames:
      - cld-streaming
  endpoints:
    - port: prom-metrics
      interval: 15s
      scrapeTimeout: 10s
      relabelings:
        # Extracts labels so Grafana dashboards automatically map deployments
        - sourceLabels: [__meta_kubernetes_pod_label_app]
          targetLabel: flink_deployment
        - sourceLabels: [__meta_kubernetes_pod_label_component]
          targetLabel: component
        - sourceLabels: [__meta_kubernetes_pod_name]
          targetLabel: pod
        - sourceLabels: [__meta_kubernetes_namespace]
          targetLabel: namespace

C. Apply both files:

kubectl apply -f csa-flink-service.yaml -n cld-streaming
kubectl apply -f csa-flink-service-monitor.yaml -n cld-streaming

Wait ~30 seconds, then check Prometheus UI (Status -> Targets). You should see your JobManagers and TaskManagers listed as UP under serviceMonitor/cld-streaming/csa-flink-metrics-monitor/0.


Test Prometheus Metrics

  1. Open SSB UI:
    minikube service ssb-sse --namespace cld-streaming
  2. Run any SQL job in SQL Stream Builder.

  3. Verify metrics are exposed directly from a pod:
    # Replace with your actual taskmanager pod name
    kubectl exec -it ssb-session-admin-taskmanager-1-3 -n cld-streaming -- \
      curl -s http://localhost:9249/metrics | head -20
    

You should see flink_ metrics.


Sample Query 1: JVM CPU Load

flink_taskmanager_Status_JVM_CPU_Load{namespace="cld-streaming"}

Sample Query 2: Job Uptime

flink_jobmanager_job_uptime{namespace="cld-streaming"}

Sample Query 3: Records In Per Second

sum(flink_taskmanager_job_task_operator_numRecordsInPerSecond{namespace="cld-streaming"}) by (job_name)

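End-to-End Pipeline (NiFi → SSB → Kafka)

PromQL’s or operator drops right-hand series whose label set already exists on the left, and each sum() here returns a single series with no labels. Tag each stage with label_replace so all three series appear together (the label name stage is arbitrary):

label_replace(sum(rate(nifi_bytes_sent{namespace="cfm-streaming"}[5m])), "stage", "nifi", "", "")
or
label_replace(sum(flink_taskmanager_job_task_operator_numRecordsInPerSecond{namespace="cld-streaming"}), "stage", "flink", "", "")
or
label_replace(sum(rate(kafka_server_brokertopicmetrics_bytesin_total{namespace="cld-streaming"}[5m])), "stage", "kafka", "", "")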
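
When combining stages with or, it helps to know how the operator treats label sets: it keeps every series from the left-hand side and adds only those right-hand series whose label set is not already present. The Python sketch below models that rule with plain dictionaries. The function names, the sample values, and the label name stage are assumptions for illustration.

```python
def vector_or(left, right):
    """Model of PromQL `left or right` on instant vectors.

    Each vector maps a frozenset of (label, value) pairs to a sample value.
    """
    result = dict(left)
    for label_set, value in right.items():
        if label_set not in result:  # right side only fills in missing label sets
            result[label_set] = value
    return result


def series(value, **labels):
    """Build a one-series vector; labels are passed as keyword arguments."""
    return {frozenset(labels.items()): value}


# sum(...) without a `by` clause yields one series with an empty label set.
nifi, flink, kafka = series(5000.0), series(120.0), series(4800.0)
collapsed = vector_or(vector_or(nifi, flink), kafka)

# A distinguishing label (hypothetically named "stage") keeps all three.
labelled = vector_or(
    vector_or(series(5000.0, stage="nifi"), series(120.0, stage="flink")),
    series(4800.0, stage="kafka"),
)

print(len(collapsed), len(labelled))
```

In this model the unlabelled version collapses to the NiFi series alone, while the labelled version returns one series per stage. Separate Grafana panels or queries per stage avoid the issue entirely.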

Visualizing in Grafana

Import the Cloudera CSA Flink Dashboard

  1. Download the CSA Flink Dashboard JSON:
    curl -O https://raw.githubusercontent.com/cldr-steven-matison/ClouderaStreamingOperators/refs/heads/main/csa-flink-dashboard.json
  2. In Grafana, go to Dashboards → New → Import
  3. Click Upload JSON file and select the downloaded file.
  4. On the next screen:
    • Datasource → select your Prometheus data source
    • Click Import

Boom. You now have the new Cloudera CSA Flink Dashboard in Grafana:

Cloudera_CSA_Flink_Dashboard.png


Summary

With this final piece in place, you have successfully built a complete, end-to-end observability pipeline across your entire Cloudera Streaming Operators architecture. By bridging CFM (NiFi) for ingestion, CSM (Kafka) for event streaming, and CSA (SQL Stream Builder / Flink) for real-time processing, you now have a unified view of your data’s lifecycle within a single Prometheus and Grafana stack.

In this specific guide we implemented a Headless Service and a ServiceMonitor to bypass the strict pod-spec limitations of Flink Native Kubernetes. This ensures that every dynamically provisioned JobManager and TaskManager is automatically discovered and scraped by Prometheus, completely eliminating the silent “0 targets” discovery failures during setup.

You can now reliably execute complex PromQL queries in Prometheus across namespaces and correlate behavior across entirely different engines. Whether you are tracking backpressure in NiFi, monitoring consumer lag in Kafka, or measuring checkpoint durations and records-per-second in Flink, you finally have the single pane of glass required to confidently debug, tune, scale, and monitor your streaming data pipelines.


 

End to End CSO Dashboard with Grafana

Now that all of our operator metrics are flowing, the operator dashboards are set up, and we have a good understanding of how Prometheus and Grafana queries work, we can easily build a new Fraud Dashboard with Grafana.

Download the CSO Fraud Detection Dashboard JSON and import it into Grafana.

 

Cloudera_CSO_Fraud_Dashboard.png

Summary: Observability in Kubernetes Achieved

By wiring CFM (NiFi), CSM (Kafka), and CSA (Flink/SSB) metrics to Prometheus, you have successfully built complete, end-to-end observability for the Cloudera Streaming Operators.

We didn’t just flip a switch to turn on metrics—we architected a robust, Kubernetes-native solution that respects strict SNI headers, leverages mTLS for secure API scraping, and utilizes headless services to bypass dynamic pod-spec limitations. Best of all, your entire monitoring configuration remains declarative and fully Git-trackable.

You can now reliably execute complex PromQL queries across namespaces, correlating behavior across entirely different engines. When you can overlay NiFi’s outbound byte rate directly on top of Kafka’s inbound throughput on the exact same Grafana dashboard, you no longer have to guess where a bottleneck resides. You have the telemetry to prove it.

