Member since 03-25-2015
Posts: 13
Kudos Received: 0
Solutions: 0
08-01-2017
12:00 PM
@Wynner
This will work, thank you! I've linked all TailFile inputs to a single UpdateAttribute processor. Using the Advanced option, I created one rule per TailFile. For example, the rule TailFileSuricata:
Conditions: ${tailfile.original.path:find('/var/log/suricata/.*')}
Actions: Attribute: log.source, Value: suricata
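The rule logic can be sketched outside NiFi as follows. This is a minimal Python illustration, not NiFi code: only the Suricata rule comes from the post, while the `RULES` table, the `tag_log_source` helper, and the second rule are hypothetical. `re.search` is used because the Expression Language `find` function matches when the pattern occurs anywhere in the value.

```python
import re

# Each rule pairs a path pattern with the value to store in log.source.
# Only the Suricata entry is from the post; the second entry is made up.
RULES = [
    (r"/var/log/suricata/.*", "suricata"),
    (r"/var/log/example-app/.*", "example-app"),  # hypothetical rule
]


def tag_log_source(attributes):
    """Return a copy of the attributes with log.source set by the first matching rule."""
    path = attributes.get("tailfile.original.path", "")
    tagged = dict(attributes)
    for pattern, source in RULES:
        if re.search(pattern, path):
            tagged["log.source"] = source
            break  # this sketch stops at the first match
    return tagged
```

Calling `tag_log_source({"tailfile.original.path": "/var/log/suricata/eve.json"})` returns the attributes with `log.source` set to `suricata`; a path that matches no rule is returned without the attribute.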
07-28-2017
11:44 AM
In our case we have 5 different TailFile processors as inputs, and we use 5 UpdateAttribute processors to tag the flowfiles coming off the TailFile processors. This results in a crazy-looking data flow, with one UpdateAttribute processor for each TailFile processor. So my idea was to link every TailFile processor to one and the same UpdateAttribute processor, each with a different relationship name. Within UpdateAttribute I wanted to read the relationship name and put it on the flowfile as an attribute. But the question is: how do I get the relationship name? If this is not possible, is there a simpler and more manageable way to tag flowfiles based on the input processor (TailFile)? Thank you.
Labels: Apache NiFi
03-31-2015
12:00 AM
Thank you. I'll look into writing a custom command based on this JsonToMap script: http://stackoverflow.com/questions/21720759/convert-a-json-string-to-a-hashmap
03-25-2015
01:35 PM
Sorry, but I don't understand what you mean... I have a feeling that something is wrong with extractJsonPaths. When I change the record output field names (left of the colon) in extractJsonPaths as shown below, and use simple fields instead of complex nested records in the Avro schema, it works. To test, I changed this:
# Extract JSON
{ extractJsonPaths { flatten: false, paths: {
"/record_type[]/alert/action" : /alert/action,
"/record_type[]/alert/signature_id" : /alert/signature_id,
"/record_type[]/alert/signature" : /alert/signature,
"/record_type[]/alert/category" : /alert/category,
"/record_type[]/alert/severity" : /alert/severity
} } }
Into this:
# Extract JSON
{ extractJsonPaths { flatten: false, paths: {
action : /alert/action,
signature_id : /alert/signature_id,
signature : /alert/signature,
category : /alert/category,
severity : /alert/severity
} } }
And changed the Avro schema (subrec.avsc) to this:
{
"type" : "record",
"name" : "Event",
"fields" : [ {
"name" : "timestamp",
"type" : "string"
}, {
"name" : "event_type",
"type" : "string"
}, {
"name" : "source_ip",
"type" : "string"
}, {
"name" : "source_port",
"type" : "int"
}, {
"name" : "destination_ip",
"type" : "string"
}, {
"name" : "destination_port",
"type" : "int"
}, {
"name" : "protocol",
"type" : "string"
}, {
"name" : "action",
"type" : "string"
}, {
"name" : "signature_id",
"type" : "int"
}, {
"name" : "signature",
"type" : "string"
}, {
"name" : "category",
"type" : "string"
}, {
"name" : "severity",
"type" : "int"
}]
}
How should the extractJsonPaths command and the Avro schema file look to get nested records working?
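To make the target of the question concrete, here is a minimal Python sketch of the data shape that the nested subrec.avsc from the original question describes: a top-level `record_type` field holding either nothing or an `alert` subrecord, with `signature_id` and `severity` as integers. The `to_nested_event` helper is hypothetical, and whether toAvro accepts an equivalent structure is not confirmed here; the sketch only shows the layout and the string-to-int conversion the schema implies.

```python
import json


def to_nested_event(raw_json):
    """Build a dict whose layout mirrors the nested Event schema."""
    event = json.loads(raw_json)
    alert = event.get("alert")
    nested_alert = None  # corresponds to the "null" branch of the union
    if alert is not None:
        nested_alert = {
            "action": alert["action"],
            # The JSON carries these two as strings; the nested schema declares int.
            "signature_id": int(alert["signature_id"]),
            "signature": alert["signature"],
            "category": alert["category"],
            "severity": int(alert["severity"]),
        }
    return {
        "timestamp": event["timestamp"],
        "event_type": event["event_type"],
        "source_ip": event["src_ip"],
        "source_port": event["src_port"],
        "destination_ip": event["dest_ip"],
        "destination_port": event["dest_port"],
        "protocol": event["proto"],
        "record_type": nested_alert,
    }
```

The point of the sketch is that the subrecord sits under a single `record_type` key, rather than under flat keys such as `/record_type[]/alert/action`.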
03-25-2015
01:24 PM
How can the record be empty? The TRACE output from beforeProcess shows a record with the information just before toAvro is called. Below is the original JSON string that is given to the Avro Source in Flume:
{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}
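As a rough model of what the path mappings do with this JSON, the following Python sketch walks slash-separated paths such as `/alert/action` and stores each result as a list, the way the logged records show every field as multi-valued. It is a simplification written for illustration, not the Kite implementation: the `extract_paths` helper is hypothetical and ignores arrays and wildcards.

```python
def extract_paths(document, paths):
    """Map each output field name to the list of values found at its path."""
    record = {}
    for field, path in paths.items():
        node = document
        for key in path.strip("/").split("/"):
            if isinstance(node, dict) and key in node:
                node = node[key]
            else:
                node = None  # path does not exist in this document
                break
        # Fields are multi-valued in this model, so store a list (empty if missing).
        record[field] = [] if node is None else [node]
    return record
```

With the event above parsed into a dict, `extract_paths(event, {"source_ip": "/src_ip", "action": "/alert/action"})` gives `{"source_ip": ["2.2.2.2"], "action": ["allowed"]}`. Note that the output key is whatever string appears left of the colon, so a key like `/record_type[]/alert/action` is kept as a literal field name in this model.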
03-25-2015
12:55 PM
I've got some more trace logs that may help to fix `Cannot convert item: [] to schema`:
25 Mar 2015 20:52:28,328 TRACE [New I/O worker #1] (org.kitesdk.morphline.base.AbstractCommand.beforeProcess:168) - beforeProcess: {/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[test2], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}
25 Mar 2015 20:52:28,329 DEBUG [New I/O worker #1] (org.kitesdk.morphline.avro.ToAvroBuilder$ToAvro.doProcess:156) - Cannot convert item: [] to schema: {"type":"record","name":"Event","fields":[{"name":"timestamp","type":"string"},{"name":"event_type","type":"string"},{"name":"source_ip","type":"string"},{"name":"source_port","type":"int"},{"name":"destination_ip","type":"string"},{"name":"destination_port","type":"int"},{"name":"protocol","type":"string"},{"name":"record_type","type":[{"type":"record","name":"alert","fields":[{"name":"action","type":"string"},{"name":"signature_id","type":"string"},{"name":"signature","type":"string"},{"name":"category","type":"string"},{"name":"severity","type":"string"}]}]}]}
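One possible reading of the DEBUG line: the schema has a top-level field named `record_type`, while the record only has keys such as `/record_type[]/alert/action`. If toAvro looks values up by schema field name (an assumption, not confirmed here), that lookup would find nothing, which would be consistent with the empty `[]` in the message. The Python sketch below is a small diagnostic for that situation; the `missing_schema_fields` helper is hypothetical.

```python
import json


def missing_schema_fields(schema_json, record_keys):
    """Return top-level schema field names that have no same-named key in the record."""
    schema = json.loads(schema_json)
    keys = set(record_keys)
    return [field["name"] for field in schema["fields"] if field["name"] not in keys]
```

Fed the schema from the DEBUG line and the keys from the TRACE line, this check would report `record_type` as the only schema field without a matching record key.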
03-25-2015
11:20 AM
Thanks for your reply. I've removed the older version and restarted the agents. I believe the NoSuchMethodError is gone, but there is still a problem with processing the record:
10453 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - EXTRACTED THIS 2: [{/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"","severity":"3"}}], _attachment_mimetype=[json/java+memory], basename=[simple_eve.json], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}]
10455 [New I/O worker #1] ERROR org.kitesdk.morphline.stdlib.LogErrorBuilder$LogError - WE GO TO AVRO
10457 [New I/O worker #1] WARN org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl - Morphline /etc/flume/conf/conf.empty/morphlines.conf@morphline1 failed to process record: {_attachment_body=[[B@6d2fdb53], basename=[simple_eve.json]}
03-25-2015
10:24 AM
I have 2 Avro jars in my Flume lib directory: avro-1.7.5.jar and avro-1.7.3.jar. Which one is the correct one?
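A quick way to spot this kind of duplicate is to group jar file names by artifact and list every artifact that appears in more than one version. The Python sketch below does that for a list of names; the `find_duplicate_artifacts` helper and its file-name pattern are hypothetical, and the sketch does not say which version is the right one to keep.

```python
import re
from collections import defaultdict

# Splits "artifact-version.jar"; the version is assumed to start with a digit.
JAR_PATTERN = re.compile(r"^(?P<artifact>.+?)-(?P<version>\d[\w.\-]*)\.jar$")


def find_duplicate_artifacts(jar_names):
    """Return {artifact: [versions]} for artifacts present in more than one version."""
    versions = defaultdict(set)
    for name in jar_names:
        match = JAR_PATTERN.match(name)
        if match:
            versions[match.group("artifact")].add(match.group("version"))
    # Versions are sorted as plain strings, which is enough for display purposes.
    return {artifact: sorted(found) for artifact, found in versions.items() if len(found) > 1}
```

The names could come from `os.listdir` on the lib directory; for the two jars mentioned above the result would list `avro` with versions 1.7.3 and 1.7.5.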
03-25-2015
08:35 AM
I'm trying to convert JSON into Avro using the kite-sdk morphline module. After playing around, I'm able to convert the JSON into Avro using a simple schema (no complex data types and no nested structure). Then I took it one step further and modified the Avro schema as displayed below (subrec.avsc). As you can see, the schema contains a subrecord. As soon as I tried to convert the JSON to Avro using the morphlines.conf and the subrec.avsc, it failed. Somehow the JSON paths "/record_type[]/alert/action" are not translated by the toAvro function. Thanks.
The morphlines.conf:
morphlines : [
{ id : morphline1
importCommands : ["org.kitesdk.**"]
commands : [
# Read the JSON blob
{ readJson: {} }
{ logError { format : "record: {}", args : ["@{}"] } }
# Extract JSON
{ extractJsonPaths { flatten: false, paths: {
"/record_type[]/alert/action" : /alert/action,
"/record_type[]/alert/signature_id" : /alert/signature_id,
"/record_type[]/alert/signature" : /alert/signature,
"/record_type[]/alert/category" : /alert/category,
"/record_type[]/alert/severity" : /alert/severity
} } }
{ logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }
{ extractJsonPaths { flatten: false, paths: {
timestamp : /timestamp,
event_type : /event_type,
source_ip : /src_ip,
source_port : /src_port,
destination_ip : /dest_ip,
destination_port : /dest_port,
protocol : /proto
} } }
# Create Avro according to schema
{ logError { format : "WE GO TO AVRO"} }
{ toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }
# Create Avro container
{ logError { format : "WE GO TO BINARY"} }
{ writeAvroToByteArray { format: containerlessBinary } }
{ logError { format : "DONE!!!"} }
]
}
]
And the subrec.avsc:
{
"type" : "record",
"name" : "Event",
"fields" : [ {
"name" : "timestamp",
"type" : "string"
}, {
"name" : "event_type",
"type" : "string"
}, {
"name" : "source_ip",
"type" : "string"
}, {
"name" : "source_port",
"type" : "int"
}, {
"name" : "destination_ip",
"type" : "string"
}, {
"name" : "destination_port",
"type" : "int"
}, {
"name" : "protocol",
"type" : "string"
}, {
"name": "record_type",
"type" : ["null", {
"name" : "alert",
"type" : "record",
"fields" : [ {
"name" : "action",
"type" : "string"
}, {
"name" : "signature_id",
"type" : "int"
}, {
"name" : "signature",
"type" : "string"
}, {
"name" : "category",
"type" : "string"
}, {
"name" : "severity",
"type" : "int"
}
] } ]
} ]
}
The output of { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }:
[{
/record_type[]/alert/action = [allowed],
/record_type[]/alert/category = [],
/record_type[]/alert/severity = [3],
/record_type[]/alert/signature = [GeoIP from NL, Netherlands],
/record_type[]/alert/signature_id = [88006],
_attachment_body = [{
"timestamp": "2015-03-23T07:42:01.303046",
"event_type": "alert",
"src_ip": "1.1.1.1",
"src_port": 18192,
"dest_ip": "46.21.41.16",
"dest_port": 62004,
"proto": "TCP",
"alert": {
"action": "allowed",
"gid": "1",
"signature_id": "88006",
"rev": "1",
"signature" : "GeoIP from NL, Netherlands",
"category" : "",
"severity" : "3"
}
}],
_attachment_mimetype = [json/java+memory], basename = [simple_eve.json]
}]
Error:
java.lang.NoSuchMethodError: org.apache.avro.reflect.ReflectData.getDefaultValue(Lorg/apache/avro/Schema$Field;)Ljava/lang/Object;
I also tried the following within extractJsonPaths:
/record_type[]/alert/action
/record_type/action
Labels: Apache Flume