In this article I will review the steps required to enrich and filter logs. It is assumed that the logs are landing one at a time as a stream into the NiFi cluster. The steps involved are:
Extract Attributes - IP and Action
Cold store non-IP logs
GeoEnrich the IP address
Cold store local IP addresses
Route the remaining logs based on threat level
Store the low threat logs in HDFS
Place high threat logs into an external table
Extract IP Address and Action - ExtractText Processor
This processor evaluates each log and parses the information into attributes. To create a new attribute, add a property and give it a name (soon to be the attribute name) and a Java-style regular expression. As the processor runs it evaluates the regex and creates an attribute with the result.
If there is no match, the log is sent to the 'unmatched' relationship, which is a simple way of filtering out logs of other types.
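To make this concrete, here is a rough Python sketch of what the two extraction properties do. The log format, regexes, and sample line are assumptions for illustration; only the attribute names ipaddr and IsDenied come from this flow.

```python
import re

# Illustrative only: the real regexes depend on your log format.
IP_REGEX = re.compile(r"SRC=(\d{1,3}(?:\.\d{1,3}){3})")   # property 'ipaddr'
ACTION_REGEX = re.compile(r"(iptables denied)")            # property 'IsDenied'

def extract_attributes(log_line):
    """Mimic ExtractText: return a dict of new attributes, or None when unmatched."""
    ip_match = IP_REGEX.search(log_line)
    if ip_match is None:
        return None  # in NiFi this FlowFile would go to the 'unmatched' relationship
    attributes = {"ipaddr": ip_match.group(1)}
    action_match = ACTION_REGEX.search(log_line)
    if action_match:
        attributes["IsDenied"] = action_match.group(1)
    return attributes

sample = "Sep 15 11:28:01 gateway kernel: iptables denied: IN=eth0 SRC=203.0.113.50 DST=10.0.0.5"
print(extract_attributes(sample))
# {'ipaddr': '203.0.113.50', 'IsDenied': 'iptables denied'}
```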
GeoEnrichIP - GeoEnrichIP Processor
This processor takes the ipaddr attribute generated in the previous step and looks it up in a geo-database (.mmdb file). I am using the GeoLite City database found here.
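The lookup the processor performs is roughly equivalent to the Python sketch below, which uses the geoip2 library against the same GeoLite City .mmdb file. The file path and the exact attribute names are assumptions for illustration.

```python
import geoip2.database
import geoip2.errors

# Rough equivalent of the GeoEnrichIP lookup; path and attribute names are assumptions.
reader = geoip2.database.Reader("/opt/geoip/GeoLite2-City.mmdb")

def enrich(ipaddr):
    """Return geo attributes for an IP, or None when the address is not in the database."""
    try:
        response = reader.city(ipaddr)
    except geoip2.errors.AddressNotFoundError:
        return None  # e.g. local/private addresses, which this flow cold-stores
    return {
        "ipaddr.geo.city": response.city.name,
        "ipaddr.geo.country": response.country.name,
        "ipaddr.geo.latitude": response.location.latitude,
        "ipaddr.geo.longitude": response.location.longitude,
    }

print(enrich("8.8.8.8"))
```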
Route on Threat - RouteOnAttribute Processor
This processor takes the IsDenied attribute generated earlier and tests whether it is present. The attribute will only exist if the "Extract IP Address" processor found "iptables denied" in the log. Matching logs are then routed to a connection with that property's name. More properties can be added with their own rules using the NiFi Expression Language.
Note: I plan on adding location filtering but did not want to obscure the demo with too many steps.
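In NiFi the routing rule itself is just an Expression Language test on the property (for example, ${IsDenied:notNull()}). The Python sketch below shows the equivalent logic, with a dict standing in for a FlowFile's attributes.

```python
def route_on_threat(attributes):
    """Equivalent logic to the Route on Threat step."""
    if attributes.get("IsDenied") is not None:
        return "IsDenied"   # routed to the connection named after the matching property
    return "unmatched"      # everything else is treated as lower threat in this flow

print(route_on_threat({"ipaddr": "8.8.8.8", "IsDenied": "iptables denied"}))  # IsDenied
print(route_on_threat({"ipaddr": "8.8.8.8"}))                                 # unmatched
```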
Cold and Medium Storage - Processor Groups
These two processor groups are very similar in function. Eventually they could be combined into one shared group using attributes for rules but for now they are separate.
Merge Content - This processor takes the individual log lines and combines them into a larger aggregated file. This helps avoid the 'too many small files' problem that arises in large clusters
Compress Content - Simply saves disk space by compressing the aggregated files
Set Filename As Timestamp - UpdateAttribute Processor - This takes each aggregate and sets the attribute 'filename' to the current time. This will allow us to sort the aggregates by when they were written for later review (see the sketch after this list)
PutHDFS Processor - Takes the aggregate and saves it to HDFS
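The sequence above boils down to something like the following Python sketch. A local directory stands in for HDFS and the naming details are assumptions; in NiFi the filename would come from an Expression Language call such as ${now():format('yyyyMMddHHmmssSSS')}.

```python
import gzip
import time
from pathlib import Path

def store_batch(log_lines, out_dir="/tmp/cold_storage"):
    """Merge, compress, timestamp-name, and write a batch of log lines."""
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    merged = "\n".join(log_lines) + "\n"                   # Merge Content
    filename = time.strftime("%Y%m%d%H%M%S") + ".log.gz"   # Set Filename As Timestamp
    path = Path(out_dir) / filename
    with gzip.open(path, "wt") as f:                       # Compress Content
        f.write(merged)
    return path                                            # PutHDFS would write this to HDFS instead

print(store_batch(["log line one", "log line two"]))
```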
High Threat - Processor Group
In order to be read by a Hive external table, we need to convert the data to JSON format and save it to the correct directory.
Rename Attributes - UpdateAttribute Processor - This renames the fields to match the Hive field format
Put Into JSON - AttributesToJSON - Takes the renamed fields and saves them as a JSON string that the Hive SerDe can read natively (see the sketch after this list)
Set Filename As Timestamp - UpdateAttribute Processor - Once again this sets the filename to the timestamp. This may be better served as system name + timestamp moving forward
PutHDFS - Stores the data in the Hive external table's file location
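Put together, the high threat path produces one JSON object per log, roughly as in the sketch below. The Hive-style field names here are assumptions for illustration, not the flow's actual schema.

```python
import json

def to_hive_json(attributes):
    """Rename attributes to Hive-friendly fields and emit one JSON object per log."""
    record = {                                   # Rename Attributes
        "ipaddr": attributes.get("ipaddr"),
        "city": attributes.get("ipaddr.geo.city"),
        "country": attributes.get("ipaddr.geo.country"),
        "action": attributes.get("IsDenied"),
    }
    return json.dumps(record)                    # Put Into JSON (AttributesToJSON)

print(to_hive_json({
    "ipaddr": "8.8.8.8",
    "ipaddr.geo.city": "Mountain View",
    "ipaddr.geo.country": "United States",
    "IsDenied": "iptables denied",
}))
```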
Hive Table Query
Using the Ambari Hive view, I am now able to query my logs with SQL-style queries to get results.
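The article runs the query through the Ambari Hive view, but the same SQL-style query could be issued from anywhere that can reach HiveServer2. Below is a minimal sketch using PyHive; the host, table, and column names are assumptions for illustration only.

```python
from pyhive import hive

# Host, table, and column names are assumptions for illustration only.
conn = hive.Connection(host="hiveserver2.example.com", port=10000, username="nifi")
cursor = conn.cursor()
cursor.execute("""
    SELECT country, COUNT(*) AS denied_hits
    FROM high_threat_logs
    GROUP BY country
    ORDER BY denied_hits DESC
""")
for country, hits in cursor.fetchall():
    print(country, hits)
```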