

This tutorial is the second article of a two-part series. We will walk through, in detail, a flow which:

  • Uses the LookupRecord processor to parse NiFi provenance events in JSON format and add geolocation data
  • Uses the PartitionRecord processor to group like records by State
  • Publishes records originating from California to Kafka

The first article can be found here.


This tutorial was tested using the following environment and components:

  • Mac OS X 10.11.6
  • Apache NiFi 1.3.0
  • Apache Kafka

NiFi LookupRecord Flow Walk Through


This walk-through assumes the following:

  • The first article in this tutorial series has been completed.
  • The file GeoLite2-City.mmdb has been downloaded locally into a directory named "enrichment-db" within your NiFi installation
  • Provenance data has been generated and received by the "Provenance In" input port.
  • All necessary controller services and reporting tasks are enabled/running.
  • Kafka is running and a topic "California" has been created.

Flow Overview

Here is a quick overview of the flow:


1. "Provenance In" input port receives the provenance events from the SiteToSiteProvenanceReportingTask

2. UpdateAttribute adds Schema Name "provenance-event" as an attribute to the flowfile

3. LookupRecord adds geo enrichment data to the flowfile

4. PartitionRecord groups the records by State

5. RouteOnAttribute filters records to those that originate from California

6. PublishKafka_0_10 publishes the California records to Kafka

Flow Details

UpdateAttribute Processor

Once the provenance event data is received by the "Provenance In" input port, the first processor in the flow is an UpdateAttribute processor, which adds the schema name attribute (conventionally "schema.name" in NiFi) with the value "provenance-event" to the flowfile. Looking at its configuration:


Start the UpdateAttribute processor and view the attributes of one of the flowfiles to confirm this:


Geo Enrich (LookupRecord Processor)

The next processor, LookupRecord, looks up geo enrichment data by IP/hostname and adds this information to an "enrichment" field in the flowfile. Looking at the configuration:


Here is a breakdown of what the processor is configured to do:

  1. Record Reader is set to "JsonTreeReader" and Record Writer is set to "JsonRecordSetWriter". The "JsonTreeReader" controller service parses the event data in JSON format and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON.
  2. The custom property "ip" uses a regular expression to extract the hostname from the "transitUri" field of each record.
  3. Lookup Service is set to the "IPLookupService" controller service. The IPs/hostnames from the previous step are looked up in the MaxMind database to determine whether enrichment data exists for that hostname.
  4. Result RecordPath is set to "/enrichment", which is the field added to the record if enrichment data is returned from the Lookup Service.
  5. Routing Strategy is set to "Route to 'matched' or 'unmatched'". Records are routed to these relationships depending on whether or not there was a match in the IPLookupService.
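The steps above can be sketched in a few lines of Python. This is only an illustration of the logic, not how NiFi implements it: the regular expression, the function names, and the in-memory "geo_db" dictionary (standing in for the MaxMind database) are all assumptions made for the example, and the sample enrichment values are made up.

```python
import re
from typing import Optional, Tuple

# Hypothetical pattern: capture the host between "scheme://" and the next ":" or "/".
# The processor's actual regex is not shown in this article.
HOST_PATTERN = re.compile(r"^[a-zA-Z][a-zA-Z0-9+.-]*://([^:/]+)")


def extract_host(transit_uri: Optional[str]) -> Optional[str]:
    """Return the host part of a transit URI, or None if it cannot be found."""
    if not transit_uri:
        return None
    match = HOST_PATTERN.match(transit_uri)
    return match.group(1) if match else None


def lookup_record(record: dict, geo_db: dict) -> Tuple[str, dict]:
    """Return (relationship, record), adding an "enrichment" field on a match."""
    host = extract_host(record.get("transitUri"))
    enrichment = geo_db.get(host) if host else None
    if enrichment is None:
        return "unmatched", record
    enriched = dict(record)  # copy so the input record is left untouched
    enriched["enrichment"] = enrichment
    return "matched", enriched


# Made-up stand-in for the lookup service; real data comes from GeoLite2-City.mmdb.
geo_db = {
    "example.com": {
        "geo": {"subdivisions": [{"name": "California", "isoCode": "CA"}]}
    }
}

relationship, result = lookup_record(
    {"eventId": "1", "transitUri": "https://example.com:8443/data"}, geo_db
)
print(relationship, result.get("enrichment"))
```

Records without a usable "transitUri", or whose host is not in the database, come back unchanged on the "unmatched" side, which mirrors the routing strategy described in step 5.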

Let's look at the mentioned controller services in detail.

JsonTreeReader Controller Service

Select the arrow icon next to the "JsonTreeReader" which opens the Controller Services list in the NiFi Flow Configuration. "JsonTreeReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:


With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader looks up the expected schema by the name held in a flowfile attribute, which in this example is the schema name attribute added by the UpdateAttribute processor. The Schema Registry property is set to the AvroSchemaRegistry controller service.
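Conceptually, this name-based lookup behaves like a dictionary keyed by schema name. The sketch below illustrates the idea only; the function name, the "schema.name" attribute key (NiFi's conventional default, assumed here), and the trimmed-down registry contents are assumptions for the example.

```python
def resolve_schema(attributes: dict, registry: dict,
                   name_attribute: str = "schema.name") -> dict:
    """Find the schema whose name is stored in a flowfile attribute."""
    schema_name = attributes.get(name_attribute)
    if schema_name is None:
        raise KeyError(f"flowfile has no '{name_attribute}' attribute")
    if schema_name not in registry:
        raise KeyError(f"no schema named '{schema_name}' in the registry")
    return registry[schema_name]


# Stand-in for the AvroSchemaRegistry, with the field list left empty for brevity.
registry = {
    "provenance-event": {"type": "record", "name": "provenanceEvent", "fields": []}
}

schema = resolve_schema({"schema.name": "provenance-event"}, registry)
print(schema["name"])
```

A flowfile that never passed through UpdateAttribute would have no schema name attribute, so the lookup would fail before any record is parsed.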

AvroSchemaRegistry Controller Service

The AvroSchemaRegistry defines the "provenance-event" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:


The schema is defined as:
{
  "namespace": "nifi",
  "name": "provenanceEvent",
  "type": "record",
  "fields": [
    { "name": "eventId", "type": "string" },
    { "name": "eventOrdinal", "type": "long" },
    { "name": "eventType", "type": "string" },
    { "name": "timestampMillis", "type": {
      "type": "long",
      "logicalType": "timestamp-millis"
    } },
    { "name": "durationMillis", "type": "long" },
    { "name": "lineageStart", "type": {
      "type": "long",
      "logicalType": "timestamp-millis"
    } },
    { "name": "details", "type": "string" },
    { "name": "componentId", "type": "string" },
    { "name": "componentType", "type": "string" },
    { "name": "entityId", "type": "string" },
    { "name": "entityType", "type": "string" },
    { "name": "entitySize", "type": ["null", "long"] },
    { "name": "previousEntitySize", "type": ["null", "long"] },
    { "name": "updatedAttributes", "type": {
      "type": "map",
      "values": "string"
    } },
    { "name": "previousAttributes", "type": {
      "type": "map",
      "values": "string"
    } },
    { "name": "actorHostname", "type": "string" },
    { "name": "contentURI", "type": "string" },
    { "name": "previousContentURI", "type": "string" },
    { "name": "parentIds", "type": {
      "type": "array",
      "items": "string"
    } },
    { "name": "childIds", "type": {
      "type": "array",
      "items": "string"
    } },
    { "name": "platform", "type": "string" },
    { "name": "application", "type": "string" },
    { "name": "transitUri", "type": ["null", "string"] },
    { "name": "remoteHost", "type": ["null", "string"] },
    { "name": "enrichment", "type": ["null", {
      "name": "enrichmentRecord",
      "namespace": "nifi",
      "type": "record",
      "fields": [
        {
          "name": "geo",
          "type": ["null", {
            "name": "cityGeo",
            "type": "record",
            "fields": [
              { "name": "city", "type": ["null", "string"] },
              { "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
              { "name": "metroCode", "type": ["null", "int"] },
              { "name": "timeZone", "type": ["null", "string"] },
              { "name": "latitude", "type": ["null", "double"] },
              { "name": "longitude", "type": ["null", "double"] },
              { "name": "country", "type": ["null", {
                "type": "record",
                "name": "country",
                "fields": [
                  { "name": "name", "type": "string" },
                  { "name": "isoCode", "type": "string" }
                ]
              }] },
              { "name": "subdivisions", "type": {
                "type": "array",
                "items": {
                  "type": "record",
                  "name": "subdivision",
                  "fields": [
                    { "name": "name", "type": "string" },
                    { "name": "isoCode", "type": "string" }
                  ]
                }
              } },
              { "name": "continent", "type": ["null", "string"] },
              { "name": "postalCode", "type": ["null", "string"] }
            ]
          }]
        },
        {
          "name": "isp",
          "type": ["null", {
            "name": "ispEnrich",
            "type": "record",
            "fields": [
              { "name": "name", "type": ["null", "string"] },
              { "name": "organization", "type": ["null", "string"] },
              { "name": "asn", "type": ["null", "int"] },
              { "name": "asnOrganization", "type": ["null", "string"] }
            ]
          }]
        },
        {
          "name": "domainName",
          "type": ["null", "string"]
        },
        {
          "name": "connectionType",
          "type": ["null", "string"],
          "doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
        },
        {
          "name": "anonymousIp",
          "type": ["null", {
            "name": "anonymousIpType",
            "type": "record",
            "fields": [
              { "name": "anonymous", "type": "boolean" },
              { "name": "anonymousVpn", "type": "boolean" },
              { "name": "hostingProvider", "type": "boolean" },
              { "name": "publicProxy", "type": "boolean" },
              { "name": "torExitNode", "type": "boolean" }
            ]
          }]
        }
      ]
    }] }
  ]
}
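One detail of this schema worth calling out is that any field whose type is a union containing "null" (for example "transitUri", "remoteHost" and "enrichment") is allowed to be absent from an event. The short Python sketch below lists such fields for a small excerpt of the schema; the excerpt and the helper name are chosen for illustration only.

```python
import json

# A shortened excerpt of the "provenance-event" schema, for illustration only.
SCHEMA_EXCERPT = """
{
  "namespace": "nifi",
  "name": "provenanceEvent",
  "type": "record",
  "fields": [
    { "name": "eventId", "type": "string" },
    { "name": "entitySize", "type": ["null", "long"] },
    { "name": "transitUri", "type": ["null", "string"] },
    { "name": "remoteHost", "type": ["null", "string"] }
  ]
}
"""


def nullable_fields(schema: dict) -> list:
    """Names of fields whose type is a union that includes "null"."""
    return [
        field["name"]
        for field in schema["fields"]
        if isinstance(field["type"], list) and "null" in field["type"]
    ]


schema = json.loads(SCHEMA_EXCERPT)
print(nullable_fields(schema))  # ['entitySize', 'transitUri', 'remoteHost']
```

This is why events with no "transitUri" still parse cleanly: they simply have nothing to look up and carry no enrichment.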

JsonRecordSetWriter Controller Service

Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:


Schema Write Strategy is set to "Set 'schema.name' Attribute", the Schema Access Strategy property is set to "Use 'Schema Name' Property", and Schema Registry is set to AvroSchemaRegistry.

IPLookupService Controller Service

Close the window for JsonRecordSetWriter. Select the View Details button ("i" icon) next to the "IPLookupService" controller service to see its properties:


This service references the MaxMind database file "GeoLite2-City.mmdb" that we retrieved in the first tutorial article to look for enrichment data matching the given IP/hostname.

Start the LookupRecord processor:


Since we are only interested in matching records, we connect the "matched" relationship to the next processor.

Looking at the contents of a flowfile, confirm that the "enrichment" field has been added:


For this flowfile, the enrichment data shows California. Selecting another flowfile shows Massachusetts.


Partition by State (PartitionRecord Processor)

The next processor, PartitionRecord, separates the incoming flowfiles into groups of ‘like’ records by evaluating a user-supplied RecordPath against each record. Looking at the configuration:


The "JsonTreeReader" and "JsonRecordSetWriter" record reader/writers are reused.

A custom record path property, "state", is used to divide the records into groups based on the field ‘./isoCode’.
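The grouping can be pictured with the following Python sketch. It is an approximation under stated assumptions rather than NiFi's implementation: it assumes the state is read from the "isoCode" of the first entry in the enrichment's "subdivisions" array, and the helper names and sample records are made up for the example. Each resulting group carries a "state" attribute named after the custom property.

```python
from collections import defaultdict
from typing import Optional


def make_record(event_id: str, iso_code: Optional[str]) -> dict:
    """Build a tiny, made-up enriched record for the demonstration."""
    subdivisions = [{"isoCode": iso_code}] if iso_code else []
    return {"eventId": event_id, "enrichment": {"geo": {"subdivisions": subdivisions}}}


def state_of(record: dict) -> Optional[str]:
    """Assumed accessor: ISO code of the first subdivision, or None."""
    geo = (record.get("enrichment") or {}).get("geo") or {}
    subdivisions = geo.get("subdivisions") or []
    return subdivisions[0].get("isoCode") if subdivisions else None


def partition_by_state(records: list) -> list:
    """Group like records; emit one flowfile-like dict per distinct state."""
    groups = defaultdict(list)
    for record in records:
        groups[state_of(record)].append(record)
    return [
        # In this sketch, the attribute is omitted when no state was found.
        {"attributes": {"state": state} if state is not None else {}, "records": recs}
        for state, recs in groups.items()
    ]


records = [make_record("1", "CA"), make_record("2", "MA"), make_record("3", "CA")]
for flowfile in partition_by_state(records):
    print(flowfile["attributes"], [r["eventId"] for r in flowfile["records"]])
```

Because the partition value ends up as a flowfile attribute, downstream processors can route on it without re-reading the record content.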

Start the PartitionRecord processor:


Check if California (RouteOnAttribute Processor)

A RouteOnAttribute processor is next in the flow. Looking at the properties:


This processor routes flowfiles whose "state" attribute is CA.
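The check itself is a simple attribute comparison. The processor's exact property value is not shown here; in NiFi Expression Language one plausible form would be ${state:equals('CA')}, but treat that as an assumption. The equivalent logic, sketched in Python with hypothetical names:

```python
from typing import List, Tuple


def route_on_state(flowfiles: List[dict], wanted: str = "CA") -> Tuple[list, list]:
    """Split flowfiles into (those whose "state" attribute equals wanted, the rest)."""
    california, unmatched = [], []
    for flowfile in flowfiles:
        if flowfile.get("attributes", {}).get("state") == wanted:
            california.append(flowfile)
        else:
            unmatched.append(flowfile)
    return california, unmatched


# Made-up flowfiles: only the attributes matter for routing.
flowfiles = [
    {"attributes": {"state": "CA"}},
    {"attributes": {"state": "MA"}},
    {"attributes": {}},  # no state attribute at all
]
california, unmatched = route_on_state(flowfiles)
print(len(california), len(unmatched))
```

Only the first list would continue on to the Kafka publishing step; everything else is dropped from this flow.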

Run the RouteOnAttribute processor to see this in action:


PublishKafka_0_10 Processor

The final processor in the flow is PublishKafka_0_10. Looking at the properties:


Run the processor to see the records published in the Kafka topic "California":


I hope this tutorial was helpful and provided insight into how to use the LookupRecord processor! Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:

Version history
Last update:
‎08-17-2019 10:42 AM