Created on 03-25-2015 08:35 AM - edited 09-16-2022 02:25 AM
I'm trying to convert JSON into Avro using the kite-sdk morphline module. After some experimentation, I was able to convert the JSON into Avro using a simple schema (no complex data types and no nested structures).
Then I took it one step further and modified the Avro schema as shown below (subrec.avsc). As you can see, the schema contains a subrecord.
As soon as I tried to convert the JSON to Avro using morphlines.conf and subrec.avsc, it failed.
Somehow the fields extracted under names such as "/record_type[]/alert/action" are not translated into the nested record by the toAvro command.
Thanks
The morphlines.conf:
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Read the JSON blob
      { readJson: {} }
      { logError { format : "record: {}", args : ["@{}"] } }
      # Extract JSON
      {
        extractJsonPaths {
          flatten: false,
          paths: {
            "/record_type[]/alert/action" : /alert/action,
            "/record_type[]/alert/signature_id" : /alert/signature_id,
            "/record_type[]/alert/signature" : /alert/signature,
            "/record_type[]/alert/category" : /alert/category,
            "/record_type[]/alert/severity" : /alert/severity
          }
        }
      }
      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }
      {
        extractJsonPaths {
          flatten: false,
          paths: {
            timestamp : /timestamp,
            event_type : /event_type,
            source_ip : /src_ip,
            source_port : /src_port,
            destination_ip : /dest_ip,
            destination_port : /dest_port,
            protocol : /proto,
          }
        }
      }
      # Create Avro according to schema
      { logError { format : "WE GO TO AVRO"} }
      { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }
      # Create Avro container
      { logError { format : "WE GO TO BINARY"} }
      { writeAvroToByteArray { format: containerlessBinary } }
      { logError { format : "DONE!!!"} }
    ]
  }
]
And the subrec.avsc:
{
  "type" : "record",
  "name" : "Event",
  "fields" : [
    { "name" : "timestamp", "type" : "string" },
    { "name" : "event_type", "type" : "string" },
    { "name" : "source_ip", "type" : "string" },
    { "name" : "source_port", "type" : "int" },
    { "name" : "destination_ip", "type" : "string" },
    { "name" : "destination_port", "type" : "int" },
    { "name" : "protocol", "type" : "string" },
    { "name" : "record_type",
      "type" : [ "null",
        { "name" : "alert",
          "type" : "record",
          "fields" : [
            { "name" : "action", "type" : "string" },
            { "name" : "signature_id", "type" : "int" },
            { "name" : "signature", "type" : "string" },
            { "name" : "category", "type" : "string" },
            { "name" : "severity", "type" : "int" }
          ]
        }
      ]
    }
  ]
}
The output of the { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } } command:
[{
  /record_type[]/alert/action = [allowed],
  /record_type[]/alert/category = [],
  /record_type[]/alert/severity = [3],
  /record_type[]/alert/signature = [GeoIP from NL, Netherlands],
  /record_type[]/alert/signature_id = [88006],
  _attachment_body = [{
    "timestamp": "2015-03-23T07:42:01.303046",
    "event_type": "alert",
    "src_ip": "1.1.1.1",
    "src_port": 18192,
    "dest_ip": "46.21.41.16",
    "dest_port": 62004,
    "proto": "TCP",
    "alert": {
      "action": "allowed",
      "gid": "1",
      "signature_id": "88006",
      "rev": "1",
      "signature" : "GeoIP from NL, Netherlands",
      "category" : "",
      "severity" : "3"
    }
  }],
  _attachment_mimetype = [json/java + memory],
  basename = [simple_eve.json]
}]
Error:
java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;
I also tried the following field names within extractJsonPaths, without success:
/record_type[]/alert/action
/record_type/action
Created 03-30-2015 06:44 AM
The toAvro command expects a java.util.Map as input on conversion to a nested Avro record, per
However, your input data contains a (nested) Jackson JSON object, not a java.util.Map. Hence the conversion can't succeed.
Consider writing a custom morphline command that implements whatever conversion rules you wish, per http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Cus...
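As a rough illustration of the kind of conversion rule such a custom command might apply, here is a plain-Java sketch (no Kite or Avro APIs are used; the class name EventReshaper and the methods toInt, buildAlert and buildEvent are hypothetical). It assumes the JSON event has already been parsed into nested java.util.Maps, and reshapes it into nested Maps keyed by the field names in subrec.avsc, so that record_type holds a single Map for the alert subrecord. It also coerces the string-valued numbers in the sample ("88006", "3") to int, in case the conversion does not do that by itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: reshape a parsed JSON event (nested java.util.Maps)
// into nested Maps that follow the field names and types of subrec.avsc.
class EventReshaper {

    // Coerce a JSON value to Integer; the sample event carries some numbers
    // as strings (e.g. "88006"), while the schema declares them as int.
    static Integer toInt(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.parseInt(value.toString());
    }

    // Build the nested "alert" subrecord as a single Map keyed by Avro field name.
    static Map<String, Object> buildAlert(Map<String, Object> alertJson) {
        Map<String, Object> alert = new LinkedHashMap<>();
        alert.put("action", alertJson.get("action"));
        alert.put("signature_id", toInt(alertJson.get("signature_id")));
        alert.put("signature", alertJson.get("signature"));
        alert.put("category", alertJson.get("category"));
        alert.put("severity", toInt(alertJson.get("severity")));
        return alert;
    }

    // Rename top-level JSON keys to the Avro field names (same mapping as the
    // extractJsonPaths block in the question) and attach the nested subrecord.
    @SuppressWarnings("unchecked")
    static Map<String, Object> buildEvent(Map<String, Object> json) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("timestamp", json.get("timestamp"));
        event.put("event_type", json.get("event_type"));
        event.put("source_ip", json.get("src_ip"));
        event.put("source_port", toInt(json.get("src_port")));
        event.put("destination_ip", json.get("dest_ip"));
        event.put("destination_port", toInt(json.get("dest_port")));
        event.put("protocol", json.get("proto"));
        Object alertJson = json.get("alert");
        // record_type is a ["null", alert] union: use null when there is no alert object
        event.put("record_type",
                alertJson instanceof Map ? buildAlert((Map<String, Object>) alertJson) : null);
        return event;
    }

    public static void main(String[] args) {
        // The sample event from the question, built by hand as nested Maps.
        Map<String, Object> alertJson = new LinkedHashMap<>();
        alertJson.put("action", "allowed");
        alertJson.put("gid", "1");
        alertJson.put("signature_id", "88006");
        alertJson.put("rev", "1");
        alertJson.put("signature", "GeoIP from NL, Netherlands");
        alertJson.put("category", "");
        alertJson.put("severity", "3");

        Map<String, Object> json = new LinkedHashMap<>();
        json.put("timestamp", "2015-03-23T07:42:01.303046");
        json.put("event_type", "alert");
        json.put("src_ip", "1.1.1.1");
        json.put("src_port", 18192);
        json.put("dest_ip", "46.21.41.16");
        json.put("dest_port", 62004);
        json.put("proto", "TCP");
        json.put("alert", alertJson);

        System.out.println(buildEvent(json));
    }
}
```

The extra JSON fields gid and rev are dropped because the schema has no place for them. How the resulting Map is put back into the morphline record is left out here, since that depends on how the custom command is written.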
Created 03-31-2015 12:00 AM
Created 04-20-2015 03:35 PM
I need to process nested JSON. How did you go about mapping the fields to the nested Avro schema?
Created 07-09-2015 10:51 AM
Created 04-12-2018 06:33 PM
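In fact, we can use Jackson to solve this problem, and the approach is generic rather than tied to one JSON layout: parse the JSON body into a java.util.Map with Jackson's ObjectMapper, so that nested JSON objects become nested Maps (the input toAvro expects for nested records), then copy each top-level entry into the record.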
morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
      # read the JSON blob
      { readJson: {} }
      # java code: parse the JSON body into a java.util.Map and copy
      # each top-level field into the record
      {
        java {
          imports : """
            import com.fasterxml.jackson.databind.ObjectMapper;
            import org.kitesdk.morphline.base.Fields;
            import java.io.IOException;
            import java.util.Map;
          """
          code : """
            String jsonStr = record.getFirstValue(Fields.ATTACHMENT_BODY).toString();
            ObjectMapper mapper = new ObjectMapper();
            Map<String, Object> map;
            try {
              // nested JSON objects are parsed into nested Maps
              map = (Map<String, Object>) mapper.readValue(jsonStr, Map.class);
            } catch (IOException e) {
              e.printStackTrace();
              return false; // signal failure instead of hitting a NullPointerException below
            }
            for (Map.Entry<String, Object> entry : map.entrySet()) {
              record.put(entry.getKey(), entry.getValue());
            }
            return child.process(record);
          """
        }
      }
      # convert the extracted fields to an avro object
      # described by the schema in this field
      { toAvro { schemaFile: /etc/flume/conf/a1/like_user_event_realtime.avsc } }
      #{ logInfo { format : "loginfo: {}", args : ["@{}"] } }
      # serialize the object as avro
      { writeAvroToByteArray: { format: containerlessBinary } }
    ]
  }
]
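To show what the copy loop in the java command does without needing the Kite runtime, here is a small plain-Java sketch. It stands in a Map<String, List<Object>> for the morphline record (each field holds a list of values, matching the [..] lists in the logError output earlier in the thread); the class name RecordCopySketch and its helpers put and copyTopLevelFields are hypothetical. The point is that a nested JSON object stays a single java.util.Map value inside its record field. Note that field names are copied exactly as they appear in the JSON (e.g. src_ip, alert), so this approach assumes the Avro schema uses the same names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the java command's loop: copy each top-level
// entry of the parsed JSON Map into a multi-valued record.
class RecordCopySketch {

    // Append a value to the field's value list, like a multi-valued record field.
    static void put(Map<String, List<Object>> record, String field, Object value) {
        record.computeIfAbsent(field, k -> new ArrayList<>()).add(value);
    }

    // One record field per top-level JSON key; nested Maps are kept as-is.
    static Map<String, List<Object>> copyTopLevelFields(Map<String, Object> parsed) {
        Map<String, List<Object>> record = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : parsed.entrySet()) {
            put(record, entry.getKey(), entry.getValue());
        }
        return record;
    }

    public static void main(String[] args) {
        Map<String, Object> alert = new LinkedHashMap<>();
        alert.put("action", "allowed");
        alert.put("severity", "3");

        Map<String, Object> parsed = new LinkedHashMap<>();
        parsed.put("event_type", "alert");
        parsed.put("alert", alert);

        Map<String, List<Object>> record = copyTopLevelFields(parsed);
        System.out.println(record);
        // The nested object is still a java.util.Map inside the "alert" field.
        System.out.println(record.get("alert").get(0) instanceof Map);
    }
}
```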
Created 03-30-2015 06:05 AM