Created 07-15-2019 01:20 PM
Good day. I'm facing an issue converting a Kafka message field of type long holding nanoseconds (19 digits) into a string timestamp with milliseconds. The messages arrive in Avro format and use different schemas (so we can't statically define a single schema) stored in Confluent Schema Registry. The current process is:
1) ConsumeKafkaRecord_2_0, which reads the message and stores the Avro schema coming from Confluent Schema Registry in the avro.schema attribute
2) UpdateAttribute, which looks for the timestamp field pattern in avro.schema and adds "logicalType":"timestamp-micros" (because there is no timestamp-nanos type in the Avro specification)
3) ConvertRecord, which converts the Avro flowfile into JSON using avro.schema. It applies the logicalType assigned in the previous step and converts the 19-digit long into yyyy-MM-dd HH:mm:ss.SSSSSS. The issue here is that a 19-digit value is a nanosecond timestamp, which is missing from the Avro specification, so we can only use the timestamp-micros type and end up with year 51000+ values.
4) ReplaceText - this processor gives us a workaround for the issue described above: we replace values matching the 5-digit-year pattern with a "correct" datetime (with milliseconds, because Java can't easily work with microseconds) using the expression: ${'$1':toDate('yyyyy-MM-dd HH:mm:ss.SSSSSS'):toNumber():toString():substring(0, 13):toNumber():toDate():format('yyyy-MM-dd HH:mm:ss.SSS')}
After that we continue with other processors. The workaround works, but with a strange issue: our resulting timestamps differ by a few milliseconds from what we receive in Kafka. I can only guess this is a side effect of the transformations described above. So my question is: is there a better way to handle the 19-digit values coming in the Avro messages (the schemas are in Confluent Schema Registry, and the pattern for timestamp fields in the schema is known) so that they are converted into correct millisecond timestamps? Maybe some kind of field value replacement (taking the first 13 digits of the 19-digit value) in the Avro flowfile content, based on the schema stored in the avro.schema attribute?
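Just to illustrate what I mean, here is a small Groovy sketch with a made-up 19-digit value (example only, not a real record from our topic): interpreting the nanosecond number as microseconds is what pushes the date into the year 51000+ range, while keeping only the first 13 digits gives a normal millisecond timestamp:

import java.time.Instant

// made-up 19-digit nanosecond epoch value, for illustration only
long nanos = 1563190800123456789L

// treated as microseconds (what timestamp-micros assumes), the date lands tens of thousands of years in the future
println Instant.ofEpochMilli(nanos.intdiv(1000L))    // year 51000+

// keeping only the first 13 digits gives the epoch-millisecond value we actually want
long millis = Long.valueOf(nanos.toString().substring(0, 13))
println Instant.ofEpochMilli(millis)                 // a normal 2019 timestamp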
Please let me know if something is unclear or if additional details are needed. Thanks a lot in advance!
Created 07-19-2019 05:44 AM
The following solution worked for our case: a Groovy script that converts one Avro file into another, adjusting both the schema and the content:
@Grab('org.apache.avro:avro:1.8.2')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*

//function which traverses all records (including nested ones)
def convertAvroNanosecToMillisec(record){
    record.getSchema().getFields().forEach{ Schema.Field field ->
        if (record.get(field.name()) instanceof org.apache.avro.generic.GenericData.Record){
            convertAvroNanosecToMillisec(record.get(field.name()))
        }
        if (field.schema().getType().getName() == "union"){
            field.schema().getTypes().forEach{ Schema unionTypeSchema ->
                if(unionTypeSchema.getProp("connect.name") == "io.debezium.time.NanoTimestamp"){
                    record.put(field.name(), Long.valueOf(record.get(field.name()).toString().substring(0, 13)))
                    unionTypeSchema.addProp("logicalType", "timestamp-millis")
                }
            }
        } else {
            if(field.schema().getProp("connect.name") == "io.debezium.time.NanoTimestamp"){
                record.put(field.name(), Long.valueOf(record.get(field.name()).toString().substring(0, 13)))
                field.schema().addProp("logicalType", "timestamp-millis")
            }
        }
    }
    return record
}

//start flowfile processing
def flowFile = session.get()
if(!flowFile) return

try {
    flowFile = session.write(flowFile, {inStream, outStream ->
        // Defining avro reader and writer
        DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
        DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())

        def contentSchema = reader.schema //source Avro schema
        def records = [] //list used to temporarily store the processed records

        //reading all records from the incoming file and adding them to the temporary list
        reader.forEach{ GenericRecord contentRecord ->
            records.add(convertAvroNanosecToMillisec(contentRecord))
        }

        //creating a file writer object with the adjusted schema
        writer.create(contentSchema, outStream)

        //adding records to the output file from the temporary list and closing the writer
        records.forEach{ GenericRecord contentRecord ->
            writer.append(contentRecord)
        }
        writer.close()
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Error appending new record to avro file', e)
    flowFile = session.penalize(flowFile)
    session.transfer(flowFile, REL_FAILURE)
}
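In case someone wants to check the conversion outside NiFi, here is a minimal standalone sketch of how convertAvroNanosecToMillisec behaves (it assumes the function above is pasted into the same script). The schema below is a made-up example; the real schemas come from Confluent Schema Registry, and the only thing that matters for the function is the "connect.name": "io.debezium.time.NanoTimestamp" property:

@Grab('org.apache.avro:avro:1.8.2')
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

// made-up schema with a single NanoTimestamp field (example only)
def schemaJson = '''
{
  "type": "record",
  "name": "Example",
  "fields": [
    { "name": "created_at",
      "type": { "type": "long", "connect.name": "io.debezium.time.NanoTimestamp" } }
  ]
}
'''
def schema = new Schema.Parser().parse(schemaJson)

def record = new GenericData.Record(schema)
record.put("created_at", 1563190800123456789L)   // 19-digit nanosecond value (example only)

convertAvroNanosecToMillisec(record)

println record.get("created_at")                                // 1563190800123 (13 digits, milliseconds)
println schema.getFields()[0].schema().getProp("logicalType")   // timestamp-millis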