Member since
09-03-2020
128
Posts
7
Kudos Received
0
Solutions
02-04-2024
09:32 AM
There is no Kafka property that you can add to track the timestamp of the message. Each record in a topic has a creation timestamp. This timestamp can be populated by the producer (the default) or by the broker, depending on how the topic is configured (https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html).

At the producer code level, you can add a header to the record to store a timestamp, then compare this time at the consumer level. Here are some Java code samples that you might use.

Producer code level:

    // Create producer
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

    // Save the current timestamp
    long time = System.currentTimeMillis();

    // Create header list (Longs is com.google.common.primitives.Longs from Guava)
    List<Header> headers = new ArrayList<>();
    headers.add(new RecordHeader("producer_timestamp", Longs.toByteArray(time)));

    // Create the record: topic, partition, timestamp, key, value, headers
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, null, null, "message key", "Message value", headers);

    // Send the record
    producer.send(producerRecord);

Note that the third argument of the ProducerRecord constructor is the timestamp. In this case we pass null so that the Kafka producer API sets the timestamp in the record.
With this code, we capture two timestamps: a custom timestamp stored as a header before sending the message, and the record timestamp set when the record is produced.

On the consumer side, we need to read both timestamps:

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        log.info("Key: " + record.key() + ", value: " + record.value());
        long producerTimestamp = 0L;
        Headers consumedHeaders = record.headers();
        for (Header header : consumedHeaders) {
            log.info("Header Key: " + header.key());
            log.info("Header Value String: " + new String(header.value()));
            if (header.key().equals("producer_timestamp")) {
                for (byte b : header.value()) {
                    // Shift the accumulated value 8 bits to the left,
                    // then add the next byte
                    producerTimestamp = (producerTimestamp << 8) + (b & 255);
                }
                log.info("Record timestamp: " + record.timestamp() + ", Producer timestamp: " + producerTimestamp + ", Consumer timestamp: " + System.currentTimeMillis());
            }
        }
    }

The inner loop converts the header value from byte[] to long, and then we print the content:

    log.info("Record timestamp: " + record.timestamp() + ", Producer timestamp: " + producerTimestamp + ", Consumer timestamp: " + System.currentTimeMillis());

We can see three different timestamps in the output: the record timestamp, which was added by the Kafka producer API; the producer timestamp, stored by the producer as a header before sending the message; and the consumer timestamp, captured on the consumer side after reading the record.

Let us know if this example helps to answer your questions.
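As an alternative to the manual bit-shifting loop in the consumer code, the header bytes can be decoded with java.nio.ByteBuffer. This is only a sketch, assuming the header was written as 8 big-endian bytes, which is the layout Guava's Longs.toByteArray produces. The class and method names (TimestampHeaderCodec, encode, decode) are hypothetical helpers and not part of the Kafka API.

```java
import java.nio.ByteBuffer;

public class TimestampHeaderCodec {

    // Encode a long as 8 big-endian bytes (same layout as Guava's Longs.toByteArray).
    public static byte[] encode(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    }

    // Decode 8 big-endian bytes back into a long; any other length is rejected.
    public static long decode(byte[] bytes) {
        if (bytes == null || bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("Expected 8 bytes for a long timestamp");
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        // Stand-in for the value stored in the "producer_timestamp" header.
        long producerTimestamp = System.currentTimeMillis();
        byte[] headerValue = encode(producerTimestamp);

        // Stand-in for what the consumer would do with header.value().
        long decoded = decode(headerValue);
        long consumerTimestamp = System.currentTimeMillis();

        System.out.println("Producer timestamp: " + decoded);
        System.out.println("Elapsed ms: " + (consumerTimestamp - decoded));
    }
}
```

In the consumer, decode(header.value()) could replace the inner byte loop. The length check also makes a malformed header fail loudly instead of producing a wrong number.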
01-15-2024
07:44 AM
Could you please share the full processor error from nifi-app.log? It will provide more details to help understand the exception.
12-05-2023
04:26 AM
The message means that the authentication is valid, but the user is not authorized to use the topic. Can you confirm which authorizer you are using and your current OAuth configuration?
09-14-2023
08:23 AM
You are authenticated as:
cn=Mohit Kumar,ou=FM-Users,ou=Managed services,dc=CORP,dc=SA,dc=ZAIN,dc=COM
But your policies are for:
CN=Mohit Kumar,OU=FM-Users,OU=Managed services,DC=CORP,DC=SA,DC=ZAIN,DC=COM
You can try setting the initial admin to the identity that is actually authenticated (note that the identity is case-sensitive):
cn=Mohit Kumar,ou=FM-Users,ou=Managed services,dc=CORP,dc=SA,dc=ZAIN,dc=COM
Stop NiFi and delete the users.xml and authorizations.xml files. Then start NiFi, try to log in again, and share the results.
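To illustrate why the two identities above do not match, here is a minimal sketch that compares them as exact, case-sensitive strings. It does not reproduce NiFi's own matching code; the class and method names (DnCaseCheck, sameIdentity) are hypothetical and used only for illustration.

```java
public class DnCaseCheck {

    // Exact, case-sensitive comparison of two identity strings.
    public static boolean sameIdentity(String authenticated, String configured) {
        return authenticated.equals(configured);
    }

    public static void main(String[] args) {
        String authenticated = "cn=Mohit Kumar,ou=FM-Users,ou=Managed services,dc=CORP,dc=SA,dc=ZAIN,dc=COM";
        String configured = "CN=Mohit Kumar,OU=FM-Users,OU=Managed services,DC=CORP,DC=SA,DC=ZAIN,DC=COM";

        // The attribute names differ only in case, so the exact comparison fails.
        System.out.println("Exact match: " + sameIdentity(authenticated, configured));
        System.out.println("Match ignoring case: " + authenticated.equalsIgnoreCase(configured));
    }
}
```

The exact comparison returns false while the case-insensitive one returns true, which is why the policy identity has to be written exactly as the authenticated identity appears.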
09-14-2023
08:17 AM
What processors are you using? List or Get files?
07-14-2023
12:05 PM
The nifi-user.log shows that the user "myuser", which belongs to the groups "bigG, bigdGer", does not have access to the /flow resource. You can check the Ranger audit section for the denied resource, then grant the groups or the username access to it.
06-24-2022
01:44 PM
Can you check the authorization error in nifi-user.log? This will show us the principal that is trying to connect. It should be the owner of the certificate stored in keystore.jks. Make sure that it matches the principals created in users.xml.
06-24-2022
01:39 PM
If you plan to upgrade to Cloudera Flow Management 2.1.4, which includes NiFi 1.16, you can follow the upgrade paths listed in the following document: https://docs.cloudera.com/cfm/2.1.4/upgrade-paths/topics/cfm-upgrade-paths.html
06-08-2022
11:29 AM
3 Kudos
You need to create a DBCPConnectionPool service and set the required connection properties. In the Database Driver Location(s) property you can enter the folder where the driver jar files are located. This folder must be accessible by the NiFi service user and must be present on all of the NiFi nodes.
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-dbcp-service-nar/1.16.0/org.apache.nifi.dbcp.DBCPConnectionPool/index.html
Then you can use a SQL processor, such as PutSQL, to insert data into the database:
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.16.0/org.apache.nifi.processors.standard.PutSQL/
06-08-2022
11:21 AM
Can you share a snapshot of the nifi-user.log file captured when you get the 403 error?