Objective

This tutorial is the second article of a two-part series. We will walk through, in detail, a flow that:

  • Uses the LookupRecord processor to parse NiFi provenance events in JSON format and add geolocation data
  • Uses the PartitionRecord processor to group like records by State
  • Publishes records originating from California to Kafka

The first article can be found here.

Environment

This tutorial was tested using the following environment and components:

  • Mac OS X 10.11.6
  • Apache NiFi 1.3.0
  • Apache Kafka 0.10.2.1

NiFi LookupRecord Flow Walk Through

Prerequisites

  • The first article in this tutorial series has been completed.
  • The file GeoLite2-City.mmdb has been downloaded locally into a directory named "enrichment-db" within your NiFi installation.
  • Provenance data has been generated and received by the "Provenance In" input port.
  • All necessary controller services and reporting tasks are enabled/running.
  • Kafka is running and a topic "California" has been created.

Flow Overview

Here is a quick overview of the flow:

40668-lookuprecordflow.png

1. "Provenance In" input port receives the provenance events from the SiteToSiteProvenanceReportingTask

2. UpdateAttribute adds Schema Name "provenance-event" as an attribute to the flowfile

3. LookupRecord adds geo enrichment data to the flowfile

4. PartitionRecord groups the records by State

5. RouteOnAttribute filters records to those that originate from California

6. PublishKafka_0_10 publishes the California records to Kafka

Flow Details

UpdateAttribute Processor

Once the provenance event data is received by the "Provenance In" input port, the first processor in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "provenance-event" to the flowfile. Looking at its configuration:

40666-16-updateattribute-properties.png

Start the UpdateAttribute processor and view the attributes of one of the flowfiles to confirm this:

40667-17-flowfile-schema-name.png

Geo Enrich (LookupRecord Processor)

The next processor, LookupRecord, looks up geo enrichment data by IP/hostname and adds this information to an "enrichment" field in the flowfile. Looking at the configuration:

40669-18-lookuprecord-properties.png

Here is a breakdown of what the processor is configured to do:

  1. Record Reader is set to "JsonTreeReader" and Record Writer is set to "JsonRecordSetWriter". The "JsonTreeReader" controller service parses the event data in JSON format and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON.
  2. The custom property "ip" uses regex to extract the hostname from the "transitUri" field in the record.
  3. Lookup Service is set to the "IPLookupService" controller service. The IPs/hostnames from the previous step are looked up in the MaxMind database to determine if enrichment data exists for that hostname.
  4. Result RecordPath is set to "/enrichment", which is the field added to the record if enrichment data is returned from the Lookup Service.
  5. Routing Strategy is set to "Route to 'matched' or 'unmatched'". Records are routed to these relationships depending on whether or not there was a match in the IPLookupService.
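
To make step 2 more concrete, here is a minimal Python sketch of pulling a hostname out of a URI with a regular expression. The pattern, the function name extract_host and the sample URI are assumptions for illustration only; the actual expression used by the "ip" property is the one shown in the processor configuration above.

```python
import re

# Hypothetical pattern (not the one from the processor configuration):
# skip the scheme and "://", then capture everything up to the next
# ":", "/", "?" or "#".
HOST_PATTERN = re.compile(r"^[a-zA-Z][a-zA-Z0-9+.-]*://([^/:?#]+)")


def extract_host(transit_uri):
    """Return the host portion of a transit URI, or None if there is none."""
    if not transit_uri:
        return None
    match = HOST_PATTERN.match(transit_uri)
    return match.group(1) if match else None


print(extract_host("https://www.google.com:443/search"))
```

Records whose transitUri is null or does not look like a URI yield no host, so there is nothing to look up for them. The sketch does not handle user information in the URI (user@host).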

Let's look at the mentioned controller services in detail.

JsonTreeReader Controller Service

Select the arrow icon next to the "JsonTreeReader" which opens the Controller Services list in the NiFi Flow Configuration. "JsonTreeReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:

40670-19-jsontreereader-properties.png

With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader looks up the schema by the name held in an attribute, which in this example is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service, which is where that name is resolved.
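
The idea of resolving a schema by name can be sketched in a few lines of Python. This is only an illustration: the dictionary standing in for the registry, the function resolve_schema and the abbreviated schema text are all hypothetical, and NiFi's own implementation is not shown here.

```python
import json

# Hypothetical in-memory stand-in for a schema registry: name -> schema text.
# The schema text is abbreviated; it is not the full schema from this article.
SCHEMA_REGISTRY = {
    "provenance-event": '{"type": "record", "name": "provenanceEvent", "fields": []}',
}


def resolve_schema(attributes, registry, name_attribute="schema.name"):
    """Find the schema whose name is stored in the given flowfile attribute."""
    name = attributes.get(name_attribute)
    if name is None:
        raise KeyError("flowfile has no '%s' attribute" % name_attribute)
    if name not in registry:
        raise KeyError("no schema registered under '%s'" % name)
    return json.loads(registry[name])


schema = resolve_schema({"schema.name": "provenance-event"}, SCHEMA_REGISTRY)
print(schema["name"])
```

This is why the UpdateAttribute processor has to run first: without the schema.name attribute, the reader has no name to look up.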

AvroSchemaRegistry Controller Service

The AvroSchemaRegistry defines the "provenance-event" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:

40671-20-avroschemaregistry-properties.png

The schema is defined as:
 {
   "namespace": "nifi",
   "name": "provenanceEvent",
   "type": "record",
   "fields": [
     { "name": "eventId", "type": "string" },
     { "name": "eventOrdinal", "type": "long" },
     { "name": "eventType", "type": "string" },
     { "name": "timestampMillis", "type": {
       "type": "long",
       "logicalType": "timestamp-millis"
     } },
     { "name": "durationMillis", "type": "long" },
     { "name": "lineageStart", "type": {
       "type": "long",
       "logicalType": "timestamp-millis"
     } },
     { "name": "details", "type": "string" },
     { "name": "componentId", "type": "string" },
     { "name": "componentType", "type": "string" },
     { "name": "entityId", "type": "string" },
     { "name": "entityType", "type": "string" },
     { "name": "entitySize", "type": ["null", "long"] },
     { "name": "previousEntitySize", "type": ["null", "long"] },
     { "name": "updatedAttributes", "type": {
       "type": "map",
       "values": "string"
     } },
     { "name": "previousAttributes", "type": {
       "type": "map",
       "values": "string"
     } },
     { "name": "actorHostname", "type": "string" },
     { "name": "contentURI", "type": "string" },
     { "name": "previousContentURI", "type": "string" },
     { "name": "parentIds", "type": {
       "type": "array",
       "items": "string"
     } },
     { "name": "childIds", "type": {
       "type": "array",
       "items": "string"
     } },
     { "name": "platform", "type": "string" },
     { "name": "application", "type": "string" },
     { "name": "transitUri", "type": ["null", "string"] },
     { "name": "remoteHost", "type": ["null", "string"] },
     { "name": "enrichment", "type": ["null",
 {
   "name": "enrichmentRecord",
   "namespace": "nifi",
   "type": "record",
   "fields": [
     {
       "name": "geo",
       "type": ["null", {
         "name": "cityGeo",
         "type": "record",
         "fields": [
           { "name": "city", "type": ["null", "string"] },
           { "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
           { "name": "metroCode", "type": ["null", "int"] },
           { "name": "timeZone", "type": ["null", "string"] },
           { "name": "latitude", "type": ["null", "double"] },
           { "name": "longitude", "type": ["null", "double"] },
           { "name": "country", "type": ["null", {
             "type": "record",
             "name": "country",
             "fields": [
               { "name": "name", "type": "string" },
               { "name": "isoCode", "type": "string" }
             ]
           }] },
           { "name": "subdivisions", "type": {
             "type": "array",
             "items": {
               "type": "record",
               "name": "subdivision",
               "fields": [
                 { "name": "name", "type": "string" },
                 { "name": "isoCode", "type": "string" }
               ]
             }
           } },
           { "name": "continent", "type": ["null", "string"] },
           { "name": "postalCode", "type": ["null", "string"] }
         ]
       }]
     },
     {
       "name": "isp",
       "type": ["null", {
         "name": "ispEnrich",
         "type": "record",
         "fields": [
           { "name": "name", "type": ["null", "string"] },
           { "name": "organization", "type": ["null", "string"] },
           { "name": "asn", "type": ["null", "int"] },
           { "name": "asnOrganization", "type": ["null", "string"] }
         ]
       }]
     },
     {
       "name": "domainName",
       "type": ["null", "string"]
     },
     {
       "name": "connectionType",
       "type": ["null", "string"],
       "doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
     },
     {
       "name": "anonymousIp",
       "type": ["null", {
         "name": "anonymousIpType",
         "type": "record",
         "fields": [
           { "name": "anonymous", "type": "boolean" },
           { "name": "anonymousVpn", "type": "boolean" },
           { "name": "hostingProvider", "type": "boolean" },
           { "name": "publicProxy", "type": "boolean" },
           { "name": "torExitNode", "type": "boolean" }
         ]
       }]
     }
   ]
 }
     ]}
   ]
 }

JsonRecordSetWriter Controller Service

Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:

40672-21-jsonrecordsetwriter-properties.png

Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.

IPLookupService Controller Service

Close the window for JsonRecordSetWriter. Select the View Details button ("i" icon) next to the "IPLookupService" controller service to see its properties:

40684-iplookupservice-properties.png

This service references the MaxMind database file "GeoLite2-City.mmdb" that we retrieved in the first tutorial article to look for enrichment data matching the given IP/hostname.

Start the LookupRecord processor:

40673-22-lookuprecord-start.png

Since we are only interested in matching records, we connect the "matched" relationship to the next processor.
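
As a rough mental model of the lookup and the "matched"/"unmatched" routing, consider the Python sketch below. It is a simplification under stated assumptions: a plain dictionary stands in for the MaxMind database, the hostnames and the "Sample City" value are made up, and the names lookup_and_route and key_of are hypothetical.

```python
def lookup_and_route(records, lookup_table, key_of, result_field="enrichment"):
    """Split records into (matched, unmatched), enriching the matched ones."""
    matched, unmatched = [], []
    for record in records:
        result = lookup_table.get(key_of(record))
        if result is None:
            unmatched.append(record)
        else:
            # Copy so the caller's record is not modified in place.
            enriched = dict(record)
            enriched[result_field] = result
            matched.append(enriched)
    return matched, unmatched


# Made-up stand-in for the geolocation database, keyed by hostname.
SAMPLE_TABLE = {"host-a.example": {"geo": {"city": "Sample City"}}}

records = [
    {"eventId": "1", "host": "host-a.example"},
    {"eventId": "2", "host": "host-b.example"},
]
matched, unmatched = lookup_and_route(records, SAMPLE_TABLE, lambda r: r.get("host"))
print(len(matched), len(unmatched))
```

Only the records in the first list carry the new field, which is why the rest of the flow hangs off the "matched" relationship.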

Looking at the contents of a flowfile, confirm that the "enrichment" field has been added:

40675-23a-flowfile-enrichment-ca.png

For this flowfile, the enrichment data shows California (www.google.com). Selecting another shows Massachusetts (www.toyota.jp):

40676-23b-flowfile-enrichment-ma.png

Partition by State (PartitionRecord Processor)

The next processor, PartitionRecord, separates the incoming flowfiles into groups of ‘like’ records by evaluating a user-supplied RecordPath against each record. Looking at the configuration:

40677-24-partitionrecord-properties.png

The "JsonTreeReader" and "JsonRecordSetWriter" record reader/writers are reused.

A custom record path property, "state", is used to divide the records into groups based on the field ‘./isoCode’.
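
The grouping behaviour can be approximated in Python as follows. This is a sketch, not NiFi's implementation: it assumes the state is the isoCode of the first entry in enrichment.geo.subdivisions (the exact RecordPath is the one in the configuration above), and the helper names and sample records are hypothetical.

```python
from collections import defaultdict


def state_of(record):
    """Assumed path: isoCode of the first subdivision under enrichment.geo."""
    geo = (record.get("enrichment") or {}).get("geo") or {}
    subdivisions = geo.get("subdivisions") or []
    return subdivisions[0].get("isoCode") if subdivisions else None


def partition_by_state(records):
    """Group records by state; emit one 'flowfile' per distinct value."""
    groups = defaultdict(list)
    for record in records:
        groups[state_of(record)].append(record)
    # Each output carries the grouping value as a "state" attribute,
    # named after the custom property.
    return [{"attributes": {"state": state}, "records": group}
            for state, group in groups.items()]


def _record(event_id, iso_code):
    """Build a minimal made-up record with a single subdivision."""
    return {"eventId": event_id,
            "enrichment": {"geo": {"subdivisions": [{"name": "n/a", "isoCode": iso_code}]}}}


sample = [_record("1", "CA"), _record("2", "MA"), _record("3", "CA")]
for flowfile in partition_by_state(sample):
    print(flowfile["attributes"]["state"], len(flowfile["records"]))
```

Because every outgoing flowfile holds records with the same value, downstream processors can make routing decisions from the attribute alone without reading the content.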

Start the PartitionRecord processor:

40678-25-partitionrecord-start.png

Check if California (RouteOnAttribute Processor)

A RouteOnAttribute processor is next in the flow. Looking at the properties:

40679-26-routeonattribute-properties.png

This processor routes flowfiles whose state attribute is CA.
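
In Python terms, the routing decision amounts to a simple attribute check. The sketch below is illustrative only: the relationship names "matched" and "unmatched" and the function route_on_state are assumptions, and the real routing property is the one shown in the configuration above.

```python
def route_on_state(flowfiles, wanted="CA"):
    """Sort flowfiles into two buckets based on their 'state' attribute."""
    routed = {"matched": [], "unmatched": []}
    for flowfile in flowfiles:
        state = flowfile.get("attributes", {}).get("state")
        key = "matched" if state == wanted else "unmatched"
        routed[key].append(flowfile)
    return routed


# Made-up flowfiles: only attributes matter for the routing decision.
flowfiles = [
    {"attributes": {"state": "CA"}},
    {"attributes": {"state": "MA"}},
    {"attributes": {}},
]
routed = route_on_state(flowfiles)
print(len(routed["matched"]), len(routed["unmatched"]))
```

Flowfiles with no state attribute fall into the second bucket, the same as flowfiles for any other state.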

Run the RouteOnAttribute processor to see this in action:

40680-27-routeonattribute-start.png

PublishKafka_0_10 Processor

The final processor in the flow is PublishKafka_0_10. Looking at the properties:

40681-28-publishkafka-properties.png

Run the processor to see the records published in the Kafka topic "California":

40683-29-kafkaconsumer.png

I hope this tutorial was helpful and provided insight into how to use the LookupRecord processor! Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
