Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant

JSON to Avro, Sub-records in Avro

Explorer

I'm trying to convert JSON into Avro using the kite-sdk morphline module. After playing around I'm able to convert the JSON into Avro using a simple schema (no complex data types and no nested structure).

 

Then I took it one step further and modified the Avro schema as shown below (subrec.avsc). As you can see, the schema contains a sub-record.

As soon as I tried to convert the JSON to Avro using the morphlines.conf and subrec.avsc below, the conversion failed.

Somehow the JSON paths such as "/record_type[]/alert/action" are not translated by the toAvro command.

 

Thanks

 

The morphlines.conf

 

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Read the JSON blob
      { readJson: {} }

      { logError { format : "record: {}", args : ["@{}"] } }

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
          "/record_type[]/alert/action" : /alert/action,
          "/record_type[]/alert/signature_id" : /alert/signature_id,
          "/record_type[]/alert/signature" : /alert/signature,
          "/record_type[]/alert/category" : /alert/category,
          "/record_type[]/alert/severity" : /alert/severity
      } } }

      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

      { extractJsonPaths { flatten: false, paths: {
          timestamp : /timestamp,
          event_type : /event_type,
          source_ip : /src_ip,
          source_port : /src_port,
          destination_ip : /dest_ip,
          destination_port : /dest_port,
          protocol : /proto,
      } } }

      # Create Avro according to schema
      { logError { format : "WE GO TO AVRO"} }
      { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }

      # Create Avro container
      { logError { format : "WE GO TO BINARY"} }
      { writeAvroToByteArray { format: containerlessBinary } }

      { logError { format : "DONE!!!"} }
    ]
  }
]

 

And the subrec.avsc

 

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name": "record_type",
    "type" : ["null", {
      "name" : "alert",
      "type" : "record",
      "fields" : [ {
            "name" : "action",
            "type" : "string"
        }, {
            "name" : "signature_id",
            "type" : "int"
        }, {
            "name" : "signature",
            "type" : "string"
        }, {
            "name" : "category",
            "type" : "string"
        }, {
            "name" : "severity",
            "type" : "int"
        }
      ] } ]
  } ]
}

The output of { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }:

 

[{
    /record_type[]/alert/action = [allowed],
    /record_type[]/alert/category = [],
    /record_type[]/alert/severity = [3],
    /record_type[]/alert/signature = [GeoIP from NL, Netherlands],
    /record_type[]/alert/signature_id = [88006],
    _attachment_body = [{ "timestamp": "2015-03-23T07:42:01.303046", "event_type": "alert", "src_ip": "1.1.1.1", "src_port": 18192, "dest_ip": "46.21.41.16", "dest_port": 62004, "proto": "TCP", "alert": { "action": "allowed", "gid": "1", "signature_id": "88006", "rev": "1", "signature" : "GeoIP from NL, Netherlands", "category" : "" "severity" : "3" } }],
    _attachment_mimetype = [json/java + memory],
    basename = [simple_eve.json]
}]

 

Error

java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;

 

I also tried the following paths within extractJsonPaths:

 

/record_type[]/alert/action
/record_type/action
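
A NoSuchMethodError like the one quoted above is raised when the calling code was compiled against a method that the class loaded at runtime does not provide, so it may be worth checking which Avro jar is actually on the classpath. The following is a minimal sketch in plain Java, not part of Kite or Avro: the class name AvroClasspathCheck and the helper describe are hypothetical, and the check matches by method name only, not by the full signature shown in the error.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

public class AvroClasspathCheck {

    /** Reports where a class was loaded from and whether it exposes a public method with the given name. */
    public static String describe(String className, String methodName) {
        Class<?> cls;
        try {
            cls = Class.forName(className);
        } catch (ClassNotFoundException | LinkageError e) {
            return className + ": not on classpath";
        }
        // CodeSource is null for classes loaded by the bootstrap loader (JDK classes).
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        String location = (src == null || src.getLocation() == null)
                ? "<bootstrap>"
                : src.getLocation().toString();
        boolean found = false;
        // getMethods() includes public methods inherited from superclasses.
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(methodName)) {
                found = true;
                break;
            }
        }
        return className + " loaded from " + location + ", "
                + methodName + (found ? " found" : " missing");
    }

    public static void main(String[] args) {
        // Class and method names are taken from the error message in the question.
        System.out.println(describe("org.apache.avro.reflect.ReflectData", "getDefaultValue"));
    }
}
```

Running it with the same classpath as the Flume agent should show which jar supplies ReflectData. If the method is reported missing, the Avro version there probably differs from the one the morphline Avro module was built against.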

 

1 ACCEPTED SOLUTION

Super Collaborator

The toAvro command expects a java.util.Map as input on conversion to a nested Avro record, per

 

https://github.com/kite-sdk/kite/blob/master/kite-morphlines/kite-morphlines-avro/src/main/java/org/...

 

However, your input data contains a (nested) Jackson JSON object, not a java.util.Map. Hence the conversion can't succeed.

 

Consider writing a custom morphline command that implements whatever conversion rules you wish, per http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Cus...
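
To illustrate the shape described above, here is a minimal sketch in plain Java of the sub-record built as a java.util.Map. It assumes the alert object has already been parsed into a Map (for example inside a custom command); the class NestedAlertMap and its methods are hypothetical names, not part of Kite. The values signature_id and severity arrive as strings in the sample event while the schema declares int, so they are converted explicitly here rather than relying on any conversion toAvro may perform.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NestedAlertMap {

    /** Converts values such as "88006" or 88006 to int; throws NumberFormatException on anything else. */
    public static int asInt(Object value) {
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.parseInt(String.valueOf(value).trim());
    }

    /** Builds the value for the "record_type" field as a plain java.util.Map keyed by the schema's field names. */
    public static Map<String, Object> toRecordType(Map<String, Object> alert) {
        // Missing keys are not handled in this sketch; a real command should validate them.
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("action", String.valueOf(alert.get("action")));
        out.put("signature_id", asInt(alert.get("signature_id")));
        out.put("signature", String.valueOf(alert.get("signature")));
        out.put("category", String.valueOf(alert.get("category")));
        out.put("severity", asInt(alert.get("severity")));
        return out;
    }

    public static void main(String[] args) {
        // Sample values copied from the event in the question.
        Map<String, Object> alert = new LinkedHashMap<>();
        alert.put("action", "allowed");
        alert.put("signature_id", "88006");
        alert.put("signature", "GeoIP from NL, Netherlands");
        alert.put("category", "");
        alert.put("severity", "3");
        System.out.println(toRecordType(alert));
    }
}
```

Inside a morphline, the resulting Map would be stored in the record under the field name record_type before toAvro runs.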

 


15 REPLIES

Explorer
Thank you. I'll look into writing a custom command based on the JSON-to-Map approach described here: http://stackoverflow.com/questions/21720759/convert-a-json-string-to-a-hashmap

Expert Contributor

I need to process nested JSON. How did you go about mapping the fields to the nested Avro schema?

Contributor
I have the same question. It's still not clear to me whether morphlines natively supports toAvro with nested records, or if we have to write a custom command...

New Contributor

In fact, we can use Jackson to solve this problem, and the approach works for any JSON data.

 

morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
      # read the JSON blob
      { readJson: {} }

      # parse the JSON body into a java.util.Map and copy its top-level
      # keys into the record, so nested objects reach toAvro as Maps
      {
        java {
          imports : """
            import com.fasterxml.jackson.databind.ObjectMapper;
            import org.kitesdk.morphline.base.Fields;
            import java.io.IOException;
            import java.util.Map;
          """

          code : """
            String jsonStr = record.getFirstValue(Fields.ATTACHMENT_BODY).toString();
            ObjectMapper mapper = new ObjectMapper();
            Map<String, Object> map;
            try {
              map = (Map<String, Object>) mapper.readValue(jsonStr, Map.class);
            } catch (IOException e) {
              // fail this record instead of hitting a NullPointerException below
              e.printStackTrace();
              return false;
            }
            for (Map.Entry<String, Object> entry : map.entrySet()) {
              record.put(entry.getKey(), entry.getValue());
            }
            return child.process(record);
          """
        }
      }

      # convert the extracted fields to an avro object
      # described by the schema in this field
      { toAvro {
        schemaFile: /etc/flume/conf/a1/like_user_event_realtime.avsc
      } }

      #{ logInfo { format : "loginfo: {}", args : ["@{}"] } }

      # serialize the object as avro
      { writeAvroToByteArray: {
        format: containerlessBinary
      } }

    ]
  }
]
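
Note that the loop above copies the top-level JSON keys into the record under their original names, so it relies on those names matching the Avro schema. In the original question they differ (src_ip versus source_ip, alert versus record_type), so a rename step would be needed first. A minimal sketch in plain Java, with the hypothetical class FieldRenamer and the mapping taken from the morphlines.conf and subrec.avsc in the question:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldRenamer {

    /** Returns a copy of input with keys renamed per renames; keys without a mapping are kept unchanged. */
    public static Map<String, Object> rename(Map<String, Object> input, Map<String, String> renames) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : input.entrySet()) {
            String target = renames.getOrDefault(e.getKey(), e.getKey());
            out.put(target, e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // JSON key -> Avro field name, as implied by the question's config and schema.
        Map<String, String> renames = new LinkedHashMap<>();
        renames.put("src_ip", "source_ip");
        renames.put("src_port", "source_port");
        renames.put("dest_ip", "destination_ip");
        renames.put("dest_port", "destination_port");
        renames.put("proto", "protocol");
        renames.put("alert", "record_type");

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("timestamp", "2015-03-23T07:42:01.303046");
        event.put("src_ip", "1.1.1.1");
        event.put("src_port", 18192);
        System.out.println(rename(event, renames));
    }
}
```

The same logic could run inside the java command before the keys are put into the record; values, including nested Maps, are passed through untouched.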
  

Explorer
@whoschek any chance you can help me out here?