Created on 03-25-2015 08:35 AM - edited 09-16-2022 02:25 AM
I'm trying to convert JSON into Avro using the kite-sdk morphline module. After playing around, I'm able to convert the JSON into Avro using a simple schema (no complex data types and no nested structure).
Then I took it one step further and modified the Avro schema as displayed below (subrec.avsc). As you can see, the schema consists of a subrecord.
As soon as I tried to convert the JSON to Avro using the morphlines.conf and the subrec.avsc, it failed.
Somehow the JSON paths "/record_type[]/alert/action" are not translated by the toAvro function.
Thanks
The morphlines.conf
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]

    commands : [
      # Read the JSON blob
      { readJson: {} }

      { logError { format : "record: {}", args : ["@{}"] } }

      # Extract JSON
      { extractJsonPaths {
          flatten: false,
          paths: {
            "/record_type[]/alert/action" : /alert/action,
            "/record_type[]/alert/signature_id" : /alert/signature_id,
            "/record_type[]/alert/signature" : /alert/signature,
            "/record_type[]/alert/category" : /alert/category,
            "/record_type[]/alert/severity" : /alert/severity
          }
      } }

      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

      { extractJsonPaths {
          flatten: false,
          paths: {
            timestamp : /timestamp,
            event_type : /event_type,
            source_ip : /src_ip,
            source_port : /src_port,
            destination_ip : /dest_ip,
            destination_port : /dest_port,
            protocol : /proto
          }
      } }

      # Create Avro according to schema
      { logError { format : "WE GO TO AVRO" } }
      { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }

      # Create Avro container
      { logError { format : "WE GO TO BINARY" } }
      { writeAvroToByteArray { format: containerlessBinary } }

      { logError { format : "DONE!!!" } }
    ]
  }
]
And the subrec.avsc
{ "type" : "record", "name" : "Event", "fields" : [ { "name" : "timestamp", "type" : "string" }, { "name" : "event_type", "type" : "string" }, { "name" : "source_ip", "type" : "string" }, { "name" : "source_port", "type" : "int" }, { "name" : "destination_ip", "type" : "string" }, { "name" : "destination_port", "type" : "int" }, { "name" : "protocol", "type" : "string" }, { "name": "record_type", "type" : ["null", { "name" : "alert", "type" : "record", "fields" : [ { "name" : "action", "type" : "string" }, { "name" : "signature_id", "type" : "int" }, { "name" : "signature", "type" : "string" }, { "name" : "category", "type" : "string" }, { "name" : "severity", "type" : "int" } ] } ] } ] }
The output from { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }:
[{
  /record_type[]/alert/action = [allowed],
  /record_type[]/alert/category = [],
  /record_type[]/alert/severity = [3],
  /record_type[]/alert/signature = [GeoIP from NL, Netherlands],
  /record_type[]/alert/signature_id = [88006],
  _attachment_body = [{ "timestamp": "2015-03-23T07:42:01.303046", "event_type": "alert", "src_ip": "1.1.1.1", "src_port": 18192, "dest_ip": "46.21.41.16", "dest_port": 62004, "proto": "TCP", "alert": { "action": "allowed", "gid": "1", "signature_id": "88006", "rev": "1", "signature": "GeoIP from NL, Netherlands", "category": "", "severity": "3" } }],
  _attachment_mimetype = [json/java + memory],
  basename = [simple_eve.json]
}]
Error
java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;
I also tried the following within extractJsonPaths:
/record_type[]/alert/action
/record_type/action
Created 03-30-2015 06:44 AM
The toAvro command expects a java.util.Map as input when converting to a nested Avro record (see the Kite morphlines reference guide).
However, your input data contains a (nested) Jackson JSON object, not a java.util.Map. Hence the conversion can't succeed.
Consider writing a custom morphline command that implements whatever conversion rules you wish, per http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Cus...
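For illustration, a minimal sketch of such a custom command is below. It pulls the nested "alert" object out of the Jackson tree that readJson left in the attachment body and rewrites it as the java.util.Map that toAvro expects for the nested "record_type" field. The package, class, and command names are made up, the code is untested, and it assumes a recent Kite morphlines API.

package com.example.morphline;  // hypothetical package

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;
import org.kitesdk.morphline.base.Fields;

import com.fasterxml.jackson.databind.JsonNode;
import com.typesafe.config.Config;

/** Registers a command named "extractAlertAsMap" for use in morphlines.conf. */
public final class ExtractAlertAsMapBuilder implements CommandBuilder {

  @Override
  public Collection<String> getNames() {
    return Collections.singletonList("extractAlertAsMap");
  }

  @Override
  public Command build(Config config, Command parent, Command child, MorphlineContext context) {
    return new ExtractAlertAsMap(this, config, parent, child, context);
  }

  private static final class ExtractAlertAsMap extends AbstractCommand {

    ExtractAlertAsMap(CommandBuilder builder, Config config, Command parent,
                      Command child, MorphlineContext context) {
      super(builder, config, parent, child, context);
      validateArguments();
    }

    @Override
    protected boolean doProcess(Record record) {
      // readJson leaves the parsed Jackson tree in the attachment body
      JsonNode root = (JsonNode) record.getFirstValue(Fields.ATTACHMENT_BODY);
      JsonNode alert = (root == null) ? null : root.get("alert");
      if (alert != null) {
        // Build the java.util.Map that toAvro expects for the nested record
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("action", alert.get("action").asText());
        map.put("signature_id", alert.get("signature_id").asInt());
        map.put("signature", alert.get("signature").asText());
        map.put("category", alert.get("category").asText());
        map.put("severity", alert.get("severity").asInt());
        record.replaceValues("record_type", map);
      }
      return super.doProcess(record);
    }
  }
}

With something like this on the classpath, you would add the package to importCommands (e.g. "com.example.**") and call { extractAlertAsMap {} } just before toAvro, instead of the first extractJsonPaths block.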
Created 03-25-2015 09:41 AM
Created 03-25-2015 10:24 AM
I have 2 avro jars in my flume lib, avro-1.7.5.jar and avro-1.7.3.jar
Which one is the correct one?
Created 03-25-2015 11:05 AM
Created on 03-25-2015 11:20 AM - edited 03-25-2015 11:28 AM
Thanks for your reply. I've removed the older version and restarted the agents. I believe the NoSuchMethodError is gone, but there is still a problem with processing the record.
10453 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - EXTRACTED THIS 2: [{/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}]
10455 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - WE GO TO AVRO
10457 [New I/O worker #1] WARN org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl - Morphline /etc/flume/conf/conf.empty/morphlines.conf@morphline1 failed to process record: {_attachment_body=[[B@6d2fdb53], basename=[simple_eve.json]}
Created 03-25-2015 12:55 PM
I've got some more trace logs that might help figure out the `Cannot convert item: [] to schema` error.
25 Mar 2015 20:52:28,328 TRACE [New I/O worker #1] (org.kitesdk.morphline.base.AbstractCommand.beforeProcess:168) - beforeProcess: {/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[test2], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}
25 Mar 2015 20:52:28,329 DEBUG [New I/O worker #1] (org.kitesdk.morphline.avro.ToAvroBuilder$ToAvro.doProcess:156) - Cannot convert item: [] to schema: {"type":"record","name":"Event","fields":[{"name":"timestamp","type":"string"},{"name":"event_type","type":"string"},{"name":"source_ip","type":"string"},{"name":"source_port","type":"int"},{"name":"destination_ip","type":"string"},{"name":"destination_port","type":"int"},{"name":"protocol","type":"string"},{"name":"record_type","type":[{"type":"record","name":"alert","fields":[{"name":"action","type":"string"},{"name":"signature_id","type":"string"},{"name":"signature","type":"string"},{"name":"category","type":"string"},{"name":"severity","type":"string"}]}]}]}
Created 03-25-2015 01:16 PM
Created 03-25-2015 01:24 PM
How can the record be empty? The TRACE beforeProcess output shows a record with the information just before toAvro is called.
Below is the original JSON string that is given to the Avro source in Flume:
{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}
Created 03-25-2015 01:29 PM
Created on 03-25-2015 01:35 PM - edited 03-25-2015 03:33 PM
Sorry, but I don't understand what you mean...
I have a feeling that something is wrong with extractJsonPaths. When I change the record output field names (left of the colon) in extractJsonPaths as shown below, and use simple fields instead of a complex nested record in the Avro schema, it works.
To test, I've changed this
# Extract JSON
{ extractJsonPaths {
    flatten: false,
    paths: {
      "/record_type[]/alert/action" : /alert/action,
      "/record_type[]/alert/signature_id" : /alert/signature_id,
      "/record_type[]/alert/signature" : /alert/signature,
      "/record_type[]/alert/category" : /alert/category,
      "/record_type[]/alert/severity" : /alert/severity
    }
} }
Into this
# Extract JSON
{ extractJsonPaths {
    flatten: false,
    paths: {
      action : /alert/action,
      signature_id : /alert/signature_id,
      signature : /alert/signature,
      category : /alert/category,
      severity : /alert/severity
    }
} }
And changed the avro schema (subrec.avsc) to this
{ "type" : "record", "name" : "Event", "fields" : [ { "name" : "timestamp", "type" : "string" }, { "name" : "event_type", "type" : "string" }, { "name" : "source_ip", "type" : "string" }, { "name" : "source_port", "type" : "int" }, { "name" : "destination_ip", "type" : "string" }, { "name" : "destination_port", "type" : "int" }, { "name" : "protocol", "type" : "string" }, { "name" : "action", "type" : "string" }, { "name" : "signature_id", "type" : "int" }, { "name" : "signature", "type" : "string" }, { "name" : "category", "type" : "string" }, { "name" : "severity", "type" : "int" }] }
What should the extractJsonPaths command and the Avro schema file look like to get nested records working?
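One possible direction, building on the answer above that toAvro expects a java.util.Map for nested records: keep the flat extractJsonPaths from the test above and the original nested subrec.avsc, and assemble the map with the stdlib java command just before toAvro. This is an untested sketch; the Integer.parseInt calls are only there because the JSON carries signature_id and severity as strings while the schema declares them as int.

# Untested sketch: build the java.util.Map that toAvro expects for the
# nested "record_type" field, using the flat fields extracted above.
{ java {
    imports : "import java.util.*;"
    code : """
      Map alert = new HashMap();
      alert.put("action", record.getFirstValue("action"));
      alert.put("signature_id", Integer.parseInt(record.getFirstValue("signature_id").toString()));
      alert.put("signature", record.getFirstValue("signature"));
      alert.put("category", record.getFirstValue("category"));
      alert.put("severity", Integer.parseInt(record.getFirstValue("severity").toString()));
      record.replaceValues("record_type", alert);
      return child.process(record);
      """
} }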