Created on 08-12-2016 11:03 PM - edited 08-17-2019 10:51 AM
This article is not meant to show how to install NiFi or create a “Hello World” data flow. It shows how to resolve a data filtering problem with NiFi using two approaches: a filter list kept as a file on disk, which can be static or dynamic, and a list stored in a distributed cache populated from the same file. The amount of data used was minimal and simplistic, so no performance difference can be perceived. At scale, however, where memory is available, a caching implementation should perform better.
This article assumes some familiarity with NiFi: knowing what a processor or a queue is, how to set the basic configuration for each, how to visualize the data at various steps throughout the flow, and how to start and stop processors.
Since you are somewhat familiar with NiFi, you probably know how to install and start it. I will provide a quick refresher below anyway.
For this demo, I used NiFi 0.6.1, the latest version available at the time of writing. This version was not part of HDP 2.4.2, the HDP release available at the time of this demo, which also ships 0.5.1. HDP 2.5 was just launched last month at the Hadoop Summit in Santa Clara.
If you use brew install nifi on OSX, it will only install NiFi 0.5.1, which does not have some of the features needed for the demo, e.g. PutDistributedMapCache or FetchDistributedMapCache.
Instead, use the following steps:
A reference about downloading and installing on Linux and Mac is here. You can download NiFi 0.6.1 from any of the sites listed there, for example: https://www.apache.org/dyn/closer.lua?path=/nifi/0.6.1/nifi-0.6.1-bin.tar.gz
I prefer wget and installing my apps to /opt.
That will download a 421.19MB file.
tar -xvf nifi-0.6.1-bin.tar.gz
ls -l
and here is your /opt/nifi-0.6.1
That is your NIFI_HOME.
Open a browser and type:
You will need to import the NiFi .xml template posted in my GitHub repo, mentioned earlier. Clone it to a local folder of your choice, assuming that you have a git client installed:
After importing the template, instantiate it. It will look like the following:
For the template to work with your specific folder structure, you will need to tell the GetFile processor where to find the data: right-click on the GetFile processor header, choose View Configuration, open the Properties tab and set Input Directory. Keep in mind that once the GetFile processor is started, it reads the file and then deletes it. If you want to re-feed the file for a test, just drop it into the same folder again and it will be re-ingested. You can also place multiple files of the same structure in that folder, and every line of every file will be ingested. In real life, GetFile can be replaced with a different processor capable of reading from an actual log. For this demo, I used a static file as input.
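The read-then-delete behaviour described above can be pictured with a small Python sketch. This is not NiFi code: `ingest_directory` is a hypothetical helper that only mimics what happens to the files in the input directory, and the demo uses a throwaway temporary folder.

```python
import os
import tempfile


def ingest_directory(input_dir):
    """Read every regular file in input_dir, then delete it.

    Hypothetical helper that mimics the read-and-delete behaviour
    described for GetFile; it is not part of NiFi.
    """
    contents = {}
    for name in sorted(os.listdir(input_dir)):
        path = os.path.join(input_dir, name)
        if not os.path.isfile(path):
            continue  # skip sub-directories
        with open(path, encoding="utf-8") as handle:
            contents[name] = handle.read()
        os.remove(path)  # the source file is gone once it has been ingested
    return contents


# Demo with a throwaway directory so no real data is touched.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "macaddresses-log.txt"), "w", encoding="utf-8") as handle:
    handle.write("sample line\n")

ingested = ingest_directory(demo_dir)
remaining = os.listdir(demo_dir)  # empty: the file has to be dropped in again to re-feed it
```

This is why the lookup file should live outside the input directory: anything placed there is consumed.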
Also, enable and start the DistributedMapCacheServer Controller Service. This is required for the put and fetch distributed cache processors.
The DistributedMapCacheServer can be started just like any other Controller Service, by configuring it to be valid and hitting the "start" button. The unique thing about the DistributedMapCacheServer is that processors work with the cache through a DistributedMapCacheClientService. So you will create both a Server and a Client Service, then configure the processor to use the Client Service. Next, start both the Server and the Client Service. Finally, start the processor.
For your test, you can use the two files checked in to the git repo that you just cloned locally: macaddresses-blacklist.txt and macaddresses-log.txt.
macaddresses-blacklist.txt is a list of blacklisted MAC addresses. It is used to filter, line by line, the incoming stream that GetFile ingests from macaddresses-log.txt.
To understand what happens step by step, I suggest starting each processor in turn and inspecting the queue and data lineage.
Populating the DistributedMapCache is performed in the flow on the right side of the template that you imported in the previous step.
The filtering flow, via ScanAttribute or FetchDistributedMapCache:
Use of the GetFile, SplitText and ExtractText processors is well documented, and a basic Google search will return several good examples. How to use FetchDistributedMapCache and PutDistributedMapCache is not as well documented. That was the main reason to write this article: I could not find another good reference. I am sure others felt the same way, and hopefully this helps.
Before starting the ScanAttribute processor, right-click on its header, choose Configuration, go to the Properties tab and change the Dictionary File to point to your macaddresses-blacklist.txt. This is a copy of the same file you use in the GetFile processor, but I suggest putting it in a separate folder so that GetFile will not ingest it and delete it after use. It needs to be permanent, like a lookup file on the disk.
SplitText is used to split the file line by line. You can check this by right-clicking the header of the processor and opening the “Properties” tab: Line Split Count is set to 1.
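As a rough illustration of what a Line Split Count of 1 means, here is a minimal Python sketch. The function `split_text` is a hypothetical stand-in, not the processor's implementation, and it ignores SplitText options such as header handling.

```python
def split_text(content, line_split_count=1):
    """Split text into chunks of at most line_split_count lines each.

    Hypothetical stand-in for the splitting step; with the default of 1,
    every input line becomes its own chunk.
    """
    lines = content.splitlines()
    return [
        "\n".join(lines[start:start + line_split_count])
        for start in range(0, len(lines), line_split_count)
    ]


# Three input lines become three separate chunks.
chunks = split_text("first line\nsecond line\nthird line")
```

With one line per chunk, each log entry can be inspected and routed on its own further down the flow.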
The ExtractText processor uses a custom regex to extract the MAC address from macaddresses-log.txt. You can find it in Properties, as the last property in the list.
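The regex itself lives in the template, so the pattern below is only an assumption of what such an expression could look like: six pairs of hex digits separated by colons or hyphens. The log line in the demo is also made up. The sketch shows the idea of pulling a MAC address out of a line and exposing it under the mac.address name used later in the flow.

```python
import re

# Assumed pattern, not the one from the template: six hex pairs
# separated by ':' or '-'.
MAC_PATTERN = re.compile(r"\b((?:[0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2})\b")


def extract_mac(line):
    """Return {'mac.address': value} for the first MAC found, else {}."""
    match = MAC_PATTERN.search(line)
    if match is None:
        return {}
    return {"mac.address": match.group(1)}


# Hypothetical log line; the real file format may differ.
attributes = extract_mac("2016-08-12 23:03:11 device 00:1A:2B:3C:4D:5E connected")
```

Lines without a recognizable MAC address yield no attribute, so they cannot match the blacklist in the later steps.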
In the ScanAttribute processor, set the Dictionary File property to the folder/file of your choice. In this demo, I used the macaddresses-blacklist.txt file included in the repo that you cloned in one of the previous steps.
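Conceptually, the file-based approach boils down to a set-membership test against the dictionary file. The Python sketch below assumes one MAC address per line in the dictionary and checks only the mac.address attribute; the helper names are hypothetical, and the real processor offers more options (for example, which attributes to scan).

```python
def load_dictionary(lines):
    """Build a lookup set from an iterable of lines, e.g. an open file.

    Assumes one blacklisted MAC address per line; blank lines are skipped.
    """
    return {line.strip() for line in lines if line.strip()}


def scan_attribute(attributes, dictionary, attribute_name="mac.address"):
    """Return 'matched' if the attribute value is in the dictionary."""
    value = attributes.get(attribute_name)
    return "matched" if value in dictionary else "unmatched"


# Made-up sample entries; in practice: load_dictionary(open(path))
blacklist = load_dictionary(["00:1A:2B:3C:4D:5E\n", "\n", "AA:BB:CC:DD:EE:FF\n"])
route_hit = scan_attribute({"mac.address": "00:1A:2B:3C:4D:5E"}, blacklist)
route_miss = scan_attribute({"mac.address": "11:22:33:44:55:66"}, blacklist)
```

The two return values mirror the matched and unmatched relationships, which is where the flow decides what to do with blacklisted and clean entries.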
The right branch of the flow on the left uses the distributed cache populated by the flow on the right of the template. Inspect each processor by checking its properties. They are similar to the first half of the flow on the left, except for the PutDistributedMapCache processor, which sets the Cache Entry Identifier to the mac.address value.
I’ll refer only to the consumption of the mac.address property value set by PutDistributedMapCache.
Set Cache Entry Identifier to the same mac.address
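The put/fetch pairing can be sketched in Python with a plain dictionary standing in for the distributed cache. `MapCache`, `populate_cache` and `fetch_route` are hypothetical names for illustration, not the NiFi API, and storing the address itself as the cached value is an assumption. The point is that both sides must use the same key, the mac.address value.

```python
class MapCache:
    """In-memory stand-in for the distributed map cache (not the NiFi API)."""

    def __init__(self):
        self._entries = {}

    def put(self, key, value):
        self._entries[key] = value

    def get(self, key):
        return self._entries.get(key)


def populate_cache(cache, blacklist_lines):
    """Put side: store each blacklisted address under its own value as key."""
    for line in blacklist_lines:
        mac = line.strip()
        if mac:
            cache.put(mac, mac)


def fetch_route(cache, attributes):
    """Fetch side: look up the same mac.address key used when populating."""
    key = attributes.get("mac.address")
    if key is not None and cache.get(key) is not None:
        return "success"
    return "not-found"


cache = MapCache()
populate_cache(cache, ["00:1A:2B:3C:4D:5E\n", "AA:BB:CC:DD:EE:FF\n"])  # made-up entries
blacklisted = fetch_route(cache, {"mac.address": "AA:BB:CC:DD:EE:FF"})
clean = fetch_route(cache, {"mac.address": "11:22:33:44:55:66"})
```

If the identifier differs between the two processors, every lookup ends up as a miss, which is a common reason for the fetch branch appearing to filter nothing.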
Make sure that DistributedMapCacheClientService is enabled. You can check that by clicking the NiFi Flow Settings icon, the fourth in the right corner, and opening "Controller Services".
Don’t forget to start all processors and inspect the queues. My approach is to start processors one at a time, in the order of the data flow, and check the stats and lineage on the connection queues. This is what I love about NiFi: it is so easy to test and learn.
Thanks to Simon Ball for taking a few minutes of his time to review the template for the distributed cache approach.
Dynamic filtering has large applicability in any type of simple event processing. I am sure that there are many other ways to skin the same cat with NiFi. Enjoy!