JSON to Avro, Sub-records in Avro
Labels: Apache Flume
Created on 03-25-2015 08:35 AM - edited 09-16-2022 02:25 AM
I'm trying to convert JSON into Avro using the kite-sdk morphline module. After some experimenting, I can convert the JSON into Avro with a simple schema (no complex data types and no nested structure).
Then I took it one step further and modified the Avro schema as shown below (subrec.avsc). As you can see, the schema contains a sub-record.
As soon as I tried to convert the JSON to Avro using morphlines.conf and subrec.avsc, it failed.
Somehow, JSON paths such as "/record_type[]/alert/action" are not translated by the toAvro command.
Thanks
The morphlines.conf
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Read the JSON blob
      { readJson: {} }
      { logError { format : "record: {}", args : ["@{}"] } }
      # Extract JSON
      {
        extractJsonPaths {
          flatten: false,
          paths: {
            "/record_type[]/alert/action" : /alert/action,
            "/record_type[]/alert/signature_id" : /alert/signature_id,
            "/record_type[]/alert/signature" : /alert/signature,
            "/record_type[]/alert/category" : /alert/category,
            "/record_type[]/alert/severity" : /alert/severity
          }
        }
      }
      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }
      {
        extractJsonPaths {
          flatten: false,
          paths: {
            timestamp : /timestamp,
            event_type : /event_type,
            source_ip : /src_ip,
            source_port : /src_port,
            destination_ip : /dest_ip,
            destination_port : /dest_port,
            protocol : /proto,
          }
        }
      }
      # Create Avro according to schema
      { logError { format : "WE GO TO AVRO"} }
      { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }
      # Create Avro container
      { logError { format : "WE GO TO BINARY"} }
      { writeAvroToByteArray { format: containerlessBinary } }
      { logError { format : "DONE!!!"} }
    ]
  }
]
And the subrec.avsc
{
  "type" : "record",
  "name" : "Event",
  "fields" : [
    { "name" : "timestamp", "type" : "string" },
    { "name" : "event_type", "type" : "string" },
    { "name" : "source_ip", "type" : "string" },
    { "name" : "source_port", "type" : "int" },
    { "name" : "destination_ip", "type" : "string" },
    { "name" : "destination_port", "type" : "int" },
    { "name" : "protocol", "type" : "string" },
    { "name": "record_type", "type" : [
        "null",
        {
          "name" : "alert",
          "type" : "record",
          "fields" : [
            { "name" : "action", "type" : "string" },
            { "name" : "signature_id", "type" : "int" },
            { "name" : "signature", "type" : "string" },
            { "name" : "category", "type" : "string" },
            { "name" : "severity", "type" : "int" }
          ]
        }
      ]
    }
  ]
}
The output of { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }:
[{ /record_type[]/alert/action = [allowed], /record_type[]/alert/category = [], /record_type[]/alert/severity = [3], /record_type[]/alert/signature = [GeoIP from NL, Netherlands], /record_type[]/alert/signature_id = [88006],
_attachment_body = [{ "timestamp": "2015-03-23T07:42:01.303046", "event_type": "alert", "src_ip": "1.1.1.1", "src_port": 18192, "dest_ip": "46.21.41.16", "dest_port": 62004, "proto": "TCP", "alert": { "action": "allowed", "gid": "1", "signature_id": "88006", "rev": "1", "signature" : "GeoIP from NL, Netherlands", "category" : "", "severity" : "3" } }], _attachment_mimetype=[json/java + memory],
basename = [simple_eve.json] }]
Error
java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;
I also tried the following paths within extractJsonPaths:
/record_type[]/alert/action
/record_type/action
Created 03-30-2015 06:44 AM
The toAvro command expects a java.util.Map as input on conversion to a nested Avro record, per
However, your input data contains a (nested) Jackson JSON object, not a java.util.Map. Hence the conversion can't succeed.
Consider writing a custom morphline command that implements whatever conversion rules you wish, per http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Cus...
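A minimal sketch of the kind of conversion such a custom command could perform, under stated assumptions: the record is modeled as a plain `java.util.Map` from field name to a list of values (the shape printed in the logs in this thread) rather than Kite's own record class, so it runs without Kite on the classpath, and `NestedAlertSketch`/`nestAlert` are hypothetical names. It gathers the flat `/record_type[]/alert/...` fields produced by the extractJsonPaths config into one nested `java.util.Map` stored under `record_type`, the field name used in subrec.avsc.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of the Kite SDK). A morphline record is modeled here as a
// plain Map from field name to a list of values, as printed in the logs in this thread.
final class NestedAlertSketch {
    // Prefix produced by the extractJsonPaths config in morphlines.conf.
    static final String PREFIX = "/record_type[]/alert/";
    // Fields declared as "int" in subrec.avsc but carried as strings in the JSON.
    static final Set<String> INT_FIELDS = Set.of("signature_id", "severity");

    // Moves every "/record_type[]/alert/<name>" field into one nested java.util.Map
    // stored under "record_type"; all other fields are copied unchanged.
    static Map<String, List<Object>> nestAlert(Map<String, List<Object>> record) {
        Map<String, List<Object>> out = new LinkedHashMap<>();
        Map<String, Object> alert = new LinkedHashMap<>();
        for (Map.Entry<String, List<Object>> e : record.entrySet()) {
            String name = e.getKey();
            if (!name.startsWith(PREFIX)) {
                out.put(name, e.getValue());
                continue;
            }
            String sub = name.substring(PREFIX.length());
            List<Object> values = e.getValue();
            if (values.isEmpty()) {
                // "category": "" shows up as [] after extraction in the logs above.
                // Assumption: restore "" for string fields and leave int fields out.
                if (!INT_FIELDS.contains(sub)) {
                    alert.put(sub, "");
                }
                continue;
            }
            Object value = values.get(0);
            if (INT_FIELDS.contains(sub)) {
                // Assumption: coerce to Integer to match the "int" type in subrec.avsc.
                value = Integer.valueOf(value.toString().trim());
            }
            alert.put(sub, value);
        }
        if (!alert.isEmpty()) {
            // Morphline record fields hold lists, so the nested map is wrapped in one.
            List<Object> nested = new ArrayList<>();
            nested.add(alert);
            out.put("record_type", nested);
        }
        return out;
    }
}
```

In a real custom command, the same logic would run on the record before it is handed to toAvro. Coercing `signature_id` and `severity` to `Integer` and restoring the empty `category` as `""` are assumptions based on subrec.avsc and the logs above, not documented toAvro behavior.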
Created 03-25-2015 09:41 AM
This means you have the wrong Avro jar version on the classpath.
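A minimal diagnostic sketch, not from the original reply: `JarLocator` is a hypothetical helper built only on the standard `Class.getProtectionDomain()` and `CodeSource.getLocation()` calls. Run with the same classpath as the Flume agent (an assumption about how you invoke it), `JarLocator.locate("org.apache.avro.reflect.ReflectData")` should return the location of the avro jar that actually supplied the class.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where the JVM loaded a class from, which shows
// which of several jars on the classpath actually supplied it.
final class JarLocator {
    static String locate(String className) throws ClassNotFoundException {
        // initialize=false: only load the class, do not run its static initializers.
        Class<?> cls = Class.forName(className, false, JarLocator.class.getClassLoader());
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        // JDK classes (and classes defined without a location) report no code source.
        return (location == null) ? "(no code source location)" : location.toString();
    }
}
```

Classes from the JDK report no code source, so the sketch returns a placeholder string for them instead of a jar path.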
Created 03-25-2015 10:24 AM
I have two Avro jars in my Flume lib directory, avro-1.7.5.jar and avro-1.7.3.jar.
Which one is the correct one?
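One way to spot this kind of conflict across a whole lib directory is to group jar names by artifact; a minimal sketch, assuming file names follow the common `<artifact>-<version>.jar` convention. `DuplicateJarFinder` is a hypothetical helper, not a Flume or Kite tool, and it only reports artifacts present in more than one version; it cannot tell which version your components need.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

// Hypothetical helper: finds artifacts that appear in more than one version in a lib dir.
final class DuplicateJarFinder {
    // Assumption: "<artifact>-<version>.jar", where the version starts with a digit.
    private static final Pattern JAR_NAME = Pattern.compile("^(.+?)-(\\d[\\w.\\-]*)\\.jar$");

    // Groups versions by artifact name and keeps only artifacts found more than once.
    static Map<String, List<String>> duplicates(List<String> fileNames) {
        Map<String, List<String>> byArtifact = new TreeMap<>();
        for (String name : fileNames) {
            Matcher m = JAR_NAME.matcher(name);
            if (m.matches()) {
                byArtifact.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(m.group(2));
            }
        }
        byArtifact.values().removeIf(versions -> versions.size() < 2);
        return byArtifact;
    }

    // Lists the file names in a directory (e.g. Flume's lib dir) and checks them.
    static Map<String, List<String>> scan(Path libDir) throws IOException {
        List<String> names = new ArrayList<>();
        try (Stream<Path> entries = Files.list(libDir)) {
            entries.forEach(p -> names.add(p.getFileName().toString()));
        }
        return duplicates(names);
    }
}
```

Calling `DuplicateJarFinder.scan(...)` with the path of the Flume lib directory would include an entry such as `avro=[1.7.5, 1.7.3]` for the two jars above (version order follows the directory listing).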
Created 03-25-2015 11:05 AM
Created on 03-25-2015 11:20 AM - edited 03-25-2015 11:28 AM
Thanks for your reply. I removed the older version and restarted the agents. I believe the NoSuchMethodError is gone, but there is still a problem processing the record.
10453 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - EXTRACTED THIS 2: [{/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}]
10455 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - WE GO TO AVRO
10457 [New I/O worker #1] WARN org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl - Morphline /etc/flume/conf/conf.empty/morphlines.conf@morphline1 failed to process record: {_attachment_body=[[B@6d2fdb53], basename=[simple_eve.json]}
Created 03-25-2015 12:55 PM
I've got some more trace logs that may help fix `Cannot convert item: [] to schema`:
25 Mar 2015 20:52:28,328 TRACE [New I/O worker #1] (org.kitesdk.morphline.base.AbstractCommand.beforeProcess:168) - beforeProcess: {/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[test2], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}
25 Mar 2015 20:52:28,329 DEBUG [New I/O worker #1] (org.kitesdk.morphline.avro.ToAvroBuilder$ToAvro.doProcess:156) - Cannot convert item: [] to schema: {"type":"record","name":"Event","fields":[{"name":"timestamp","type":"string"},{"name":"event_type","type":"string"},{"name":"source_ip","type":"string"},{"name":"source_port","type":"int"},{"name":"destination_ip","type":"string"},{"name":"destination_port","type":"int"},{"name":"protocol","type":"string"},{"name":"record_type","type":[{"type":"record","name":"alert","fields":[{"name":"action","type":"string"},{"name":"signature_id","type":"string"},{"name":"signature","type":"string"},{"name":"category","type":"string"},{"name":"severity","type":"string"}]}]}]}
Created 03-25-2015 01:16 PM
Created 03-25-2015 01:24 PM
How can the record be empty? The TRACE beforeProcess line shows a record with the information just before toAvro is called.
Below is the original JSON string that is sent to the Avro Source in Flume:
{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}
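The TRACE output above suggests the record itself is not empty: it just has no field named `record_type`, because extractJsonPaths stores each value under the literal name on the left of the colon (for example `/record_type[]/alert/action`). A minimal sketch of a check for this, assuming toAvro looks up each top-level Avro field by a record field of the same name, which is consistent with the `Cannot convert item: []` message; `MissingFieldCheck` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical diagnostic: lists top-level Avro field names that have no record field
// with exactly the same name. Assumes toAvro matches fields by name.
final class MissingFieldCheck {
    static List<String> missing(Set<String> recordFields, List<String> schemaFields) {
        List<String> result = new ArrayList<>();
        for (String field : schemaFields) {
            if (!recordFields.contains(field)) {
                result.add(field); // presumably toAvro sees an empty value ([]) here
            }
        }
        return result;
    }
}
```

With the field names from the TRACE line and the top-level fields of subrec.avsc, `missing(...)` returns `[record_type]`.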
Created 03-25-2015 01:29 PM
Created on 03-25-2015 01:35 PM - edited 03-25-2015 03:33 PM
Sorry, but I don't understand what you mean...
I have a feeling that something is wrong with extractJsonPaths. When I change the output record field names (left of the colon) in extractJsonPaths as shown below, and use simple fields instead of nested records in the Avro schema, it works.
To test, I changed this:
# Extract JSON
{
  extractJsonPaths {
    flatten: false,
    paths: {
      "/record_type[]/alert/action" : /alert/action,
      "/record_type[]/alert/signature_id" : /alert/signature_id,
      "/record_type[]/alert/signature" : /alert/signature,
      "/record_type[]/alert/category" : /alert/category,
      "/record_type[]/alert/severity" : /alert/severity
    }
  }
}
into this:
# Extract JSON
{
  extractJsonPaths {
    flatten: false,
    paths: {
      action : /alert/action,
      signature_id : /alert/signature_id,
      signature : /alert/signature,
      category : /alert/category,
      severity : /alert/severity
    }
  }
}
and changed the Avro schema (subrec.avsc) to this:
{
  "type" : "record",
  "name" : "Event",
  "fields" : [
    { "name" : "timestamp", "type" : "string" },
    { "name" : "event_type", "type" : "string" },
    { "name" : "source_ip", "type" : "string" },
    { "name" : "source_port", "type" : "int" },
    { "name" : "destination_ip", "type" : "string" },
    { "name" : "destination_port", "type" : "int" },
    { "name" : "protocol", "type" : "string" },
    { "name" : "action", "type" : "string" },
    { "name" : "signature_id", "type" : "int" },
    { "name" : "signature", "type" : "string" },
    { "name" : "category", "type" : "string" },
    { "name" : "severity", "type" : "int" }
  ]
}
What should extractJsonPaths and the Avro schema file look like to get nested records working?
