Member since
12-27-2018
2
Posts
0
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2445 | 07-19-2019 05:44 AM |
07-19-2019
05:44 AM
The following solution worked for our case: a Groovy script that converts one Avro file into another (both schema and content):
@Grab('org.apache.avro:avro:1.8.2')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*
/**
 * Walks every field of an Avro record (descending into nested records) and
 * rewrites fields tagged with connect.name == "io.debezium.time.NanoTimestamp"
 * from nanoseconds to milliseconds, marking the matching schema with the
 * "timestamp-millis" logical type.
 *
 * The record and its schema are modified in place.
 *
 * @param record the Avro record to convert; must expose getSchema()/get()/put()
 * @return the same record instance, after conversion
 */
def convertAvroNanosecToMillisec(record){
    record.getSchema().getFields().forEach{ Schema.Field field ->
        def value = record.get(field.name())

        // Nested records are converted recursively before the field itself is inspected.
        if (value instanceof org.apache.avro.generic.GenericData.Record){
            convertAvroNanosecToMillisec(value)
        }

        // For a union, each branch may carry the marker; otherwise only the field schema can.
        Schema fieldSchema = field.schema()
        def candidateSchemas = fieldSchema.getType() == Schema.Type.UNION ? fieldSchema.getTypes() : [fieldSchema]
        def nanoSchemas = candidateSchemas.findAll{ Schema candidate ->
            candidate.getProp("connect.name") == "io.debezium.time.NanoTimestamp"
        }

        if (nanoSchemas){
            // Nullable fields may hold no value: leave them untouched instead of failing.
            if (value != null){
                long nanos = (value instanceof Number) ? ((Number) value).longValue() : Long.parseLong(value.toString())
                // Arithmetic conversion (not string truncation) so values of any magnitude
                // or sign are handled; the value is converted exactly once per field.
                record.put(field.name(), Math.floorDiv(nanos, 1000000L))
            }
            // Tag the schema even when this record's value is null so the output
            // schema is adjusted consistently.
            nanoSchemas.each{ Schema nanoSchema ->
                nanoSchema.addProp("logicalType", "timestamp-millis")
            }
        }
    }
    return record
}
// Start flowfile processing: take one flowfile from the session, rewrite its
// Avro content with nanosecond timestamps converted to milliseconds, and
// route it to success (or penalize and route to failure on any error).
def flowFile = session.get()
if(!flowFile) return
try {
    flowFile = session.write(flowFile, {inStream, outStream ->
        // Avro reader over the incoming content and writer for the converted content
        DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
        DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
        try {
            def contentSchema = reader.schema //source Avro schema; adjusted in place during conversion

            // All records are converted and buffered first, because conversion also
            // modifies the schema and the writer needs the fully adjusted schema
            // before the first record is appended.
            List<GenericRecord> records = []
            reader.forEach{ GenericRecord contentRecord ->
                records.add(convertAvroNanosecToMillisec(contentRecord))
            }

            // Create the output file with the adjusted schema, then append the buffered records
            writer.create(contentSchema, outStream)
            records.forEach{ GenericRecord contentRecord ->
                writer.append(contentRecord)
            }
            // Closed only on the success path so a failed run does not flush partial output
            writer.close()
        } finally {
            // Always release the reader, including when conversion or writing fails
            reader.close()
        }
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Error appending new record to avro file', e)
    flowFile = session.penalize(flowFile)
    session.transfer(flowFile, REL_FAILURE)
}
... View more