Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

JSON to Avro, Sub-records in Avro

avatar
Explorer

I'm trying to convert JSON into Avro using the kite-sdk morphline module. After playing around I'm able to convert the JSON into Avro using a simple schema (no complex data types and no nested structure).

 

Then I took it one step further and modified the Avro schema as displayed below (subrec.avsc). As you can see the schema consist of a subrecord.

As soon as I tried to convert the JSON to Avro using the morphlines.conf and the subrec.avsc it failed.

Somehow the JSON paths "/record_type[]/alert/action" are not translated by the toAvro function.

 

Thanks

 

The morphlines.conf

 

morphlines : [
   {   id : morphline1
   importCommands : ["org.kitesdk.**"]   commands : [
      # Read the JSON blob
      { readJson: {} }

      { logError { format : "record: {}", args : ["@{}"] } }

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
              "/record_type[]/alert/action" : /alert/action,
              "/record_type[]/alert/signature_id" : /alert/signature_id,
              "/record_type[]/alert/signature" : /alert/signature,
              "/record_type[]/alert/category" : /alert/category,
              "/record_type[]/alert/severity" : /alert/severity
      } } }

      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

      { extractJsonPaths { flatten: false, paths: {
timestamp : /timestamp,
event_type : /event_type,
source_ip : /src_ip,
source_port : /src_port,
destination_ip : /dest_ip,
destination_port : /dest_port,
protocol : /proto, } } } # Create Avro according to schema { logError { format : "WE GO TO AVRO"} } { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } } # Create Avro container { logError { format : "WE GO TO BINARY"} } { writeAvroToByteArray { format: containerlessBinary } } { logError { format : "DONE!!!"} } ] } ]

 

And the subrec.avsc

 

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name": "record_type",
    "type" : ["null", {
      "name" : "alert",
      "type" : "record",
      "fields" : [ {
            "name" : "action",
            "type" : "string"
        }, {
            "name" : "signature_id",
            "type" : "int"
        }, {
            "name" : "signature",
            "type" : "string"
        }, {
            "name" : "category",
            "type" : "string"
        }, {
            "name" : "severity",
            "type" : "int"
        }
      ] } ]
  } ]
}

The output on { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

 

[{
    /record_type[]/alert/action = [allowed], 
    /record_type[]/alert/category = [],
    /record_type[]/alert/severity = [3],
    /record_type[]/alert/signature = [GeoIP from NL, Netherlands],
    /record_type[]/alert/signature_id = [88006],
_attachment_body = [{ "timestamp": "2015-03-23T07:42:01.303046", "event_type": "alert", "src_ip": "1.1.1.1", "src_port": 18192, "dest_ip": "46.21.41.16", "dest_port": 62004, "proto": "TCP", "alert": { "action": "allowed", "gid": "1", "signature_id": "88006", "rev": "1", "signature" : "GeoIP from NL, Netherlands", "category" : "" "severity" : "3" } }], _attachment_mimetype=[json/java + memory],
basename = [simple_eve.json] }]

 

Error

java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;

 

Also tried the following within ExtractJsonPaths:

 

/record_type[]/alert/action
/record_type/action

 

1 ACCEPTED SOLUTION

avatar
Super Collaborator

The toAvro command expects a java.util.Map as input on conversion to a nested Avro record, per

 

https://github.com/kite-sdk/kite/blob/master/kite-morphlines/kite-morphlines-avro/src/main/java/org/...

 

However, your input data contains a (nested) Jackson JSON object, not a java.util.Map. Hence the conversion can't succeed.

 

Consider writing a custom morphline command that implements whatever conversion rules you wish, per http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Cus...

 

View solution in original post

15 REPLIES 15

avatar
Super Collaborator

The toAvro command expects a java.util.Map as input on conversion to a nested Avro record, per

 

https://github.com/kite-sdk/kite/blob/master/kite-morphlines/kite-morphlines-avro/src/main/java/org/...

 

However, your input data contains a (nested) Jackson JSON object, not a java.util.Map. Hence the conversion can't succeed.

 

Consider writing a custom morphline command that implements whatever conversion rules you wish, per http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Cus...

 

avatar
Explorer
Thank you. I'll look into writing a custom command based on this script JsonToMap, http://stackoverflow.com/questions/21720759/convert-a-json-string-to-a-hashmap

avatar
Expert Contributor

I need to process nested JSON. How did you go about mapping the fields to the nested AVRO schema?

avatar
Contributor
I have the same question. It's still not clear to me whether morphlines natively supports toAvro with nested Records, of if we have to write a custom command...

avatar
New Contributor

  In fact, we can use jackson to solve this problem, and it is universal to any json data. 

 

morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
      # read the JSON blob
      { readJson: {} }
	  
	  # java code
	  {
			  java { 
					imports : """
					  import com.fasterxml.jackson.databind.JsonNode;
					  import com.fasterxml.jackson.databind.ObjectMapper;
					  import org.kitesdk.morphline.base.Fields;
					  import java.io.IOException;
					  import java.util.Set;
					  import java.util.ArrayList;
					  import java.util.Iterator;
					  import java.util.List;
					  import java.util.Map;
					"""

					code : """
					  String jsonStr = record.getFirstValue(Fields.ATTACHMENT_BODY).toString();
					  ObjectMapper mapper = new ObjectMapper();
					  Map<String, Object> map = null;
					  try {
						  map = (Map<String, Object>)mapper.readValue(jsonStr, Map.class);
					  } catch (IOException e) {
						  e.printStackTrace();
					  }
					  Set<String> keySet = map.keySet();
					  for (String o : keySet) {
						  record.put(o, map.get(o));
					  }
					  return child.process(record);                   
					"""
	 
			  }
	  }
      
      # convert the extracted fields to an avro object
      # described by the schema in this field
      { toAvro {
        schemaFile: /etc/flume/conf/a1/like_user_event_realtime.avsc
      } }
      
      #{ logInfo { format : "loginfo: {}", args : ["@{}"] } }
  
      # serialize the object as avro
      { writeAvroToByteArray: {
        format: containerlessBinary
      } }
  
    ]
  }
]
  

avatar
Explorer
@whoschek any chance you can help me out here?