Created on 09-15-2016 11:28 AM - edited 08-17-2019 10:06 AM
In this article I will review the steps required to enrich and filter logs. It is assumed that the logs are landing one at a time as a stream into the nifi cluster. The steps involved Extract Attributes - IP and Action
Extract IP Address and Action - ExtractText Processor
This processor will evaluate each log and parse the information into attributes. To create a new attribute add a property and give it a name(soon to be attribute name) and a java-style regex command. As the processor runs it will evaluate the regex and create an attribute with the result. If there is no match it will be sent to the 'unmatched' result which is a simple way of filtering out different logs.
GeoEnrichIP - GeoEnrichIP Processor
This processor takes the ipaddr attribute generated in the previous step and compares it to a geo-database('mmdb'). I am using the GeoLite - City Database found here
Route on Threat - RouteOnAttribute Processor
This processor takes the IsDenied attribute from the previous step and tests to see if it is there. This will only exist if the "Extract IP Address" Processor found "iptables denied" in the log. It is then routed to a connectionw ith that property's name. More properties can be added with thier own rules following the nifi expression language Note I plan on adding location filtering but did not want to obscure the demo in too many steps.
Cold and Medium Storage - Processor Groups
These two processor groups are very similar in function. Eventually they could be combined into one shared group using attributes for rules but for now they are separate.
High Threat - Processor Group
In order to be read by a hive external table we need to convert the data to a JSON format and save it to the correct directory.
Hive Table Query
Using the ambari hive view I am able to now query my logs and use sql-style queries to get results
CREATE TABLE `securitylogs`( |
`ctime` varchar(255) COMMENT 'from deserializer', |
`country` varchar(255) COMMENT 'from deserializer', |
`city` varchar(255) COMMENT 'from deserializer', |
`ipaddr` varchar(255) COMMENT 'from deserializer', |
`fullbody` varchar(5000) COMMENT 'from deserializer') |
ROW FORMAT SERDE |
'org.apache.hive.hcatalog.data.JsonSerDe' |
STORED AS INPUTFORMAT |
'org.apache.hadoop.mapred.TextInputFormat' |
OUTPUTFORMAT |
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
LOCATION |
'hdfs://sandbox.hortonworks.com:8020/user/nifi/High_Threat' |