Member since 03-25-2015
13 Posts
0 Kudos Received
0 Solutions

04-12-2018 06:33 PM
In fact, we can use Jackson to solve this problem, and it is universal to any JSON data.

morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
      # read the JSON blob
      { readJson: {} }

      # java code: copy every top-level JSON key into a record field
      {
        java {
          imports : """
            import com.fasterxml.jackson.databind.JsonNode;
            import com.fasterxml.jackson.databind.ObjectMapper;
            import org.kitesdk.morphline.base.Fields;
            import java.io.IOException;
            import java.util.Set;
            import java.util.ArrayList;
            import java.util.Iterator;
            import java.util.List;
            import java.util.Map;
          """
          code : """
            String jsonStr = record.getFirstValue(Fields.ATTACHMENT_BODY).toString();
            ObjectMapper mapper = new ObjectMapper();
            Map<String, Object> map = null;
            try {
              map = (Map<String, Object>) mapper.readValue(jsonStr, Map.class);
            } catch (IOException e) {
              // reject the record here; continuing with a null map would throw below
              logger.warn("Cannot parse JSON body", e);
              return false;
            }
            Set<String> keySet = map.keySet();
            for (String o : keySet) {
              record.put(o, map.get(o));
            }
            return child.process(record);
          """
        }
      }

      # convert the extracted fields to an avro object
      # described by the schema in this field
      { toAvro {
        schemaFile: /etc/flume/conf/a1/like_user_event_realtime.avsc
      } }

      #{ logInfo { format : "loginfo: {}", args : ["@{}"] } }

      # serialize the object as avro
      { writeAvroToByteArray: {
        format: containerlessBinary
      } }
    ]
  }
]

08-01-2017 12:00 PM

@Wynner
This will work, thank you! I've linked all TailFile inputs to a single UpdateAttribute. Using the Advanced option, I've created a rule per TailFile.

For TailFileSuricata:
Conditions: ${tailfile.original.path:find('/var/log/suricata/.*')}
Action: Attribute: log.source, Value: suricata