Created 01-16-2018 01:05 PM
Hello,
I'm trying to convert a JSON array to Avro and I'm facing some difficulties. This is the JSON I want to convert:
[{"MeasureId":"nifiHeartBeat", "Value":"1", "AuditDateTime":"Tue Jan 16 13:48:58 CET 2018", "DATA_SOURCE":"20083"}]
And here is the schema I'm declaring in the ConvertJSONToAvro processor (based on what I read here):
{ "type": "record", "name": "NifiHeartBeat_v3", "fields": [ { "name": "columns", "type": { "type": "array", "items": { "type": "record", "name": "columnValues", "fields": [ { "name": "MeasureId", "type": "string", "default": "null" }, { "name": "Value", "type": "string", "default": "null" }, { "name": "AuditDateTime", "type": "string", "default": "null" }, { "name": "DATA_SOURCE", "type": "string", "default": "null" } ] } } } ] }
This is the error I'm getting:
2018-01-16 14:00:25,780 WARN [Timer-Driven Process Thread-3] o.a.n.processors.kite.ConvertJSONToAvro ConvertJSONToAvro[id=b0a67367-f541-38d5-176f-c07d4a75dd24] Failed to convert 1/1 records from JSON to Avro
Note: If I get this to work I will move the schema to Hortonworks Schema-Registry, but I wanted to get it working locally first.
Could someone help me please?
Thanks in advance.
Created on 01-16-2018 05:43 PM - edited 08-18-2019 12:36 AM
Try with the below Record Schema property
ConvertJSONToAvro Configs:-
Record schema
{ "type": "record", "name": "NifiHeartBeat_v3", "fields": [{ "name": "MeasureId", "type": "string", "default": "null" }, { "name": "Value", "type": "string", "default": "null" }, { "name": "AuditDateTime", "type": "string", "default": "null" }, { "name": "DATA_SOURCE", "type": "string", "default": "null" }] }
If you are testing with a single JSON message, don't enclose the message in square brackets ([]). ConvertJSONToAvro won't parse a JSON array, so an array has to be split first, even if it contains only one message.
Try with this JSON message:
{ "MeasureId": "nifiHeartBeat", "Value": "1", "AuditDateTime": "Tue Jan 16 13:48:58 CET 2018", "DATA_SOURCE": "20083" }
ConvertJSONToAvro configs:-
(or)
If you need to keep the message enclosed in [], use the SplitJson processor to split it first:
[{ "MeasureId": "nifiHeartBeat", "Value": "1", "AuditDateTime": "Tue Jan 16 13:48:58 CET 2018", "DATA_SOURCE": "20083"}]
SplitJson Configs:-
The ConvertJSONToAvro processor won't parse an array of JSON messages, so we need to split the array into individual messages (even if the array contains only one message) and send each one to the processor.
1. Set the following property value:
JsonPath Expression
$.*
Configs:-
ConvertJSONToAvro configs:-
Same configs as mentioned above.
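To make the SplitJson step described above more concrete, here is a minimal sketch in plain Java of what splitting a top-level JSON array into individual messages amounts to. This is not NiFi's implementation (SplitJson evaluates a JsonPath expression); the class and method names are made up for illustration, and the sketch assumes well-formed JSON input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of NiFi.
public class JsonArraySplitSketch {

    /** Returns the raw text of each top-level element of a JSON array. */
    static List<String> splitTopLevelArray(String json) {
        String trimmed = json.trim();
        if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
            throw new IllegalArgumentException("Expected a JSON array");
        }
        List<String> elements = new ArrayList<>();
        int depth = 0;            // nesting depth of {} and [] inside the outer array
        boolean inString = false; // true while scanning inside a string literal
        boolean escaped = false;  // true right after a backslash inside a string
        int start = 1;            // index just after the opening '['
        int end = trimmed.length() - 1; // index of the closing ']'
        for (int i = 1; i < end; i++) {
            char c = trimmed.charAt(i);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (c == '"') {
                inString = true;
            } else if (c == '{' || c == '[') {
                depth++;
            } else if (c == '}' || c == ']') {
                depth--;
            } else if (c == ',' && depth == 0) {
                // A comma at depth 0 separates two top-level elements.
                elements.add(trimmed.substring(start, i).trim());
                start = i + 1;
            }
        }
        String last = trimmed.substring(start, end).trim();
        if (!last.isEmpty()) {
            elements.add(last);
        }
        return elements;
    }

    public static void main(String[] args) {
        String input = "[{\"MeasureId\":\"nifiHeartBeat\", \"Value\":\"1\"}]";
        // Each printed line is one message that could be converted on its own.
        for (String element : splitTopLevelArray(input)) {
            System.out.println(element);
        }
    }
}
```

With the single-element array from the question, the result is one standalone JSON object, which is the shape ConvertJSONToAvro expects.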
If you are using the ConvertRecord processor, you don't need the SplitJson processor, because ConvertRecord works with arrays of JSON messages.
For your reference, I have attached NiFi flow templates that convert a JSON message to Avro, one using the ConvertJSONToAvro processor and one using the ConvertRecord processor, so that you can save and reuse them.
json-to-avro-using-convertrecord.xml
json-to-avro-conversion-using-convertjsontoavro-pr.xml
If the answer helped to resolve your issue, click the Accept button below to accept it. That helps other Community users find a solution quickly for this kind of error.
Created 01-19-2018 08:29 AM
Thank you so much for your answer and for taking the time to prepare an environment to test my PoC. I really appreciate that. My problem was that I was trying to read the JSON with the "JsonPathReader" service. Changing to "JsonTreeReader" did the trick.
Anyway, I went forward with the PoC and now I'm sending the converted Avro data to a Kafka cluster and I'm trying to deserialize it after consuming. For all this, I'm using the Hortonworks schema-registry, which works fine in NiFi, but I can't get it to work in Java. I've tried the example here but I'm getting an error when declaring the deserializedObj.
Here you can see the code:
public void runAvroSerDesApis(byte[] serializedData, String name) throws IOException {
    this.serializedData = serializedData;
    this.name = name;

    // using builtin avro serializer/deserializer
    /*AvroSnapshotSerializer avroSnapshotSerializer = new AvroSnapshotSerializer();
    avroSnapshotSerializer.init(config);*/
    AvroSnapshotDeserializer deserializer = schemaRegistryClient.getDefaultDeserializer(AvroSchemaProvider.TYPE);
    deserializer.init(config);

    Object deviceObject = createGenericRecordForDevice(name);
    SchemaMetadata schemaMetadata = createSchemaMetadata("avro-serializer-schema-" + System.currentTimeMillis());

    //byte[] serializedData = avroSnapshotSerializer.serialize(deviceObject, schemaMetadata);
    Object deserializedObj = deserializer.deserialize(new ByteArrayInputStream(serializedData), schemaMetadata, null);

    LOG.info("Serialized and deserialized objects are equal: [{}] ", deviceObject.equals(deserializedObj));
}
The line producing the error is "Object deserializedObj = deserializer.deserialize(new ByteArrayInputStream(serializedData), schemaMetadata, null);" and it says more or less that "actual and formal argument lists differ in length".
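That compiler message generally means the method is being called with a different number of arguments than any overload declares. One way to check which parameter lists a class on the classpath really offers is to list its overloads through reflection. The sketch below is only an illustration: DemoDeserializer and its two deserialize methods are hypothetical stand-ins and do not reflect the Schema-Registry API. Passing the real deserializer class to describeOverloads instead would show what the installed version accepts.

```java
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class OverloadInspectorSketch {

    /** Hypothetical stand-in class; NOT the Schema-Registry deserializer. */
    static class DemoDeserializer {
        public Object deserialize(InputStream in, Integer version) {
            return null;
        }

        public Object deserialize(InputStream in, String name, Integer version) {
            return null;
        }
    }

    /** Lists the public overloads of methodName on type, sorted alphabetically. */
    static List<String> describeOverloads(Class<?> type, String methodName) {
        List<String> signatures = new ArrayList<>();
        for (Method method : type.getMethods()) {
            if (method.getName().equals(methodName)) {
                String params = Arrays.stream(method.getParameterTypes())
                        .map(Class::getSimpleName)
                        .collect(Collectors.joining(", "));
                signatures.add(methodName + "(" + params + ")");
            }
        }
        Collections.sort(signatures);
        return signatures;
    }

    public static void main(String[] args) {
        // Replace DemoDeserializer.class with the class you want to inspect.
        for (String signature : describeOverloads(DemoDeserializer.class, "deserialize")) {
            System.out.println(signature);
        }
    }
}
```

Comparing the printed parameter lists with the three arguments passed in the failing call should show whether the argument count is the mismatch.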
Is there any example someone could provide about how to deserialize an Avro message in Java using Hortonworks Schema-Registry? I'm using v0.2.1.
Thanks in advance.