In this article I will review the steps required to enrich and filter logs. It is assumed that the logs land one at a time, as a stream, in the NiFi cluster. The steps involved are:

  • Extract Attributes - IP and Action
  • Cold store non-IP logs
  • GeoEnrich the IP address
  • Cold store local IP addresses
  • Route the remaining logs based on threat level
  • Store the low threat logs in HDFS
  • Place high threat logs into an external table


Extract IP Address and Action - ExtractText Processor

This processor evaluates each log and parses information from it into attributes. To create a new attribute, add a property, give it a name (which becomes the attribute name), and set its value to a Java-style regular expression. As the processor runs, it evaluates each regex and creates an attribute with the result. If nothing matches, the log is sent to the 'unmatched' relationship, which is a simple way of filtering out different kinds of logs.
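As a rough illustration of that behaviour outside NiFi, here is a minimal Python sketch. The two regex patterns and the sample log lines are made up for this example (the article does not show the patterns it used), and `extract_attributes` is a hypothetical helper, not a NiFi API. Only the attribute names ipaddr and IsDenied come from the article.

```python
import re

# Hypothetical patterns, one per "property"; the real flow's regexes are not shown.
PROPERTIES = {
    "ipaddr": re.compile(r"SRC=(\d{1,3}(?:\.\d{1,3}){3})"),
    "IsDenied": re.compile(r"(iptables denied)"),
}

def extract_attributes(log_line):
    """Return (relationship, attributes) for a single log line."""
    attributes = {}
    for name, pattern in PROPERTIES.items():
        match = pattern.search(log_line)
        if match:
            # The first capture group becomes the attribute value.
            attributes[name] = match.group(1)
    relationship = "matched" if attributes else "unmatched"
    return relationship, attributes

# Made-up sample lines, for illustration only.
print(extract_attributes("kernel: iptables denied: IN=eth0 SRC=203.0.113.7 DST=10.0.0.5"))
print(extract_attributes("cron: nightly job finished"))
```

A line with no match carries no attributes and would follow the 'unmatched' path, which is how the non-IP logs end up in cold storage.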


GeoEnrichIP - GeoEnrichIP Processor

This processor takes the ipaddr attribute generated in the previous step and looks it up in a geo-database (an 'mmdb' file). I am using the GeoLite City database.
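Local addresses cannot be placed by a public geo-database, which is why they are cold stored after this step. The sketch below shows one way to recognise such addresses using only Python's standard library. It is an assumption about what "local" means here (private, loopback, or link-local), and `is_local` is a hypothetical helper rather than part of the processor.

```python
import ipaddress

def is_local(ipaddr):
    """True for addresses a public geo-database would not be expected to resolve."""
    address = ipaddress.ip_address(ipaddr)
    return address.is_private or address.is_loopback or address.is_link_local

for candidate in ("192.168.1.10", "10.0.0.5", "8.8.8.8"):
    destination = "cold store" if is_local(candidate) else "continue to threat routing"
    print(candidate, "->", destination)
```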


Route on Threat - RouteOnAttribute Processor

This processor tests whether the IsDenied attribute created during extraction is present. The attribute only exists if the "Extract IP Address" processor found "iptables denied" in the log. A matching log is routed to a connection with that property's name. More properties can be added with their own rules, following the NiFi Expression Language. Note: I plan on adding location filtering but did not want to obscure the demo with too many steps.
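The routing rule can be pictured as a table of named predicates over the attributes. The Python sketch below is only an analogy: the rule name `high_threat` and the `route` helper are hypothetical, and in NiFi the rule would be written in the Expression Language instead (presumably something along the lines of `${IsDenied:notNull()}`, though the article does not show the exact expression).

```python
# Hypothetical rule table: property name -> predicate over the attributes.
RULES = {
    "high_threat": lambda attrs: "IsDenied" in attrs,
}

def route(attributes):
    """Return the names of all matching rules, or ['unmatched'] if none match."""
    matched = [name for name, rule in RULES.items() if rule(attributes)]
    return matched or ["unmatched"]

print(route({"ipaddr": "8.8.8.8", "IsDenied": "iptables denied"}))
print(route({"ipaddr": "8.8.8.8"}))
```

Adding a location filter later would amount to adding another entry to the rule table.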


Cold and Medium Storage - Processor Groups

These two processor groups are very similar in function. Eventually they could be combined into one shared group using attributes for rules but for now they are separate.

  • Merge Content - This processor takes the individual lines and combines them into a larger aggregated file. This helps avoid the "too many small files" problem that arises in large clusters.
  • Compress Content - Saves disk space by compressing the aggregates.
  • Set Filename As Timestamp - UpdateAttribute Processor - This takes each aggregate and sets the attribute 'filename' to the current time. This allows us to sort the aggregates by when they were written for later review.
  • PutHDFS Processor - Takes the aggregate and saves it to HDFS.
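The merge, compress, and rename steps above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not what the processors do internally: gzip is assumed as the compression format, the filename is assumed to be epoch milliseconds, `build_aggregate` is a hypothetical helper, and the HDFS write is left out.

```python
import gzip
import time

def build_aggregate(log_lines, now=None):
    """Merge lines into one blob, gzip it, and name it after the current time."""
    merged = "\n".join(log_lines).encode("utf-8")
    compressed = gzip.compress(merged)
    # Assumption: epoch milliseconds, so filenames sort by write time.
    timestamp_ms = int((time.time() if now is None else now) * 1000)
    return str(timestamp_ms), compressed

filename, blob = build_aggregate(["first log line", "second log line"])
print(filename, len(blob), "bytes compressed")
```

Because the filename is a plain timestamp, two systems writing at the same instant could collide, which is one reason to prefix a system name.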


High Threat - Processor Group

For a Hive external table to read the data, we need to convert it to JSON and save it to the correct directory.

  • Rename Attributes - UpdateAttribute Processor - This renames the fields to match the Hive field format.
  • Put Into JSON - AttributesToJSON - Takes the renamed fields and saves them in a JSON string that the Hive SerDe can read natively.
  • Set Filename As Timestamp - UpdateAttribute Processor - Once again, this sets the filename to the timestamp. This may be better served as systemname + timestamp moving forward.
  • PutHDFS - Stores the data in the Hive external table's file location.
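To make the rename-then-serialise step concrete, here is a small Python sketch. The target keys match the columns of the securitylogs table defined later in this article. The source attribute names `ipaddr.geo.country` and `ipaddr.geo.city` are an assumption about what the geo-enrichment step produces, and `to_hive_json` is a hypothetical helper.

```python
import json

# Assumed mapping from flow-file attribute names to Hive column names.
RENAMES = {
    "ipaddr.geo.country": "country",
    "ipaddr.geo.city": "city",
    "ipaddr": "ipaddr",
}

def to_hive_json(attributes, ctime, fullbody):
    """Build one JSON record whose keys match the Hive column names."""
    record = {"ctime": ctime, "fullbody": fullbody}
    for source, column in RENAMES.items():
        # Missing attributes become null rather than being dropped.
        record[column] = attributes.get(source)
    return json.dumps(record, sort_keys=True)

# Made-up sample values, for illustration only.
sample = {"ipaddr": "8.8.8.8", "ipaddr.geo.country": "United States", "ipaddr.geo.city": "Mountain View"}
print(to_hive_json(sample, "1500", "kernel: iptables denied: SRC=8.8.8.8"))
```

Each record is a single JSON object on one line, which is the shape a JSON SerDe typically expects.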


Hive Table Query

Using the Ambari Hive view, I can now run SQL-style queries against my logs. The table is defined as follows:

CREATE TABLE `securitylogs`(
`ctime` varchar(255) COMMENT 'from deserializer',
`country` varchar(255) COMMENT 'from deserializer',
`city` varchar(255) COMMENT 'from deserializer',
`ipaddr` varchar(255) COMMENT 'from deserializer',
`fullbody` varchar(5000) COMMENT 'from deserializer')