Member since: 06-17-2019
Posts: 17
Kudos Received: 3
Solutions: 1
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1219 | 11-27-2024 09:58 PM |
04-02-2025
01:44 AM
It seems that after I added the necessary permission, I am able to access the parameter context via InvokeHTTP.
03-27-2025
03:04 AM
Thank you @MattWho. I have checked the user logs, and authentication succeeds for every event, yet I am still getting the error.
2025-03-27 09:55:58,285 INFO [NiFi Web Server-261648] o.a.n.w.s.NiFiAuthenticationFilter [!dt dt.trace_sampled=true, dt.trace_id=e58d4bb03c8809e4505dabe6319387cf, dt.span_id=653728c3fb282dfb] Authentication Started my-ip [<rgand645><CN=my-nifi, O=Internal International Inc., L=Bethesda, ST=Maryland, C=US>] GET https://my-nifi:8443/nifi-api/flow/controller/bulletins
2025-03-27 09:55:58,285 INFO [NiFi Web Server-261648] o.a.n.w.s.NiFiAuthenticationFilter [!dt dt.trace_sampled=true, dt.trace_id=e58d4bb03c8809e4505dabe6319387cf, dt.span_id=653728c3fb282dfb] Authentication Success [rgand645] my-ip GET https://my-nifi:8443/nifi-api/flow/controller/bulletins
2025-03-27 09:55:58,287 INFO [NiFi Web Server-278014] o.a.n.w.s.NiFiAuthenticationFilter [!dt dt.trace_sampled=true, dt.trace_id=32098e8d185d12d4fc5bf343e6f48820, dt.span_id=bd1992a5aba5bcd1] Authentication Started my-ip [<rgand645><CN=my-nifi, O=Internal International Inc., L=Bethesda, ST=Maryland, C=US>] GET https://my-nifi:8443/nifi-api/flow/process-groups/a9b83e0e-0195-1000-0000-0000734310f5
2025-03-27 09:55:58,287 INFO [NiFi Web Server-278014] o.a.n.w.s.NiFiAuthenticationFilter [!dt dt.trace_sampled=true, dt.trace_id=32098e8d185d12d4fc5bf343e6f48820, dt.span_id=bd1992a5aba5bcd1] Authentication Success [rgand645] my-ip GET https://my-nifi:8443/nifi-api/flow/process-groups/a9b83e0e-0195-1000-0000-0000734310f5
2025-03-27 09:55:58,290 INFO [NiFi Web Server-265066] o.a.n.w.s.NiFiAuthenticationFilter [!dt dt.trace_sampled=true, dt.trace_id=5b7b196ee1f35065a8f2e3adf8b1b1eb, dt.span_id=73570aa3b370865b] Authentication Started my-ip [<rgand645><CN=my-nifi, O=Internal International Inc., L=Bethesda, ST=Maryland, C=US>] GET https://my-nifi:8443/nifi-api/flow/status
2025-03-27 09:55:58,290 INFO [NiFi Web Server-265066] o.a.n.w.s.NiFiAuthenticationFilter [!dt dt.trace_sampled=true, dt.trace_id=5b7b196ee1f35065a8f2e3adf8b1b1eb, dt.span_id=73570aa3b370865b] Authentication Success [rgand645] my-ip GET https://my-nifi:8443/nifi-api/flow/status
2025-03-27 09:55:58,290 INFO [NiFi Web Server-261648] o.a.n.w.s.NiFiAuthenticationFilter [!dt dt.trace_sampled=true, dt.trace_id=d19eaae1f387574329dd230274ce1112, dt.span_id=9e3f0301601331a0] Authentication Started my-ip [<rgand645><CN=my-nifi, O=Internal International Inc., L=Bethesda, ST=Maryland, C=US>] GET https://my-nifi:8443/nifi-api/flow/current-user
2025-03-27 09:55:58,290 INFO [NiFi Web Server-261648] o.a.n.w.s.NiFiAuthenticationFilter [!dt dt.trace_sampled=true, dt.trace_id=d19eaae1f387574329dd230274ce1112, dt.span_id=9e3f0301601331a0] Authentication Success [rgand645] my-ip GET https://my-nifi:8443/nifi-api/flow/current-user
03-20-2025
05:54 AM
Hi Team, I am trying to build a flow that updates a parameter context on the fly using the InvokeHTTP processor. I configured it as shown in the screenshot, but I am getting an Unauthorized error. My URL format is: https://<nifi-host>:<port>/nifi-api/parameter-contexts/<context-id> Error screenshot.
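For reference, here is a minimal Groovy sketch of the request I am trying to send. It assumes that parameter updates are submitted as a POST to the update-requests sub-resource of the context, with a body shaped like a ParameterContextEntity. The helper names (buildUpdateBody, updateRequestUrl) are hypothetical, and the field names should be checked against the REST API documentation for the NiFi version in use. The sketch only builds the URL and body. The request still has to be authenticated (for example with a client certificate or a bearer token), and the calling identity needs the relevant policies on the parameter context.

```groovy
import groovy.json.JsonOutput

// Hypothetical helper: builds the JSON body for a parameter context update.
// The entity shape (revision / id / component.parameters) is an assumption
// based on the NiFi REST API docs; verify it for your NiFi version.
String buildUpdateBody(String contextId, long revisionVersion, Map<String, String> params) {
    def entity = [
        revision : [version: revisionVersion],   // must match the current revision
        id       : contextId,
        component: [
            id        : contextId,
            parameters: params.collect { name, value ->
                [parameter: [name: name, value: value, sensitive: false]]
            }
        ]
    ]
    return JsonOutput.toJson(entity)
}

// Hypothetical helper: URL of the (assumed) update-requests endpoint.
String updateRequestUrl(String baseUrl, String contextId) {
    return "${baseUrl}/nifi-api/parameter-contexts/${contextId}/update-requests".toString()
}
```

In InvokeHTTP, the URL would go into the processor's URL property and the body would be the flow file content, with the content type set to application/json.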
Labels: Apache NiFi
12-17-2024
02:27 AM
1 Kudo
Thank you @SAMSAL. That was the issue: when I specify an external "Module Directory" in ExecuteScript, the class loaders clash and the required classes do not get loaded. I removed that setting and placed all my JARs in the nifi/lib directory instead, and it now works fine. Please let me know if there is a better way to handle this that avoids using the nifi/lib directory. I will also try creating a custom processor to implement this CSFLE.
12-15-2024
11:05 PM
Hi Team, I am implementing CSFLE (Client-Side Field Level Encryption) in NiFi for Kafka data. I have written a Groovy script that consumes, deserializes, and encrypts the data. The script works standalone. However, when I place it in an ExecuteScript processor, I get this error: Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule. It seems to be a compatibility issue with the kafka-client JAR file, but I could not find a fix. Please provide your input on this. My Groovy script is below.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import org.apache.kafka.common.security.plain.PlainLoginModule
import javax.security.auth.login.Configuration
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.LoginContext
Thread.currentThread().setContextClassLoader(null);
def kafkaConsumer = null
try {
    // Kafka consumer properties
    def props = [
        'bootstrap.servers' : 'server:port',
        'auto.register.schemas' : 'false',
        'basic.auth.credentials.source' : 'USER_INFO',
        'key.serializer' : 'org.apache.kafka.common.serialization.StringSerializer',
        'value.serializer' : 'io.confluent.kafka.serializers.KafkaAvroSerializer',
        'key.deserializer' : 'org.apache.kafka.common.serialization.StringDeserializer',
        'value.deserializer' : 'io.confluent.kafka.serializers.KafkaAvroDeserializer',
        'security.protocol' : 'SASL_SSL',
        'schema.registry.url' : 'Sampe-urlcloud',
        'basic.auth.user.info' : 'unamee:pass',
        'sasl.mechanism' : 'PLAIN',
        'sasl.jaas.config' : 'org.apache.kafka.common.security.plain.PlainLoginModule required username="uname" password="pass";',
        'use.latest.version' : 'false',
        'client.id' : 'csfle-simple-producer',
        'auto.offset.reset' : 'earliest',
        'group.id' : 'nifi-csfle-group6',
        'rule.executors._default_.param.access.key.id' : 'keyid',
        'rule.executors._default_.param.secret.access.key' : 'accesskey'
    ]
    log.info("RAJIV-1: 'csfle_test_kek_oct_29'")
    // Create KafkaConsumer instance
    kafkaConsumer = new KafkaConsumer<String, Object>(props)
    log.info("RAJIV-2: 'csfle_test_kek_oct_29'")
    kafkaConsumer.subscribe(['csfle_test_kek_oct_29'])
    log.info("RAJIV-3: 'csfle_test_kek_oct_29'")
    // Poll for messages
    ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(10))
    if (records.isEmpty()) {
        log.info("No records found in the topic.")
    } else {
        // Process each record
        records.each { ConsumerRecord<String, Object> record ->
            try {
                // Log the record details
                log.info("Received record: Topic=${record.topic()}, Partition=${record.partition()}, Offset=${record.offset()}, Key=${record.key()}, Value=${record.value()}")
            } catch (Exception e) {
                log.error("Error processing record: ${e.getMessage()}")
            }
        }
    }
} catch (Exception e) {
    log.error("Kafka Consumer Error: ${e.message}", e)
} finally {
    // Close the Kafka consumer
    if (kafkaConsumer != null) {
        kafkaConsumer.close()
    }
}
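One workaround often suggested for "No LoginModule found" errors is to set the thread context class loader to the loader that can actually see the Kafka classes while the client is created, and to restore the previous loader afterwards, rather than setting it to null. The sketch below shows only that swap-and-restore pattern. The helper name withContextClassLoader is hypothetical, and whether this resolves the error inside ExecuteScript is an assumption I have not verified.

```groovy
// Hypothetical helper: runs `body` with `loader` as the thread context
// class loader, then restores whatever loader was set before.
def withContextClassLoader(ClassLoader loader, Closure body) {
    Thread current = Thread.currentThread()
    ClassLoader previous = current.contextClassLoader
    current.contextClassLoader = loader
    try {
        return body.call()
    } finally {
        // Always restore, even if body throws
        current.contextClassLoader = previous
    }
}

// Usage sketch (assumes the Kafka JARs are visible to the script's loader):
// def consumer = withContextClassLoader(PlainLoginModule.class.classLoader) {
//     new KafkaConsumer<String, Object>(props)
// }
```

Restoring the previous loader in the finally block matters because the processor thread is reused by NiFi after the script finishes.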
Labels: Apache NiFi
12-09-2024
12:29 AM
My Groovy script:
import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec
import java.util.Base64

// Define the AES encryption key (32 characters)
String encryptionKey = "6ef552ae5333c9abb682e1f5221b1bdc"

// The key is used as raw UTF-8 bytes, so 32 characters give a 32-byte (256-bit) AES key
if (encryptionKey.length() != 32) {
    throw new IllegalArgumentException("Encryption key must be 32 characters long (32 bytes, AES-256).")
}

// Convert the key to a SecretKeySpec
SecretKeySpec aesKey = new SecretKeySpec(encryptionKey.getBytes("UTF-8"), "AES")

// Initialize the Cipher for AES/ECB/PKCS5Padding
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
cipher.init(Cipher.ENCRYPT_MODE, aesKey)

// Retrieve the input data to encrypt
String inputData = record.getValue("name") // Replace "name" with your actual field name
byte[] encryptedData = cipher.doFinal(inputData.getBytes("UTF-8"))

// Encode the encrypted data to Base64
String encryptedBase64 = Base64.getEncoder().encodeToString(encryptedData)

// Set the encrypted value back into the record
record.setValue("name", encryptedBase64)

// Return the updated record
return record
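A note on the cipher mode: ECB encrypts identical plaintext values to identical ciphertext, which can leak information when the same field value repeats across records. As a possible alternative (not what the script above does), here is a minimal sketch that uses AES-GCM with a random 12-byte IV prepended to the ciphertext. The helper names encryptGcm and decryptGcm are hypothetical, and the output format (IV followed by ciphertext, Base64-encoded) is my own assumption, not a standard.

```groovy
import javax.crypto.Cipher
import javax.crypto.spec.GCMParameterSpec
import javax.crypto.spec.SecretKeySpec
import java.security.SecureRandom

// Hypothetical helper: AES-GCM encrypt, returns Base64(IV || ciphertext+tag).
String encryptGcm(byte[] keyBytes, String plaintext) {
    byte[] iv = new byte[12]                      // 96-bit IV, fresh per call
    new SecureRandom().nextBytes(iv)
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyBytes, "AES"),
                new GCMParameterSpec(128, iv))    // 128-bit authentication tag
    byte[] ciphertext = cipher.doFinal(plaintext.getBytes("UTF-8"))
    byte[] out = new byte[iv.length + ciphertext.length]
    System.arraycopy(iv, 0, out, 0, iv.length)
    System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length)
    return Base64.getEncoder().encodeToString(out)
}

// Hypothetical helper: reverses encryptGcm using the IV stored in the first 12 bytes.
String decryptGcm(byte[] keyBytes, String encoded) {
    byte[] all = Base64.getDecoder().decode(encoded)
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(keyBytes, "AES"),
                new GCMParameterSpec(128, all, 0, 12))
    return new String(cipher.doFinal(all, 12, all.length - 12), "UTF-8")
}
```

In the record script, encryptGcm(encryptionKey.getBytes("UTF-8"), inputData) would replace the cipher.doFinal and Base64 steps. Because the IV is random, encrypting the same value twice gives different output, so this does not suit fields that must stay searchable by exact ciphertext match.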
12-09-2024
12:28 AM
Hi Team, I am trying to implement CSFLE logic using the AWS KMS service. I am using the ScriptedTransformRecord processor with a Groovy script. For testing, I used a sample encryption key created with openssl, and the script works fine. However, I would like to use AWS KMS for the encryption keys. Is there a way to interact with KMS to get the key? Here is my flow:
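To make the goal concrete, this is the envelope-encryption pattern I have in mind, sketched with a local stand-in for KMS. Nothing here calls AWS: the master key is generated locally purely for illustration, and the helper names (newAesKey, generateDataKey, unwrapDataKey) are hypothetical. With real KMS, my understanding is that the GenerateDataKey and Decrypt operations would take the place of the two helpers, and the master key would never leave the service.

```groovy
import javax.crypto.Cipher
import javax.crypto.KeyGenerator
import javax.crypto.SecretKey

// Generates a random 256-bit AES key.
SecretKey newAesKey() {
    KeyGenerator gen = KeyGenerator.getInstance("AES")
    gen.init(256)
    return gen.generateKey()
}

// LOCAL STAND-IN for KMS GenerateDataKey: returns a fresh data key in
// plaintext plus a copy wrapped (encrypted) under the master key.
Map generateDataKey(SecretKey masterKey) {
    SecretKey dataKey = newAesKey()
    Cipher wrapper = Cipher.getInstance("AESWrap")
    wrapper.init(Cipher.WRAP_MODE, masterKey)
    return [plaintext: dataKey, wrapped: wrapper.wrap(dataKey)]
}

// LOCAL STAND-IN for KMS Decrypt: recovers the data key from its wrapped form.
SecretKey unwrapDataKey(SecretKey masterKey, byte[] wrapped) {
    Cipher unwrapper = Cipher.getInstance("AESWrap")
    unwrapper.init(Cipher.UNWRAP_MODE, masterKey)
    return (SecretKey) unwrapper.unwrap(wrapped, "AES", Cipher.SECRET_KEY)
}
```

The plaintext data key would encrypt the record fields, and only the wrapped copy would be stored alongside the data, so a reader needs access to the master key to recover it.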
Labels: Apache NiFi
11-27-2024
09:58 PM
Thank you @MattWho. Yes, initially I had designed it around partial failure. I have now changed the design to capture the failed flow files and send an alert on them. Thank you.
11-21-2024
07:59 AM
Thank you @MattWho. The flow is: ListS3 -> FetchS3Object -> compress the files (usually 4 or 5) -> PutS3Object to the target bucket -> FetchS3Object again -> PutS3Object to the archive folder -> DeleteS3Object (remove the original file) -> check that all 5 files were processed through the delete step; if any one file is missing, route all 5 to failure, otherwise route all 5 to success. For this last step I used ExecuteScript, and I am now looking for native processors. Kindly let me know if you need more information on this.
11-21-2024
01:58 AM
1 Kudo
Hi Team, I am trying to implement a scenario where flow files wait for 5 minutes so that they all queue up, after which the count is checked: if it is greater than 10, route to failure; otherwise route to success. I did this in an ExecuteScript processor as shown below. However, I would like to avoid ExecuteScript and use native NiFi processors instead.
====== ExecuteScript ===============
Thread.sleep(300000)
def flowFiles = session.get(100)
if (!flowFiles || flowFiles.size() <= 10) {
    session.transfer(flowFiles, REL_SUCCESS)
} else {
    session.transfer(flowFiles, REL_FAILURE)
}
Labels: Apache NiFi