Explorer
Posts: 8
Registered: ‎03-25-2015

Re: JSON to Avro, Sub-records in Avro

@whoschek any chance you can help me out here?
Cloudera Employee
Posts: 146
Registered: ‎08-21-2013

Re: JSON to Avro, Sub-records in Avro

The toAvro command expects a java.util.Map as input when converting to a nested Avro record, per

https://github.com/kite-sdk/kite/blob/master/kite-morphlines/kite-morphlines-avro/src/main/java/org/...

However, your input data contains a (nested) Jackson JSON object, not a java.util.Map, so the conversion can't succeed.

Consider writing a custom morphline command that implements whatever conversion rules you need, per http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Cus...
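
To make the java.util.Map requirement concrete, here is a minimal sketch using only java.util. It builds a hypothetical event whose `user` field is a nested Map (the shape a sub-record would take) and includes a helper, `isPlainTree`, that reports whether a value tree contains anything other than Maps, Lists and simple scalars. The class name, field names and the list of accepted scalar types are assumptions for illustration, not toAvro's actual acceptance rule.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NestedRecordShape {

    // True when the value tree is built only from Maps, Lists and simple
    // scalars. The scalar list is an illustrative assumption.
    public static boolean isPlainTree(Object value) {
        if (value == null || value instanceof String
                || value instanceof Number || value instanceof Boolean) {
            return true;
        }
        if (value instanceof Map) {
            for (Object v : ((Map<?, ?>) value).values()) {
                if (!isPlainTree(v)) {
                    return false;
                }
            }
            return true;
        }
        if (value instanceof List) {
            for (Object v : (List<?>) value) {
                if (!isPlainTree(v)) {
                    return false;
                }
            }
            return true;
        }
        // Any other type (for example a Jackson JsonNode) ends up here.
        return false;
    }

    // Hypothetical event with a nested "user" sub-record expressed as a Map.
    public static Map<String, Object> sampleEvent() {
        Map<String, Object> user = new LinkedHashMap<>();
        user.put("name", "alice");
        user.put("age", 30);
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("id", "e1");
        event.put("user", user);
        return event;
    }

    public static void main(String[] args) {
        Map<String, Object> event = sampleEvent();
        System.out.println(isPlainTree(event)); // prints "true"
        // Replace the nested Map with a non-Map object to mimic the failing case.
        event.put("user", new StringBuilder("not a map"));
        System.out.println(isPlainTree(event)); // prints "false"
    }
}
```

A check like this could be run on a record's values while debugging to see whether a nested field is still a JSON node rather than a Map.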

 

Explorer
Posts: 8
Registered: ‎03-25-2015

Re: JSON to Avro, Sub-records in Avro

Thank you. I'll look into writing a custom command based on the JSON-to-Map approach described here: http://stackoverflow.com/questions/21720759/convert-a-json-string-to-a-hashmap
Expert Contributor
Posts: 139
Registered: ‎07-21-2014

Re: JSON to Avro, Sub-records in Avro

I need to process nested JSON. How did you go about mapping the fields to the nested AVRO schema?

Explorer
Posts: 11
Registered: ‎06-05-2015

Re: JSON to Avro, Sub-records in Avro

I have the same question. It's still not clear to me whether morphlines natively supports toAvro with nested records, or if we have to write a custom command...
Tom
New Contributor
Posts: 2
Registered: ‎04-04-2018

Re: JSON to Avro, Sub-records in Avro

In fact, Jackson can solve this problem, and the approach works for any JSON data: parse the JSON into a java.util.Map inside a java command before calling toAvro.

 

morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
      # read the JSON blob
      { readJson: {} }
	  
      # Parse the JSON into a java.util.Map with Jackson and copy its
      # top-level entries into the record. Nested objects stay as Maps,
      # which is what toAvro expects for nested records (see reply above).
      {
        java {
          imports : """
            import com.fasterxml.jackson.databind.ObjectMapper;
            import org.kitesdk.morphline.base.Fields;
            import java.io.IOException;
            import java.util.Map;
          """

          code : """
            String jsonStr = record.getFirstValue(Fields.ATTACHMENT_BODY).toString();
            ObjectMapper mapper = new ObjectMapper();
            Map<String, Object> map;
            try {
              map = (Map<String, Object>) mapper.readValue(jsonStr, Map.class);
            } catch (IOException e) {
              // Fail this record instead of continuing with a null map
              return false;
            }
            for (Map.Entry<String, Object> entry : map.entrySet()) {
              record.put(entry.getKey(), entry.getValue());
            }
            return child.process(record);
          """
        }
      }
      
      # convert the extracted fields to an avro object
      # described by the schema in this field
      { toAvro {
        schemaFile: /etc/flume/conf/a1/like_user_event_realtime.avsc
      } }
      
      #{ logInfo { format : "loginfo: {}", args : ["@{}"] } }
  
      # serialize the object as avro
      { writeAvroToByteArray: {
        format: containerlessBinary
      } }
  
    ]
  }
]
  