Created on 03-25-2015 08:35 AM - edited 09-16-2022 02:25 AM
I'm trying to convert JSON into Avro using the kite-sdk morphline module. After some experimenting, I was able to convert the JSON into Avro using a simple schema (no complex data types and no nested structure).
Then I took it one step further and modified the Avro schema as shown below (subrec.avsc). As you can see, the schema contains a subrecord.
As soon as I tried to convert the JSON to Avro using the morphlines.conf and subrec.avsc, the conversion failed.
Somehow the fields named "/record_type[]/alert/action" (and the other alert fields) are not mapped into the record_type subrecord by the toAvro command.
Thanks
The morphlines.conf
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Read the JSON blob
      { readJson: {} }
      { logError { format : "record: {}", args : ["@{}"] } }
      # Extract JSON
      {
        extractJsonPaths {
          flatten: false,
          paths: {
            "/record_type[]/alert/action" : /alert/action,
            "/record_type[]/alert/signature_id" : /alert/signature_id,
            "/record_type[]/alert/signature" : /alert/signature,
            "/record_type[]/alert/category" : /alert/category,
            "/record_type[]/alert/severity" : /alert/severity
          }
        }
      }
      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }
      {
        extractJsonPaths {
          flatten: false,
          paths: {
            timestamp : /timestamp,
            event_type : /event_type,
            source_ip : /src_ip,
            source_port : /src_port,
            destination_ip : /dest_ip,
            destination_port : /dest_port,
            protocol : /proto,
          }
        }
      }
      # Create Avro according to schema
      { logError { format : "WE GO TO AVRO" } }
      { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }
      # Create Avro container
      { logError { format : "WE GO TO BINARY" } }
      { writeAvroToByteArray { format: containerlessBinary } }
      { logError { format : "DONE!!!" } }
    ]
  }
]
And the subrec.avsc
{
  "type" : "record",
  "name" : "Event",
  "fields" : [
    { "name" : "timestamp", "type" : "string" },
    { "name" : "event_type", "type" : "string" },
    { "name" : "source_ip", "type" : "string" },
    { "name" : "source_port", "type" : "int" },
    { "name" : "destination_ip", "type" : "string" },
    { "name" : "destination_port", "type" : "int" },
    { "name" : "protocol", "type" : "string" },
    {
      "name" : "record_type",
      "type" : [
        "null",
        {
          "name" : "alert",
          "type" : "record",
          "fields" : [
            { "name" : "action", "type" : "string" },
            { "name" : "signature_id", "type" : "int" },
            { "name" : "signature", "type" : "string" },
            { "name" : "category", "type" : "string" },
            { "name" : "severity", "type" : "int" }
          ]
        }
      ]
    }
  ]
}
The output on { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }
[{
  /record_type[]/alert/action = [allowed],
  /record_type[]/alert/category = [],
  /record_type[]/alert/severity = [3],
  /record_type[]/alert/signature = [GeoIP from NL, Netherlands],
  /record_type[]/alert/signature_id = [88006],
  _attachment_body = [{
    "timestamp": "2015-03-23T07:42:01.303046",
    "event_type": "alert",
    "src_ip": "1.1.1.1",
    "src_port": 18192,
    "dest_ip": "46.21.41.16",
    "dest_port": 62004,
    "proto": "TCP",
    "alert": {
      "action": "allowed",
      "gid": "1",
      "signature_id": "88006",
      "rev": "1",
      "signature": "GeoIP from NL, Netherlands",
      "category": "",
      "severity": "3"
    }
  }],
  _attachment_mimetype = [json/java + memory],
  basename = [simple_eve.json]
}]
Error
java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;
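A NoSuchMethodError is thrown when the calling code was compiled against a version of a class that has the method, but the class actually loaded at runtime does not. It therefore usually points to a library version mismatch on the classpath. As an assumption not confirmed in this thread, one way to check is to print which jar the Avro class was loaded from, using the same classpath as the Flume agent. The class name WhichJar and the helper locate are hypothetical.

```java
import java.net.URL;
import java.security.CodeSource;

public class WhichJar {
    // Hypothetical diagnostic helper: reports where the JVM loaded a class from.
    static String locate(String className) {
        try {
            // initialize=false: only load the class, do not run its static initializers
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null) {
                return "bootstrap class loader"; // classes defined by the bootstrap loader have no code source
            }
            URL location = source.getLocation();
            return location == null ? "unknown location" : location.toString();
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // The class named in the stack trace; run with the Flume agent's classpath.
        System.out.println(locate("org.apache.avro.reflect.ReflectData"));
    }
}
```

If the printed location is not the Avro jar you expect (for example, an older copy bundled elsewhere on the classpath), the error is more likely a version conflict than a problem in the morphline itself.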
I also tried the following within extractJsonPaths:
/record_type[]/alert/action
/record_type/action
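Apart from how the fields are named, the sample event and the schema also differ in types and contents: the JSON carries signature_id and severity as strings ("88006", "3") while subrec.avsc declares them as int, and the JSON alert object has extra keys (gid, rev) that the alert record does not define. Whether toAvro converts or ignores these on its own is not covered in this thread, so the minimal sketch below does both explicitly when building the value for record_type. The class AlertToRecordType and its toRecordType helper are hypothetical, and the input is assumed to be the alert object already parsed into a java.util.Map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AlertToRecordType {
    // Hypothetical helper: builds the value for the record_type field of subrec.avsc
    // from the parsed "alert" JSON object (as a java.util.Map).
    static Map<String, Object> toRecordType(Map<String, Object> alert) {
        if (alert == null) {
            return null; // the "null" branch of the ["null", alert] union
        }
        Map<String, Object> out = new LinkedHashMap<>();
        // Copy only the five fields the alert record declares; gid and rev are dropped.
        out.put("action", field(alert, "action"));
        out.put("signature_id", Integer.parseInt(field(alert, "signature_id"))); // "88006" -> 88006
        out.put("signature", field(alert, "signature"));
        out.put("category", field(alert, "category"));
        out.put("severity", Integer.parseInt(field(alert, "severity"))); // "3" -> 3
        return out;
    }

    // Fails loudly instead of silently writing "null" for a missing field.
    private static String field(Map<String, Object> alert, String key) {
        Object value = alert.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing alert field: " + key);
        }
        return value.toString();
    }

    public static void main(String[] args) {
        // Values from the sample event in the question.
        Map<String, Object> alert = new LinkedHashMap<>();
        alert.put("action", "allowed");
        alert.put("gid", "1");
        alert.put("signature_id", "88006");
        alert.put("rev", "1");
        alert.put("signature", "GeoIP from NL, Netherlands");
        alert.put("category", "");
        alert.put("severity", "3");
        System.out.println(toRecordType(alert));
        // prints {action=allowed, signature_id=88006, signature=GeoIP from NL, Netherlands, category=, severity=3}
    }
}
```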
Created 03-30-2015 06:44 AM
The toAvro command expects a java.util.Map as input when converting to a nested Avro record, per
However, your input data contains a (nested) Jackson JSON object, not a java.util.Map. Hence the conversion can't succeed.
Consider writing a custom morphline command that implements whatever conversion rules you wish, per http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Cus...
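To make the point concrete: for the nested schema in the question, toAvro would need the record_type field to hold a java.util.Map keyed by the alert record's field names, whereas extractJsonPaths produced flat fields literally named "/record_type[]/alert/action" and so on. The JDK-only sketch below, with a hypothetical NestFields.nest helper, folds flat slash-separated names into nested Maps; a custom command would have to do something along these lines before toAvro. It illustrates the data shape under these assumptions and is not Kite code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NestFields {
    // Hypothetical helper: folds flat, slash-separated field names into nested Maps,
    // e.g. {"/record_type/action": "allowed"} -> {"record_type": {"action": "allowed"}}.
    // Assumes no name is both a leaf and a prefix of another name.
    @SuppressWarnings("unchecked")
    static Map<String, Object> nest(Map<String, Object> flat) {
        Map<String, Object> root = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : flat.entrySet()) {
            String key = entry.getKey().startsWith("/") ? entry.getKey().substring(1) : entry.getKey();
            String[] parts = key.split("/");
            Map<String, Object> current = root;
            // Create (or reuse) one nested Map per path segment except the last.
            for (int i = 0; i < parts.length - 1; i++) {
                current = (Map<String, Object>) current.computeIfAbsent(
                        parts[i], k -> new LinkedHashMap<String, Object>());
            }
            current.put(parts[parts.length - 1], entry.getValue());
        }
        return root;
    }

    public static void main(String[] args) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flat.put("source_ip", "1.1.1.1");
        flat.put("/record_type/action", "allowed");
        flat.put("/record_type/signature_id", 88006);
        System.out.println(nest(flat));
        // prints {source_ip=1.1.1.1, record_type={action=allowed, signature_id=88006}}
    }
}
```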
Created 03-31-2015 12:00 AM
Created 04-20-2015 03:35 PM
I need to process nested JSON. How did you go about mapping the fields to the nested Avro schema?
Created 07-09-2015 10:51 AM
Created 04-12-2018 06:33 PM
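In fact, we can use Jackson to solve this problem, and the approach works for any JSON data: the java command parses the JSON body into a java.util.Map (nested objects become nested Maps) and copies each top-level key into a record field, which toAvro can then convert.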
morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
      # read the JSON blob
      { readJson: {} }
      # java code: parse the JSON text into a java.util.Map and
      # copy each top-level key into a record field of the same name
      {
        java {
          imports : """
            import com.fasterxml.jackson.databind.ObjectMapper;
            import org.kitesdk.morphline.base.Fields;
            import java.io.IOException;
            import java.util.Map;
            import java.util.Set;
          """
          code : """
            String jsonStr = record.getFirstValue(Fields.ATTACHMENT_BODY).toString();
            ObjectMapper mapper = new ObjectMapper();
            Map<String, Object> map;
            try {
              map = (Map<String, Object>) mapper.readValue(jsonStr, Map.class);
            } catch (IOException e) {
              e.printStackTrace();
              return false; // fail this record instead of throwing a NullPointerException below
            }
            Set<String> keySet = map.keySet();
            for (String o : keySet) {
              record.put(o, map.get(o));
            }
            return child.process(record);
          """
        }
      }
      # convert the extracted fields to an avro object
      # described by the schema in this file
      { toAvro {
          schemaFile: /etc/flume/conf/a1/like_user_event_realtime.avsc
      } }
      #{ logInfo { format : "loginfo: {}", args : ["@{}"] } }
      # serialize the object as avro
      { writeAvroToByteArray: {
          format: containerlessBinary
      } }
    ]
  }
]
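One consequence of this approach: the java command copies each top-level JSON key into a record field with the same name, so, assuming toAvro fills each Avro field from the record field of the same name, the schema's field names must match the JSON keys. With the first post's subrec.avsc, for example, src_ip would not fill source_ip and alert would not fill record_type. The JDK-only sketch below shows the same copy step with an optional rename table; the class CopyWithRename, the plain Map<String, List<Object>> standing in for the morphline record, and the rename entries are all hypothetical illustrations.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CopyWithRename {
    // Hypothetical stand-in for the morphline record: each field holds a list of values,
    // matching the "field = [value]" shape in the logError output earlier in the thread.
    static Map<String, List<Object>> copyFields(Map<String, Object> json, Map<String, String> renames) {
        Map<String, List<Object>> record = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : json.entrySet()) {
            // Use the Avro field name if a rename is configured, otherwise keep the JSON key.
            String field = renames.getOrDefault(entry.getKey(), entry.getKey());
            record.computeIfAbsent(field, k -> new ArrayList<>()).add(entry.getValue());
        }
        return record;
    }

    public static void main(String[] args) {
        // Shaped like what Jackson's readValue(..., Map.class) returns: nested objects are nested Maps.
        Map<String, Object> alert = new LinkedHashMap<>();
        alert.put("action", "allowed");
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("src_ip", "1.1.1.1");
        json.put("alert", alert);

        Map<String, String> renames = Map.of("src_ip", "source_ip", "alert", "record_type");
        System.out.println(copyFields(json, renames));
        // prints {source_ip=[1.1.1.1], record_type=[{action=allowed}]}
    }
}
```

Renaming at copy time keeps the Avro schema independent of the upstream JSON key names, at the cost of maintaining the rename table by hand.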
Created 03-30-2015 06:05 AM