NiFi - Support SASL/OAUTHBEARER in Kafka processors

Expert Contributor

I need to authenticate to a Kafka Broker using OAuth. 

On the Apache NiFi issue tracker I found a ticket to add SASL/OAUTHBEARER support to the Kafka 3 processors, which now expose an allowable-values list of SASL mechanisms: https://issues.apache.org/jira/browse/NIFI-7421

Does anyone know if this will ever be implemented?

Otherwise, does anyone have ideas for how I could meet this requirement?

@MattWho 
@SAMSAL 

1 REPLY

Master Mentor

@drewski7 
I have just picked up your question and hope I can help you resolve it if it is still open. A couple of configuration changes and implementation steps are needed.

1. Overview

OAuth lets Kafka clients obtain access tokens from an external identity provider and present them to the Kafka broker for authentication.
Setting this up involves configuring the OAuth provider, the Kafka broker, and the Kafka clients.

2. Prerequisites

  • Kafka cluster with SASL/OAUTHBEARER enabled.
  • An OAuth provider set up to issue access tokens.
  • Kafka clients that support SASL/OAUTHBEARER.
  • Required libraries for OAuth integration (e.g., kafka-clients, oauth2-client, or Keycloak adapters).

3. Procedure

Step 1: Configure the OAuth Provider

  1. Set up an OAuth provider (e.g., Keycloak, Okta, etc.) to act as the identity provider (IdP).

  2. Register a new client application for Kafka in the OAuth provider:

    • Set up client ID and client secret for Kafka clients.
    • Configure scopes, roles, or claims required for authorization.
    • Enable grant types like Client Credentials or Password (depending on your use case).
  3. Note down the following details:

    • Authorization Server URL (e.g., https://authlogin.northwind.com/token).
    • Client ID and Client Secret.

Step 2: Configure the Kafka Broker

  1. Enable SASL/OAUTHBEARER Authentication:

    • Edit the Kafka broker configuration (/config/server.properties)

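# Enable the OAUTHBEARER SASL mechanism
sasl.enabled.mechanisms=OAUTHBEARER
# Note: the oauth.* options are not read by Kafka itself; they need a callback handler from
# an OAuth library (the names resemble those of the Strimzi kafka-oauth library)
listener.name.<listener-name>.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.token.endpoint.uri="https://auth.example.com/token" \
    oauth.client.id="kafka-broker-client-id" \
    oauth.client.secret="kafka-broker-client-secret" \
    oauth.scope="kafka-scope";

Replace <listener-name> with the lowercase name of your listener (for example, sasl_plaintext or sasl_ssl), as appropriate.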

  2. Configure ACLs (Optional):

    • If using authorization, configure ACLs to grant specific permissions to authenticated users.
  3. Restart the Kafka Broker:

    • Restart the Kafka broker to apply the changes:
      sudo systemctl restart kafka

Step 3: Configure the Kafka Client

  1. Add required dependencies to your Kafka client application:

    • For Java applications, add the Kafka and OAuth dependencies to your pom.xml or build.gradle.

pom.xml example

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.nimbusds</groupId>
    <artifactId>oauth2-oidc-sdk</artifactId>
    <version>9.4</version>
</dependency>

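  2. Configure OAuth in the Kafka Client:

    • Specify the SASL mechanism and the OAuth token endpoint in the client configuration:
    import java.util.Properties;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092");
    // SASL over TLS with the OAUTHBEARER mechanism
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "OAUTHBEARER");
    // Note: kafka-clients does not interpret the oauth.* options on its own; they need a login
    // callback handler from an OAuth library, set via sasl.login.callback.handler.class
    // (the option names resemble those of the Strimzi kafka-oauth client library).
    props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
        "oauth.token.endpoint.uri=\"https://auth.example.com/token\" " +
        "oauth.client.id=\"kafka-client-id\" " +
        "oauth.client.secret=\"kafka-client-secret\";");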

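For comparison, Apache Kafka 3.1 and later include a built-in OAuth login handler (KIP-768) whose settings are named differently from the oauth.* options above, so it needs a newer kafka-clients than the 3.0.0 in the pom.xml example. Here is a minimal sketch of the equivalent client properties, using only java.util.Properties so it compiles without extra libraries. The class name BuiltInOAuthClientConfig, the endpoint URL, and the credentials are placeholders of mine. The handler class name is passed in as a parameter because, as far as I know, its package differs between releases (org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler in 3.1 to 3.3, org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler from 3.4), so please check it against your kafka-clients version.

```java
import java.util.Properties;

final class BuiltInOAuthClientConfig {

    // Wrap a value in double quotes, escaping backslashes and quotes for a JAAS option.
    static String jaasQuote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Client properties for Kafka's built-in OAUTHBEARER login (client credentials grant).
    // Assumption: option names follow KIP-768 (sasl.oauthbearer.token.endpoint.url,
    // clientId, clientSecret); verify them against your Kafka version.
    static Properties build(String bootstrap, String tokenUrl, String handlerClass,
                            String clientId, String clientSecret) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "OAUTHBEARER");
        props.put("sasl.oauthbearer.token.endpoint.url", tokenUrl);
        props.put("sasl.login.callback.handler.class", handlerClass);
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
            + "clientId=" + jaasQuote(clientId) + " "
            + "clientSecret=" + jaasQuote(clientSecret) + ";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker1:9092,broker2:9092",
            "https://auth.example.com/token",
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler",
            "kafka-client-id", "kafka-client-secret");
        // Print only the keys so the secret does not end up in a log
        props.stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```

The returned Properties object can be passed to a producer or consumer constructor in the same way as props above.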
  3. Implement Token Retrieval (Optional):

    • Use an external tool or library to retrieve and manage tokens if you need a custom implementation:
    curl -X POST -d "grant_type=client_credentials&client_id=kafka-client-id&client_secret=kafka-client-secret" \
    https://auth.example.com/token
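
If you would rather fetch the token from Java than from curl, the same client credentials request can be sketched with the JDK's own java.net.http.HttpClient (Java 11 or later). This is only an illustration: the class name TokenRequestSketch is hypothetical, the URL and credentials are the placeholders from the curl example, and your provider may expect extra parameters such as a scope.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class TokenRequestSketch {

    // Encode the client credentials grant as an application/x-www-form-urlencoded body.
    static String formBody(String clientId, String clientSecret) {
        return "grant_type=client_credentials"
            + "&client_id=" + URLEncoder.encode(clientId, StandardCharsets.UTF_8)
            + "&client_secret=" + URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
    }

    // Build the POST request; nothing is sent until HttpClient.send is called.
    static HttpRequest buildRequest(String tokenUrl, String clientId, String clientSecret) {
        return HttpRequest.newBuilder(URI.create(tokenUrl))
            .header("Content-Type", "application/x-www-form-urlencoded")
            .POST(HttpRequest.BodyPublishers.ofString(formBody(clientId, clientSecret)))
            .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("usage: TokenRequestSketch <token-url> <client-id> <client-secret>");
            return;
        }
        HttpRequest request = buildRequest(args[0], args[1], args[2]);
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The body normally contains the access token, so avoid logging it in production
        System.out.println("HTTP " + response.statusCode());
        System.out.println(response.body());
    }
}
```

The response is typically a JSON document with access_token and expires_in fields, which you would parse with the JSON library of your choice.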

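  4. Create the Kafka Producer/Consumer:

    • Use the above configuration to initialize a Kafka producer or consumer:
    // Serializers are passed explicitly because props does not set key/value serializers
    KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());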

Step 4: Test the Authentication

  1. Produce and consume messages to verify OAuth-based authentication:

    kafka-console-producer.sh --bootstrap-server <broker-address> --topic <topic-name> --producer.config <client-config>
    kafka-console-consumer.sh --bootstrap-server <broker-address> --topic <topic-name> --consumer.config <client-config>

  2. Ensure logs indicate successful authentication using SASL/OAUTHBEARER.
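
The <client-config> placeholder in the commands above is a properties file holding the same client settings as in Step 3. As a rough sketch, the file can be generated with java.util.Properties, which takes care of the escaping. The class name ClientConfigFile and the output file name are my own choices, and the oauth.* options simply mirror the Step 3 example.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

final class ClientConfigFile {

    // Client settings mirroring Step 3; bootstrap servers go on the command line instead.
    static Properties clientSettings(String jaasConfig) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.jaas.config", jaasConfig);
        return props;
    }

    // Render the settings in java.util.Properties file format.
    static String render(Properties props) {
        StringWriter out = new StringWriter();
        try {
            props.store(out, "Kafka client settings for SASL/OAUTHBEARER");
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory writer
        }
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: ClientConfigFile <output-file>");
            return;
        }
        // Placeholder values copied from the Step 3 example
        String jaas = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
            + "oauth.token.endpoint.uri=\"https://auth.example.com/token\" "
            + "oauth.client.id=\"kafka-client-id\" "
            + "oauth.client.secret=\"kafka-client-secret\";";
        Files.writeString(Path.of(args[0]), render(clientSettings(jaas)));
    }
}
```

Properties.store escapes characters such as ':' and '=' in values with a backslash, and Properties.load reverses that when the file is read back. The file contains the client secret, so keep its permissions tight.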

Step 5: Monitor and Debug

  • Check Kafka broker logs for errors related to OAuth authentication.
  • Verify token expiration and renewal mechanisms.
  • Ensure the OAuth provider is reachable from the Kafka brokers and clients.
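
For the token expiration check, one quick way to see when a token runs out is to decode its payload and read the exp claim. The sketch below assumes your provider issues JWT access tokens (opaque tokens cannot be inspected this way) and does not verify the signature, so treat it as a debugging aid only. The class name TokenExpiry is hypothetical, and the regex is a shortcut where a real JSON parser would be more robust.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TokenExpiry {

    // Matches a numeric "exp" claim in the JSON payload.
    private static final Pattern EXP = Pattern.compile("\"exp\"\\s*:\\s*(\\d+)");

    // Decode the payload (second dot-separated part) of a JWT without verifying the signature.
    static String payloadJson(String jwt) {
        String[] parts = jwt.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("not a JWT");
        }
        return new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
    }

    // Read the "exp" claim, which is expressed in seconds since the epoch.
    static Instant expiresAt(String jwt) {
        Matcher m = EXP.matcher(payloadJson(jwt));
        if (!m.find()) {
            throw new IllegalArgumentException("no exp claim found");
        }
        return Instant.ofEpochSecond(Long.parseLong(m.group(1)));
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("usage: TokenExpiry <jwt>");
            return;
        }
        Instant exp = expiresAt(args[0]);
        System.out.println("expires at " + exp + ", expired=" + exp.isBefore(Instant.now()));
    }
}
```

If the reported expiry is already in the past when the client connects, look at the clock on the client host and at the token lifetime configured in the OAuth provider.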

Happy Hadooping! I hope the above steps help in diagnosing and resolving your Kafka OAuth issue.