Support Questions

Find answers, ask questions, and share your expertise

NIfi: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule Error

avatar
Contributor

Hi Team, I am implementing the CSFLE(Client Side Field Level Encryption) in the Nifi for the Kafka data. I have written Groovy script to get the logic like Consume/deserialize/Encrypt the data. The script is working as standalone. However, if i place it in ExecuteScript processor ,i am getting Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule Error. It seems it is compatibility issue in the kafka-client jar file. However, i could not find the fix. Please provide your input on this.

My groovy script is below.  

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import org.apache.kafka.common.security.plain.PlainLoginModule
import javax.security.auth.login.Configuration
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.LoginContext


Thread.currentThread().setContextClassLoader(null);
def kafkaConsumer = null
try {
    // Kafka consumer properties
    def props = [
        'bootstrap.servers'                    : 'server:port',
        'auto.register.schemas'                : 'false',
        'basic.auth.credentials.source'        : 'USER_INFO',
		'key.serializer'                       : 'org.apache.kafka.common.serialization.StringSerializer',
        'value.serializer'                     : 'io.confluent.kafka.serializers.KafkaAvroSerializer',
        'key.deserializer'                     : 'org.apache.kafka.common.serialization.StringDeserializer',
        'value.deserializer'                   : 'io.confluent.kafka.serializers.KafkaAvroDeserializer',
        'security.protocol'                    : 'SASL_SSL',
        'schema.registry.url'                  : 'Sampe-urlcloud',
        'basic.auth.user.info'                 : 'unamee:pass',
        'sasl.mechanism'                       : 'PLAIN',
        'sasl.jaas.config'                     : 'org.apache.kafka.common.security.plain.PlainLoginModule required username="uname" password="pass";',
        'use.latest.version'                   : 'false',
        'client.id'                            : 'csfle-simple-producer',
        'auto.offset.reset'                    : 'earliest',
        'group.id'                             : 'nifi-csfle-group6',
        'rule.executors._default_.param.access.key.id' :'keyid',
        'rule.executors._default_.param.secret.access.key' : 'accesskey'
    ]
    log.info("RAJIV-1: 'csfle_test_kek_oct_29'")
    // Create KafkaConsumer instance
    kafkaConsumer = new KafkaConsumer<String, Object>(props)
	log.info("RAJIV-2: 'csfle_test_kek_oct_29'")
    kafkaConsumer.subscribe(['csfle_test_kek_oct_29'])
	log.info("RAJIV-3: 'csfle_test_kek_oct_29'")

    // Poll for messages
    ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(10))

    if (records.isEmpty()) {
        log.info("No records found in the topic.")
    } else {
        // Process each record
        records.each { ConsumerRecord<String, Object> record ->
            try {
                // Log the record details
                log.info("Received record: Topic=${record.topic()}, Partition=${record.partition()}, Offset=${record.offset()}, Key=${record.key()}, Value=${record.value()}")
            } catch (Exception e) {
                log.error("Error processing record: ${e.getMessage()}")
            }
        }
    }
} catch (Exception e) {
   log.error("Kafka Consumer Error: ${e.message}", e)

} finally {
    // Close the Kafka consumer
    if (kafkaConsumer != null) {
        kafkaConsumer.close()
    }
}

 

 

 

 

 

1 ACCEPTED SOLUTION

avatar
Super Guru

Hi @rajivswe_2k7 ,

Is it possible that you are not specifying the path In "Module Directory"  property of the ExecuteScript Processor where all required dependencies can be located?

Also my understanding that the script in such processor is evaluated on each flowfile so if  you are running complex code that requires a lot of external dependencies this could be costly. A cleaner and more efficient way is to create custom processor where all the code and dependencies gets packaged as NAR file, or if you are using version 2.0 you can create custom processor using python extensions if the same thing can be implemented in python.

Hope that helps.

 

 

View solution in original post

2 REPLIES 2

avatar
Super Guru

Hi @rajivswe_2k7 ,

Is it possible that you are not specifying the path In "Module Directory"  property of the ExecuteScript Processor where all required dependencies can be located?

Also my understanding that the script in such processor is evaluated on each flowfile so if  you are running complex code that requires a lot of external dependencies this could be costly. A cleaner and more efficient way is to create custom processor where all the code and dependencies gets packaged as NAR file, or if you are using version 2.0 you can create custom processor using python extensions if the same thing can be implemented in python.

Hope that helps.

 

 

avatar
Contributor

Thank you @SAMSAL . This is the issue, if i specify the external "Module Directory"  in the executescript , the ClassLoader somewhat clashed and no required class get loaded. So i have removed that and placed all my Jars into nifi/lib directory alone. Now it is working fine. Please let me know if any better way to handle that. Just not to use the nifi/lib directory.

Also, let me try creating the custom processors to achieve this CSFLE.