Created on 12-15-2024 11:05 PM - edited 12-15-2024 11:11 PM
Hi Team, I am implementing CSFLE (Client-Side Field Level Encryption) in NiFi for Kafka data. I have written a Groovy script that consumes, deserializes, and encrypts the data. The script works standalone. However, when I place it in an ExecuteScript processor, I get the error "Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule". It looks like a compatibility issue with the kafka-client jar file, but I could not find a fix. Please share your input on this.
My Groovy script is below.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import org.apache.kafka.common.security.plain.PlainLoginModule
import javax.security.auth.login.Configuration
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.LoginContext

Thread.currentThread().setContextClassLoader(null);
def kafkaConsumer = null
try {
    // Kafka consumer properties
    def props = [
        'bootstrap.servers' : 'server:port',
        'auto.register.schemas' : 'false',
        'basic.auth.credentials.source' : 'USER_INFO',
        'key.serializer' : 'org.apache.kafka.common.serialization.StringSerializer',
        'value.serializer' : 'io.confluent.kafka.serializers.KafkaAvroSerializer',
        'key.deserializer' : 'org.apache.kafka.common.serialization.StringDeserializer',
        'value.deserializer' : 'io.confluent.kafka.serializers.KafkaAvroDeserializer',
        'security.protocol' : 'SASL_SSL',
        'schema.registry.url' : 'Sampe-urlcloud',
        'basic.auth.user.info' : 'unamee:pass',
        'sasl.mechanism' : 'PLAIN',
        'sasl.jaas.config' : 'org.apache.kafka.common.security.plain.PlainLoginModule required username="uname" password="pass";',
        'use.latest.version' : 'false',
        'client.id' : 'csfle-simple-producer',
        'auto.offset.reset' : 'earliest',
        'group.id' : 'nifi-csfle-group6',
        'rule.executors._default_.param.access.key.id' : 'keyid',
        'rule.executors._default_.param.secret.access.key' : 'accesskey'
    ]
    log.info("RAJIV-1: 'csfle_test_kek_oct_29'")
    // Create KafkaConsumer instance
    kafkaConsumer = new KafkaConsumer<String, Object>(props)
    log.info("RAJIV-2: 'csfle_test_kek_oct_29'")
    kafkaConsumer.subscribe(['csfle_test_kek_oct_29'])
    log.info("RAJIV-3: 'csfle_test_kek_oct_29'")
    // Poll for messages
    ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(10))
    if (records.isEmpty()) {
        log.info("No records found in the topic.")
    } else {
        // Process each record
        records.each { ConsumerRecord<String, Object> record ->
            try {
                // Log the record details
                log.info("Received record: Topic=${record.topic()}, Partition=${record.partition()}, Offset=${record.offset()}, Key=${record.key()}, Value=${record.value()}")
            } catch (Exception e) {
                log.error("Error processing record: ${e.getMessage()}")
            }
        }
    }
} catch (Exception e) {
    log.error("Kafka Consumer Error: ${e.message}", e)
} finally {
    // Close the Kafka consumer
    if (kafkaConsumer != null) {
        kafkaConsumer.close()
    }
}
Created 12-16-2024 04:38 AM
Hi @rajivswe_2k7 ,
Is it possible that you have not set the "Module Directory" property of the ExecuteScript processor to the path where all the required dependencies are located?
Also, my understanding is that the script in such a processor is evaluated for each FlowFile, so running complex code that requires many external dependencies could be costly. A cleaner and more efficient approach is to create a custom processor in which all the code and dependencies are packaged as a NAR file. If you are using version 2.0, you can also create a custom processor using Python extensions, provided the same logic can be implemented in Python.
Hope that helps.
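To test the "Module Directory" theory, a small diagnostic along these lines could be run, first standalone and then inside the ExecuteScript body. It is only a sketch: `visibility` is a hypothetical helper name, and which three loaders are worth checking is an assumption about where the clash might be. It reports whether the login module class from the error message can be resolved through each class loader.

```groovy
// Hypothetical helper: reports whether a class name resolves through a given loader.
// A null loader means the bootstrap class loader.
String visibility(String className, ClassLoader loader) {
    try {
        // initialize=false: only check that the class can be found, do not run static init
        Class.forName(className, false, loader)
        return 'visible'
    } catch (ClassNotFoundException | NoClassDefFoundError e) {
        return 'NOT visible'
    }
}

// Class name taken from the LoginException in the question.
def target = 'org.apache.kafka.common.security.plain.PlainLoginModule'

// Assumption: these are the loaders most likely to differ inside NiFi.
def loaders = [
    'script class loader'  : this.class.classLoader,
    'thread context loader': Thread.currentThread().contextClassLoader,
    'system class loader'  : ClassLoader.systemClassLoader
]

loaders.each { name, loader ->
    println "${name}: ${visibility(target, loader)}"
}
```

Inside ExecuteScript, `println` would be swapped for `log.info(...)` so the result shows up in the NiFi log. If the class is visible to the script class loader but not to the thread context loader, that would point at a class loader issue rather than a missing or incompatible JAR.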
Created 12-17-2024 02:27 AM
Thank you @SAMSAL. That is the issue: when I specify an external "Module Directory" in ExecuteScript, the class loaders appear to clash and the required classes do not get loaded. So I removed that setting and placed all my JARs in the nifi/lib directory only. It is working fine now. Please let me know if there is a better way to handle this that avoids the nifi/lib directory.
Also, I will try creating a custom processor to implement this CSFLE.
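On avoiding nifi/lib, one thing that might be worth trying is sketched below. It is untested in NiFi and not a confirmed fix. As far as I understand, JAAS looks up the login module through the thread context class loader and falls back to the system class loader when that is null. Pointing the context loader at the loader that actually loaded the Kafka classes may therefore let `PlainLoginModule` resolve while the JARs stay in "Module Directory". `withContextClassLoader` is a hypothetical helper name, not a NiFi or Kafka API.

```groovy
// Hypothetical helper: runs `body` with the given thread context class loader,
// then restores whatever loader was set before, even if `body` throws.
def withContextClassLoader(ClassLoader loader, Closure body) {
    Thread current = Thread.currentThread()
    ClassLoader previous = current.contextClassLoader
    current.contextClassLoader = loader
    try {
        return body.call()
    } finally {
        current.contextClassLoader = previous
    }
}

// Stand-alone demonstration with a throwaway loader (no Kafka needed).
def throwaway = new URLClassLoader(new URL[0])
def seenInside = withContextClassLoader(throwaway) {
    Thread.currentThread().contextClassLoader
}
println "loader swapped inside the closure: ${seenInside.is(throwaway)}"
println "loader restored afterwards: ${!Thread.currentThread().contextClassLoader.is(throwaway)}"
```

In the original script, the idea would be to drop the `setContextClassLoader(null)` call and create the consumer as `kafkaConsumer = withContextClassLoader(PlainLoginModule.class.classLoader) { new KafkaConsumer<String, Object>(props) }`, since the SASL login is performed while the consumer is being constructed.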