NiFi: casting Avro Kafka long values (19-digit nanoseconds) to timestamps with milliseconds

New Contributor

Good day. I'm currently facing an issue converting a Kafka message record of type long holding nanoseconds (19 digits) to a string timestamp with milliseconds. The messages arrive in Avro format and contain different schemas (so we can't statically define one schema), which are stored in the Confluent Schema Registry. The current process is:

1) ConsumeKafkaRecord_2_0, which reads the message and stores the Avro schema coming from the Confluent Schema Registry in the avro.schema attribute.

2) UpdateAttribute, which looks for the pattern of a timestamp field in avro.schema and adds "logicalType":"timestamp-micros" (because there is no timestamp-nanos type in the Avro specification); a sketch of such a rule follows this list.

3) ConvertRecord, which converts the Avro flowfile into JSON using avro.schema. It applies the logicalType assigned in the previous step and converts the 19-digit long into yyyy-MM-dd HH:mm:ss.SSSSSS. The issue is that a 19-digit value is a nanosecond timestamp, a type missing from the Avro specification, so we can only use timestamp-micros and end up with year 51000+ values (the arithmetic is sketched after this list).

4) ReplaceText, which gives us a workaround for the issue described above: we replace values matching the 5-digit-year pattern with a "correct" datetime (with milliseconds, because Java apparently can't work with microseconds) using an expression: ${'$1':toDate('yyyyy-MM-dd HH:mm:ss.SSSSSS'):toNumber():toString():substring(0, 13):toNumber():toDate():format('yyyy-MM-dd HH:mm:ss.SSS')}
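
As an illustration of the rule in step 2 (only a sketch; the exact pattern is an assumption and depends on how the schemas mark the nanosecond fields, which in our case is the io.debezium.time.NanoTimestamp connect.name used in the accepted answer), UpdateAttribute can rewrite the attribute with replaceAll, e.g. a property named avro.schema with a value like:

${avro.schema:replaceAll('"connect.name":"io.debezium.time.NanoTimestamp"', '"connect.name":"io.debezium.time.NanoTimestamp","logicalType":"timestamp-micros"')}

(The dots in the search pattern are left unescaped for brevity; they still match the literal text.)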

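To see why step 3 lands in year 51000+ and why the first 13 digits of the value are exactly epoch milliseconds, here is a quick Groovy sanity check (the sample value, 2020-01-01T00:00:00Z in nanoseconds, is chosen purely for illustration):

// 19-digit epoch value in nanoseconds: 2020-01-01T00:00:00Z
long nanos = 1_577_836_800_000_000_000L

// Interpreted as timestamp-micros, the implied instant is nanos / 1,000,000 seconds
// after the epoch: about 1.58e12 seconds, i.e. roughly 50,000 years, hence year ~51970.
println(nanos.intdiv(1_000_000L))   // 1577836800000 "seconds"

// Dropping the last six digits (substring(0, 13) on the string form) truncates
// nanoseconds to milliseconds, which Java date handling accepts natively.
long millis = Long.valueOf(nanos.toString().substring(0, 13))
println(new Date(millis))           // Wed Jan 01 00:00:00 2020 (printed in the local time zone)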

After that we continue with other processors. The workaround works, but with a strange side effect: the resulting timestamps differ by a few milliseconds from what we receive in Kafka, which I can only assume is caused by the transformations described above. My question is therefore: is there a better way to handle the 19-digit values coming in the Avro messages (the schemas are in the Confluent Schema Registry, and the pattern of the timestamp fields in the schema is known) so that they are cast to correct millisecond timestamps? Perhaps some kind of field value replacement (taking the first 13 digits of the 19-digit value) in the Avro flowfile content, based on its schema that is stored in the avro.schema attribute?
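
For reference, the nanosecond fields in our schemas look roughly like this (the field name is invented for illustration; the connect.name marker is the real one that the accepted answer keys on):

{
  "name": "created_at",
  "type": {
    "type": "long",
    "connect.name": "io.debezium.time.NanoTimestamp"
  }
}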


Please let me know if anything is unclear or if additional details are needed. Thanks a lot in advance!

1 ACCEPTED SOLUTION

New Contributor

The following solution worked for our case: a Groovy script which converts one Avro file into another, adjusting both schema and content:

@Grab('org.apache.avro:avro:1.8.2')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*
import org.apache.nifi.processor.io.StreamCallback // needed for the "as StreamCallback" cast below
// Function that walks a record (including nested records), truncates NanoTimestamp values
// to milliseconds and adjusts the schema's logicalType accordingly
def convertAvroNanosecToMillisec(record){
    record.getSchema().getFields().forEach{ Schema.Field field -> 
        if (record.get(field.name()) instanceof org.apache.avro.generic.GenericData.Record){
            convertAvroNanosecToMillisec(record.get(field.name()))
        }
        
        if (field.schema().getType().getName() == "union"){
            field.schema().getTypes().forEach{ Schema unionTypeSchema ->
                if(unionTypeSchema.getProp("connect.name") == "io.debezium.time.NanoTimestamp"){
                    record.put(field.name(), Long.valueOf(record.get(field.name()).toString().substring(0, 13)))
                    unionTypeSchema.addProp("logicalType", "timestamp-millis")
                }
            }
        } else {
            if(field.schema().getProp("connect.name") == "io.debezium.time.NanoTimestamp"){
                record.put(field.name(), Long.valueOf(record.get(field.name()).toString().substring(0, 13)))
                field.schema().addProp("logicalType", "timestamp-millis")
            }
        }
        
    } 
    return record
}
//start flowfile processing
def flowFile = session.get()
if(!flowFile) return
 
try {
 
flowFile = session.write(flowFile, {inStream, outStream ->
  // Defining avro reader and writer
  DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
  DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
 
  def contentSchema = reader.schema //source Avro schema
  def records = [] //list used to temporarily store the processed records
  //read all records from the incoming file and add them to the temporary list
  reader.forEach{ GenericRecord contentRecord -> 
      records.add(convertAvroNanosecToMillisec(contentRecord))
  }
  //creating a file writer object with adjusted schema
  writer.create(contentSchema, outStream)
  //adding records to the output file from the temporary list and closing the writer
  records.forEach{ GenericRecord contentRecord -> 
      writer.append(contentRecord)
  }
  writer.close()
 
} as StreamCallback)
 
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error('Error converting Avro nanosecond timestamps to milliseconds', e)
   flowFile = session.penalize(flowFile)
   session.transfer(flowFile, REL_FAILURE)
}
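
A note on how this is wired up: the script is intended for an ExecuteScript processor with the script engine set to Groovy, which provides the session, log, REL_SUCCESS and REL_FAILURE bindings used above. The records are deliberately buffered in a list before writer.create() is called: convertAvroNanosecToMillisec mutates the schema (addProp) while the records are traversed, so the writer can only be created once all records have been processed and the adjusted schema is final.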
