JSON to Avro, Sub-records in Avro

Explorer

I'm trying to convert JSON into Avro using the kite-sdk morphline module. After some experimenting, I'm able to convert the JSON into Avro using a simple schema (no complex data types and no nested structure).

 

Then I took it one step further and modified the Avro schema as shown below (subrec.avsc). As you can see, the schema contains a subrecord.

As soon as I tried to convert the JSON to Avro using morphlines.conf and subrec.avsc, it failed.

Somehow the JSON paths "/record_type[]/alert/action" are not translated by the toAvro command.

 

Thanks

 

The morphlines.conf

 

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Read the JSON blob
      { readJson: {} }

      { logError { format : "record: {}", args : ["@{}"] } }

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
              "/record_type[]/alert/action" : /alert/action,
              "/record_type[]/alert/signature_id" : /alert/signature_id,
              "/record_type[]/alert/signature" : /alert/signature,
              "/record_type[]/alert/category" : /alert/category,
              "/record_type[]/alert/severity" : /alert/severity
      } } }

      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

      { extractJsonPaths { flatten: false, paths: {
              timestamp : /timestamp,
              event_type : /event_type,
              source_ip : /src_ip,
              source_port : /src_port,
              destination_ip : /dest_ip,
              destination_port : /dest_port,
              protocol : /proto
      } } }

      # Create Avro according to schema
      { logError { format : "WE GO TO AVRO" } }
      { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }

      # Create Avro container
      { logError { format : "WE GO TO BINARY" } }
      { writeAvroToByteArray { format: containerlessBinary } }

      { logError { format : "DONE!!!" } }
    ]
  }
]

 

And the subrec.avsc

 

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name": "record_type",
    "type" : ["null", {
      "name" : "alert",
      "type" : "record",
      "fields" : [ {
            "name" : "action",
            "type" : "string"
        }, {
            "name" : "signature_id",
            "type" : "int"
        }, {
            "name" : "signature",
            "type" : "string"
        }, {
            "name" : "category",
            "type" : "string"
        }, {
            "name" : "severity",
            "type" : "int"
        }
      ] } ]
  } ]
}

The output of { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }:

 

[{
    /record_type[]/alert/action = [allowed], 
    /record_type[]/alert/category = [],
    /record_type[]/alert/severity = [3],
    /record_type[]/alert/signature = [GeoIP from NL, Netherlands],
    /record_type[]/alert/signature_id = [88006],
_attachment_body = [{ "timestamp": "2015-03-23T07:42:01.303046", "event_type": "alert", "src_ip": "1.1.1.1", "src_port": 18192, "dest_ip": "46.21.41.16", "dest_port": 62004, "proto": "TCP", "alert": { "action": "allowed", "gid": "1", "signature_id": "88006", "rev": "1", "signature" : "GeoIP from NL, Netherlands", "category" : "", "severity" : "3" } }], _attachment_mimetype=[json/java + memory],
basename = [simple_eve.json] }]

 

Error

java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;

 

I also tried the following field names within extractJsonPaths:

 

/record_type[]/alert/action
/record_type/action

 

Accepted solution

Super Collaborator

The toAvro command expects a java.util.Map as input when converting to a nested Avro record; see

 

https://github.com/kite-sdk/kite/blob/master/kite-morphlines/kite-morphlines-avro/src/main/java/org/...

 

However, your input data contains a (nested) Jackson JSON object, not a java.util.Map. Hence the conversion can't succeed.

 

Consider writing a custom morphline command that implements whatever conversion rules you wish, per http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Cus...
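As a rough, plain-Java illustration of the kind of conversion rule such a custom command might apply, the sketch below has no Kite or Avro dependencies. It models a morphline record as a Map<String, List<Object>> (field name to list of values) and folds the flat "/record_type[]/alert/..." fields from the question into a single java.util.Map stored under "record_type". NestFields and nestFields are hypothetical names; a real command would implement Kite's command interface per the reference guide above and work on a Record, and whether toAvro then accepts the result for the ["null", alert] union is an assumption you would need to verify. Values are copied as-is, so "3" stays a string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not a Kite API): a morphline record is modelled here as
// Map<String, List<Object>>, i.e. field name -> list of values.
public class NestFields {

    // Moves every field whose name starts with `prefix` into one java.util.Map,
    // keyed by the rest of the field name, and stores that Map as the single
    // value of `targetField`. The original flat fields are removed.
    static void nestFields(Map<String, List<Object>> record, String prefix, String targetField) {
        Map<String, Object> nested = new LinkedHashMap<>();
        // Iterate over a copy of the keys so entries can be removed safely.
        for (String name : new ArrayList<>(record.keySet())) {
            if (name.startsWith(prefix)) {
                List<Object> values = record.remove(name);
                // Keep the first value; a field with no values becomes null.
                nested.put(name.substring(prefix.length()), values.isEmpty() ? null : values.get(0));
            }
        }
        List<Object> wrapped = new ArrayList<>();
        wrapped.add(nested);
        record.put(targetField, wrapped);
    }

    public static void main(String[] args) {
        Map<String, List<Object>> record = new LinkedHashMap<>();
        record.put("/record_type[]/alert/action", List.of("allowed"));
        record.put("/record_type[]/alert/severity", List.of("3"));
        record.put("timestamp", List.of("2015-03-23T07:42:01.303046"));

        nestFields(record, "/record_type[]/alert/", "record_type");
        // Prints: {timestamp=[2015-03-23T07:42:01.303046], record_type=[{action=allowed, severity=3}]}
        System.out.println(record);
    }
}
```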

 

Super Collaborator
> java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;

This means you have the wrong Avro jar version on the classpath.
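To see which jar a class is actually loaded from, a small stdlib-only check can ask the JVM for the class's code source. This is a sketch, not a Flume tool: WhichJar is a hypothetical name, and it only reflects the real agent if you run it with the same classpath as the agent (for example something like `java -cp "<flume lib dir>/*:." WhichJar`, where the directory is a placeholder).

```java
import java.security.CodeSource;

// Reports where the JVM loads a class from, to see which of several jar
// versions actually wins on the classpath.
public class WhichJar {

    static String locationOf(String className) {
        try {
            // initialize = false: load the class without running its static initializers.
            Class<?> c = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // Classes loaded by the JDK's bootstrap loader have no code source.
            if (src == null || src.getLocation() == null) {
                return "(bootstrap/JDK)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the NoSuchMethodError from this thread.
        String name = args.length > 0 ? args[0] : "org.apache.avro.reflect.ReflectData";
        System.out.println(name + " -> " + locationOf(name));
    }
}
```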

Explorer

I have two Avro jars in my Flume lib directory: avro-1.7.5.jar and avro-1.7.3.jar.

Which one is the correct one?

Super Collaborator
The more recent version.
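If you want to check a lib directory for the same situation, here is a plain-Java sketch that makes the "keep the more recent version" rule explicit. DuplicateJars and its methods are hypothetical names, not part of Flume or Avro, and the sketch assumes jar names of the form artifact-version.jar with purely numeric dotted versions. In this thread, deleting the older jar by hand is all that is needed.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flume or Avro: finds artifacts that appear
// in more than one version in a lib directory and names the newest jar.
public class DuplicateJars {

    // "avro-1.7.5.jar" -> artifact "avro", version "1.7.5".
    // Assumes purely numeric dotted versions (no "-SNAPSHOT" and similar).
    private static final Pattern JAR = Pattern.compile("(.+)-(\\d+(?:\\.\\d+)*)\\.jar");

    // Compares dotted numeric versions segment by segment, so "1.7.10" > "1.7.5".
    static int compareVersions(String a, String b) {
        int[] x = Arrays.stream(a.split("\\.")).mapToInt(Integer::parseInt).toArray();
        int[] y = Arrays.stream(b.split("\\.")).mapToInt(Integer::parseInt).toArray();
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? x[i] : 0;
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    // Returns artifact -> newest jar name, only for artifacts present more than once.
    static Map<String, String> newestDuplicates(List<String> jarNames) {
        Map<String, List<String>> versions = new LinkedHashMap<>();
        for (String name : jarNames) {
            Matcher m = JAR.matcher(name);
            if (m.matches()) {
                versions.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(m.group(2));
            }
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : versions.entrySet()) {
            if (e.getValue().size() > 1) {
                String newest = e.getValue().stream().max(DuplicateJars::compareVersions).get();
                result.put(e.getKey(), e.getKey() + "-" + newest + ".jar");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Lists the directory given as the first argument; falls back to the
        // jar names from this thread if no directory is given or it can't be listed.
        String[] listed = args.length > 0 ? new File(args[0]).list() : null;
        List<String> jars = listed != null
                ? Arrays.asList(listed)
                : List.of("avro-1.7.3.jar", "avro-1.7.5.jar");
        // With no argument this prints {avro=avro-1.7.5.jar}
        System.out.println(newestDuplicates(jars));
    }
}
```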

Explorer

Thanks for your reply. I've removed the older version and restarted the agents, and the NoSuchMethodError seems to be gone. But there is still a problem processing the record:

 

10453 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - EXTRACTED THIS 2: [{/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}]
10455 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - WE GO TO AVRO
10457 [New I/O worker #1] WARN org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl - Morphline /etc/flume/conf/conf.empty/morphlines.conf@morphline1 failed to process record: {_attachment_body=[[B@6d2fdb53], basename=[simple_eve.json]}

Explorer

I've got some more trace logs that may help track down the `Cannot convert item: [] to schema` error:

 

 

25 Mar 2015 20:52:28,328 TRACE [New I/O  worker #1] (org.kitesdk.morphline.base.AbstractCommand.beforeProcess:168)  - beforeProcess: {/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[test2], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}

25 Mar 2015 20:52:28,329 DEBUG [New I/O  worker #1] (org.kitesdk.morphline.avro.ToAvroBuilder$ToAvro.doProcess:156)  - Cannot convert item: [] to schema: {"type":"record","name":"Event","fields":[{"name":"timestamp","type":"string"},{"name":"event_type","type":"string"},{"name":"source_ip","type":"string"},{"name":"source_port","type":"int"},{"name":"destination_ip","type":"string"},{"name":"destination_port","type":"int"},{"name":"protocol","type":"string"},{"name":"record_type","type":[{"type":"record","name":"alert","fields":[{"name":"action","type":"string"},{"name":"signature_id","type":"string"},{"name":"signature","type":"string"},{"name":"category","type":"string"},{"name":"severity","type":"string"}]}]}]}

 

 

Super Collaborator
An empty morphline record field can't be converted to that Avro schema, of course. Make sure your input data always matches the Avro schema.

Explorer

How can the record be empty? The TRACE beforeProcess output shows the record, with all the information, just before toAvro is called.

 

Below is the original JSON string that is sent to the Avro source in Flume:

 

{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}

Super Collaborator
The record field is empty.
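One way to read "the record field is empty": a morphline record maps each field name to a list of values, and the first extractJsonPaths command in the question creates fields literally named "/record_type[]/alert/action" and so on, while no field is named "record_type". Assuming toAvro looks up each Avro field under the same name in the record (no mappings configured), the lookup for record_type finds nothing and yields an empty list, which may be what "Cannot convert item: []" reports. The plain-Java sketch below models the record as a Map<String, List<Object>> to show that lookup; EmptyFieldDemo and get are hypothetical names, not Kite APIs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (not a Kite API): a morphline record as field name -> values,
// populated with a subset of the fields from the "EXTRACTED THIS" log output.
public class EmptyFieldDemo {

    // A field that was never set reads back as an empty list.
    static List<Object> get(Map<String, List<Object>> record, String field) {
        return record.getOrDefault(field, List.of());
    }

    public static void main(String[] args) {
        Map<String, List<Object>> record = new LinkedHashMap<>();
        record.put("/record_type[]/alert/action", List.of("allowed"));
        record.put("timestamp", List.of("2015-03-23T07:42:01.303046"));

        // subrec.avsc declares a field named "record_type", but the record has no such key.
        System.out.println(get(record, "timestamp"));   // [2015-03-23T07:42:01.303046]
        System.out.println(get(record, "record_type")); // []
    }
}
```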

Explorer

Sorry, but I don't understand what you mean...

 

I have a feeling that something is wrong with extractJsonPaths. When I change the output field names (left of the colon) in extractJsonPaths as shown below, and use simple fields instead of a nested record in the Avro schema, it works.

 

To test this, I changed this:

 

 

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
              "/record_type[]/alert/action" : /alert/action,
              "/record_type[]/alert/signature_id" : /alert/signature_id,
              "/record_type[]/alert/signature" : /alert/signature,
              "/record_type[]/alert/category" : /alert/category,
              "/record_type[]/alert/severity" : /alert/severity
      } } }

Into this

 

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
              action : /alert/action,
              signature_id : /alert/signature_id,
              signature : /alert/signature,
              category : /alert/category,
              severity : /alert/severity
      } } }

 

And I changed the Avro schema (subrec.avsc) to this:

 

 

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name" : "action",
    "type" : "string"
  }, {
    "name" : "signature_id",
    "type" : "int"
  }, {
    "name" : "signature",
    "type" : "string"
  }, {
    "name" : "category",
    "type" : "string"
  }, {
    "name" : "severity",
    "type" : "int"
  }]
}

 

 

What should the extractJsonPaths command and the Avro schema file look like to get nested records working?