Explorer

JSON to Avro, Sub-records in Avro


I'm trying to convert JSON into Avro using the kite-sdk morphline module. After some experimenting I was able to convert the JSON into Avro using a simple schema (no complex data types and no nested structure).

 

Then I took it one step further and modified the Avro schema as displayed below (subrec.avsc). As you can see, the schema consists of a sub-record.

As soon as I tried to convert the JSON to Avro using the morphlines.conf and the subrec.avsc, it failed.

Somehow the JSON paths "/record_type[]/alert/action" are not translated by the toAvro function.
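
To illustrate what I mean, here's a stdlib-only Python sketch (my own illustration, not morphline internals): extractJsonPaths seems to store each value under the literal path string as a flat field name, while the nested Avro record would need a genuinely nested structure:

```python
import json

# Flat fields as extractJsonPaths produces them: the slash path is just
# a literal field NAME, not a nested structure.
flat_fields = {
    "/record_type[]/alert/action": "allowed",
    "/record_type[]/alert/severity": 3,
}

def nest(flat):
    """Rebuild a nested dict from slash-delimited field names."""
    root = {}
    for path, value in flat.items():
        parts = [p.rstrip("[]") for p in path.strip("/").split("/")]
        node = root
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return root

print(json.dumps(nest(flat_fields), sort_keys=True))
# {"record_type": {"alert": {"action": "allowed", "severity": 3}}}
```

That nested shape is what I would expect toAvro to need for the sub-record.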

 

Thanks

 

The morphlines.conf

 

morphlines : [
   {   id : morphline1
       importCommands : ["org.kitesdk.**"]
       commands : [
      # Read the JSON blob
      { readJson: {} }

      { logError { format : "record: {}", args : ["@{}"] } }

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
              "/record_type[]/alert/action" : /alert/action,
              "/record_type[]/alert/signature_id" : /alert/signature_id,
              "/record_type[]/alert/signature" : /alert/signature,
              "/record_type[]/alert/category" : /alert/category,
              "/record_type[]/alert/severity" : /alert/severity
      } } }

      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

      { extractJsonPaths { flatten: false, paths: {
              timestamp : /timestamp,
              event_type : /event_type,
              source_ip : /src_ip,
              source_port : /src_port,
              destination_ip : /dest_ip,
              destination_port : /dest_port,
              protocol : /proto,
      } } }

      # Create Avro according to schema
      { logError { format : "WE GO TO AVRO" } }
      { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }

      # Create Avro container
      { logError { format : "WE GO TO BINARY" } }
      { writeAvroToByteArray { format: containerlessBinary } }

      { logError { format : "DONE!!!" } }
   ]
   }
]

 

And the subrec.avsc

 

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name": "record_type",
    "type" : ["null", {
      "name" : "alert",
      "type" : "record",
      "fields" : [ {
            "name" : "action",
            "type" : "string"
        }, {
            "name" : "signature_id",
            "type" : "int"
        }, {
            "name" : "signature",
            "type" : "string"
        }, {
            "name" : "category",
            "type" : "string"
        }, {
            "name" : "severity",
            "type" : "int"
        }
      ] } ]
  } ]
}
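
While debugging I also noticed a secondary mismatch (stdlib-only sketch, not morphline code): the source JSON carries signature_id and severity as strings, while this subrec.avsc declares them as int, which may cause trouble of its own:

```python
import json

# Trimmed copy of the incoming event; note the quoted numbers:
src = json.loads('{"alert": {"signature_id": "88006", "severity": "3"}}')

# subrec.avsc declares signature_id and severity as Avro "int",
# but the JSON delivers them as strings.
print(type(src["alert"]["signature_id"]).__name__)  # str
print(type(src["alert"]["severity"]).__name__)      # str
```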

The output of { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

 

[{
    /record_type[]/alert/action = [allowed], 
    /record_type[]/alert/category = [],
    /record_type[]/alert/severity = [3],
    /record_type[]/alert/signature = [GeoIP from NL, Netherlands],
    /record_type[]/alert/signature_id = [88006],
_attachment_body = [{ "timestamp": "2015-03-23T07:42:01.303046", "event_type": "alert", "src_ip": "1.1.1.1", "src_port": 18192, "dest_ip": "46.21.41.16", "dest_port": 62004, "proto": "TCP", "alert": { "action": "allowed", "gid": "1", "signature_id": "88006", "rev": "1", "signature" : "GeoIP from NL, Netherlands", "category" : "", "severity" : "3" } }], _attachment_mimetype=[json/java + memory],
basename = [simple_eve.json] }]

 

Error

java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;

 

Also tried the following within ExtractJsonPaths:

 

/record_type[]/alert/action
/record_type/action

 

Cloudera Employee

Re: JSON to Avro, Sub-records in Avro

> java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;

This means you have the wrong Avro jar version on the classpath.

Explorer

Re: JSON to Avro, Sub-records in Avro

I have two Avro jars in my Flume lib directory: avro-1.7.5.jar and avro-1.7.3.jar.

 

Which one is correct?

Cloudera Employee

Re: JSON to Avro, Sub-records in Avro

The more recent version.

Explorer

Re: JSON to Avro, Sub-records in Avro


Thanks for your reply. I've removed the older version and restarted the agents; I believe the NoSuchMethodError is gone. But there is still a problem processing the record.

 

10453 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - EXTRACTED THIS 2: [{/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}]
10455 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - WE GO TO AVRO
10457 [New I/O worker #1] WARN org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl - Morphline /etc/flume/conf/conf.empty/morphlines.conf@morphline1 failed to process record: {_attachment_body=[[B@6d2fdb53], basename=[simple_eve.json]}

Explorer

Re: JSON to Avro, Sub-records in Avro

I've got some more trace logs that may help fix the `Cannot convert item: [] to schema` error.

 

 

25 Mar 2015 20:52:28,328 TRACE [New I/O  worker #1] (org.kitesdk.morphline.base.AbstractCommand.beforeProcess:168)  - beforeProcess: {/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[test2], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}

25 Mar 2015 20:52:28,329 DEBUG [New I/O  worker #1] (org.kitesdk.morphline.avro.ToAvroBuilder$ToAvro.doProcess:156)  - Cannot convert item: [] to schema: {"type":"record","name":"Event","fields":[{"name":"timestamp","type":"string"},{"name":"event_type","type":"string"},{"name":"source_ip","type":"string"},{"name":"source_port","type":"int"},{"name":"destination_ip","type":"string"},{"name":"destination_port","type":"int"},{"name":"protocol","type":"string"},{"name":"record_type","type":[{"type":"record","name":"alert","fields":[{"name":"action","type":"string"},{"name":"signature_id","type":"string"},{"name":"signature","type":"string"},{"name":"category","type":"string"},{"name":"severity","type":"string"}]}]}]}

 

 

Cloudera Employee

Re: JSON to Avro, Sub-records in Avro

An empty morphline record field can't be converted to that avro schema, of course. Make sure your input data always matches the avro schema.
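
For example, one way to guard against the empty field (an untested sketch; the placeholder value "unknown" is my own choice, not something from your config) would be to give it a default before toAvro runs:

```
# Sketch: replace an empty category with a placeholder value before toAvro
{ if {
    conditions : [ { equals { "/record_type[]/alert/category" : [""] } } ]
    then : [
      { setValues { "/record_type[]/alert/category" : "unknown" } }
    ]
  }
}
```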

Explorer

Re: JSON to Avro, Sub-records in Avro

How can the record be empty? The TRACE beforeProcess output shows a record with the information just before toAvro is called.

 

Below the original json string that is given to the Avro Source in flume:

 

{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}
Cloudera Employee

Re: JSON to Avro, Sub-records in Avro

The record field is empty.

Explorer

Re: JSON to Avro, Sub-records in Avro


Sorry, but I don't understand what you mean...

 

I have a feeling that something is wrong with extractJsonPaths. When I change the record output field names (left of the colon) in extractJsonPaths as shown below, and use simple fields instead of a complex nested record in the Avro schema, it works.

 

To test, I've changed this

 

 

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
              "/record_type[]/alert/action" : /alert/action,
              "/record_type[]/alert/signature_id" : /alert/signature_id,
              "/record_type[]/alert/signature" : /alert/signature,
              "/record_type[]/alert/category" : /alert/category,
              "/record_type[]/alert/severity" : /alert/severity
      } } }

Into this

 

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
              action : /alert/action,
              signature_id : /alert/signature_id,
              signature : /alert/signature,
              category : /alert/category,
              severity : /alert/severity
      } } }

 

And changed the Avro schema (subrec.avsc) to this

 

 

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name" : "action",
    "type" : "string"
  }, {
    "name" : "signature_id",
    "type" : "int"
  }, {
    "name" : "signature",
    "type" : "string"
  }, {
    "name" : "category",
    "type" : "string"
  }, {
    "name" : "severity",
    "type" : "int"
  }]
}

 

 

What should extractJsonPaths and the Avro schema file look like to get nested records working?
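
My current best guess (untested) is to stop extracting the individual alert leaves and instead pull the whole /alert subtree into a single record_type field, letting toAvro map the nested JSON node onto the sub-record:

```
# Untested guess: extract the whole alert subtree as one field
{ extractJsonPaths { flatten : false, paths : {
        record_type : /alert
} } }

{ toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }
```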

 
