Created on 10-04-2017 03:11 PM - edited 08-17-2019 10:42 AM
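This tutorial is the second article of a two-part series. We will walk through in detail a flow which uses the LookupRecord processor to add geo enrichment data to NiFi provenance events, the PartitionRecord processor to group the records by state, and the PublishKafka_0_10 processor to publish the records that originate from California to Kafka.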
The first article can be found here.
This tutorial was tested using the following environment and components:
Here is a quick overview of the flow:
1. "Provenance In" input port receives the provenance events from the SiteToSiteProvenanceReportingTask
2. UpdateAttribute adds the schema.name attribute, with the value "provenance-event", to the flowfile
3. LookupRecord adds geo enrichment data to the flowfile
4. PartitionRecord groups the records by state
5. RouteOnAttribute filters records to those that originate from California
6. PublishKafka_0_10 publishes the California records to Kafka
UpdateAttribute Processor
After the "Provenance In" input port receives the provenance event data, the first processor in the flow is an UpdateAttribute processor, which adds the schema.name attribute with the value "provenance-event" to the flowfile. Its configuration defines schema.name as a user-added (dynamic) property set to provenance-event.
Start the UpdateAttribute processor and view the attributes of one of the flowfiles to confirm that schema.name is set to provenance-event.
Geo Enrich (LookupRecord Processor)
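The next processor, LookupRecord, looks up geo enrichment data by IP/hostname and adds this information to an "enrichment" field in each record of the flowfile.
Here is a breakdown of what the processor is configured to do: it reads the incoming records with the JsonTreeReader controller service, looks up each record's IP/hostname with the IPLookupService controller service, places any result in the "enrichment" field, and writes the records out with the JsonRecordSetWriter controller service.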
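The lookup step can be pictured with a minimal Python sketch. It assumes each record is a dict, uses a hypothetical in-memory table (GEO_TABLE) in place of the IPLookupService, and reads the lookup key from a hypothetical "host" field; the real processor evaluates a RecordPath and calls the configured lookup service instead. Records with a result gain an "enrichment" field and go to "matched"; the rest go to "unmatched".

```python
from typing import Any, Dict, List, Optional, Tuple

Record = Dict[str, Any]

# Hypothetical stand-in for the IPLookupService: host -> enrichment data.
# Values are illustrative only, not real MaxMind results.
GEO_TABLE: Dict[str, Record] = {
    "www.google.com": {"geo": {"subdivisions": [{"name": "California", "isoCode": "CA"}]}},
    "www.toyota.jp": {"geo": {"subdivisions": [{"name": "Massachusetts", "isoCode": "MA"}]}},
}


def lookup(host: Optional[str]) -> Optional[Record]:
    """Return enrichment data for a host, or None when nothing matches."""
    if host is None:
        return None
    return GEO_TABLE.get(host)


def lookup_records(records: List[Record],
                   key_field: str = "host") -> Tuple[List[Record], List[Record]]:
    """Add an 'enrichment' field to each record that has a lookup hit.

    Returns (matched, unmatched), mirroring LookupRecord's two relationships.
    """
    matched: List[Record] = []
    unmatched: List[Record] = []
    for record in records:
        result = lookup(record.get(key_field))
        if result is None:
            unmatched.append(record)
        else:
            # Build a new dict so the caller's input records are left untouched.
            matched.append({**record, "enrichment": result})
    return matched, unmatched
```

Only the "matched" output is passed on in this flow, which corresponds to keeping the first element of the returned pair.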
Let's look at the mentioned controller services in detail.
JsonTreeReader Controller Service
Select the arrow icon next to "JsonTreeReader", which opens the Controller Services list in the NiFi Flow Configuration. "JsonTreeReader" should be highlighted in the list. Select the View Details button ("i" icon) to see its properties:
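With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader takes the name of the expected schema from its Schema Name property, which by default evaluates the schema.name attribute set earlier by the UpdateAttribute processor. The Schema Registry property is set to the AvroSchemaRegistry controller service, which is where that name is looked up.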
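In other words, the schema is resolved by name when the records are read. A minimal Python sketch of that indirection, assuming a flowfile is modeled as an attributes dict plus JSON content, and that a plain dict (SCHEMA_REGISTRY, hypothetical) stands in for the AvroSchemaRegistry with only two fields of the real schema:

```python
import json
from typing import Any, Dict, List

# Hypothetical stand-in for the AvroSchemaRegistry: schema name -> schema text.
# Only two fields of the "provenance-event" schema are included here.
SCHEMA_REGISTRY: Dict[str, str] = {
    "provenance-event": json.dumps({
        "namespace": "nifi",
        "name": "provenanceEvent",
        "type": "record",
        "fields": [
            {"name": "eventId", "type": "string"},
            {"name": "eventType", "type": "string"},
        ],
    })
}


def read_records(attributes: Dict[str, str], content: bytes) -> List[Dict[str, Any]]:
    """Parse JSON content using the schema named by the schema.name attribute."""
    name = attributes.get("schema.name")
    if name is None or name not in SCHEMA_REGISTRY:
        raise LookupError(f"no schema registered for schema.name={name!r}")
    schema = json.loads(SCHEMA_REGISTRY[name])
    field_names = [f["name"] for f in schema["fields"]]
    data = json.loads(content)
    records = data if isinstance(data, list) else [data]
    # Keep only the fields the schema declares; absent ones become None.
    return [{f: r.get(f) for f in field_names} for r in records]
```

Without the schema.name attribute the sketch fails fast, which is why the UpdateAttribute step has to come first.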
AvroSchemaRegistry Controller Service
The AvroSchemaRegistry defines the "provenance-event" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
The schema is defined as:
{
  "namespace": "nifi",
  "name": "provenanceEvent",
  "type": "record",
  "fields": [
    { "name": "eventId", "type": "string" },
    { "name": "eventOrdinal", "type": "long" },
    { "name": "eventType", "type": "string" },
    { "name": "timestampMillis", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "durationMillis", "type": "long" },
    { "name": "lineageStart", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "details", "type": "string" },
    { "name": "componentId", "type": "string" },
    { "name": "componentType", "type": "string" },
    { "name": "entityId", "type": "string" },
    { "name": "entityType", "type": "string" },
    { "name": "entitySize", "type": ["null", "long"] },
    { "name": "previousEntitySize", "type": ["null", "long"] },
    { "name": "updatedAttributes", "type": { "type": "map", "values": "string" } },
    { "name": "previousAttributes", "type": { "type": "map", "values": "string" } },
    { "name": "actorHostname", "type": "string" },
    { "name": "contentURI", "type": "string" },
    { "name": "previousContentURI", "type": "string" },
    { "name": "parentIds", "type": { "type": "array", "items": "string" } },
    { "name": "childIds", "type": { "type": "array", "items": "string" } },
    { "name": "platform", "type": "string" },
    { "name": "application", "type": "string" },
    { "name": "transitUri", "type": ["null", "string"] },
    { "name": "remoteHost", "type": ["null", "string"] },
    { "name": "enrichment", "type": ["null", {
      "name": "enrichmentRecord",
      "namespace": "nifi",
      "type": "record",
      "fields": [
        { "name": "geo", "type": ["null", {
          "name": "cityGeo",
          "type": "record",
          "fields": [
            { "name": "city", "type": ["null", "string"] },
            { "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
            { "name": "metroCode", "type": ["null", "int"] },
            { "name": "timeZone", "type": ["null", "string"] },
            { "name": "latitude", "type": ["null", "double"] },
            { "name": "longitude", "type": ["null", "double"] },
            { "name": "country", "type": ["null", {
              "type": "record",
              "name": "country",
              "fields": [
                { "name": "name", "type": "string" },
                { "name": "isoCode", "type": "string" }
              ]
            }] },
            { "name": "subdivisions", "type": {
              "type": "array",
              "items": {
                "type": "record",
                "name": "subdivision",
                "fields": [
                  { "name": "name", "type": "string" },
                  { "name": "isoCode", "type": "string" }
                ]
              }
            } },
            { "name": "continent", "type": ["null", "string"] },
            { "name": "postalCode", "type": ["null", "string"] }
          ]
        }] },
        { "name": "isp", "type": ["null", {
          "name": "ispEnrich",
          "type": "record",
          "fields": [
            { "name": "name", "type": ["null", "string"] },
            { "name": "organization", "type": ["null", "string"] },
            { "name": "asn", "type": ["null", "int"] },
            { "name": "asnOrganization", "type": ["null", "string"] }
          ]
        }] },
        { "name": "domainName", "type": ["null", "string"] },
        { "name": "connectionType", "type": ["null", "string"], "doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'" },
        { "name": "anonymousIp", "type": ["null", {
          "name": "anonymousIpType",
          "type": "record",
          "fields": [
            { "name": "anonymous", "type": "boolean" },
            { "name": "anonymousVpn", "type": "boolean" },
            { "name": "hostingProvider", "type": "boolean" },
            { "name": "publicProxy", "type": "boolean" },
            { "name": "torExitNode", "type": "boolean" }
          ]
        }] }
      ]
    }] }
  ]
}
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
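The Schema Write Strategy property is set to "Set 'schema.name' Attribute", the Schema Access Strategy property is set to "Use 'Schema Name' Property", and the Schema Registry property is set to AvroSchemaRegistry. The writer therefore writes the records with the same "provenance-event" schema and records that schema's name in the schema.name attribute of each outgoing flowfile.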
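On the output side, a minimal Python sketch of what "Set 'schema.name' Attribute" amounts to: the records are serialized and the schema name is copied onto the outgoing flowfile's attributes, so a downstream reader can resolve the same schema. The function name and the attributes-dict model of a flowfile are assumptions for illustration.

```python
import json
from typing import Any, Dict, List, Tuple


def write_records(records: List[Dict[str, Any]], schema_name: str,
                  attributes: Dict[str, str]) -> Tuple[Dict[str, str], bytes]:
    """Serialize records as a JSON array and tag the output with its schema name."""
    content = json.dumps(records).encode("utf-8")
    # New attribute map: the incoming attributes plus the schema name.
    out_attributes = {**attributes, "schema.name": schema_name}
    return out_attributes, content
```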
IPLookupService Controller Service
Close the window for JsonRecordSetWriter. Select the View Details button ("i" icon) next to the "IPLookupService" controller service to see its properties:
This service references the MaxMind database file "GeoLite2-City.mmdb", retrieved in the first article of this series, and uses it to look up enrichment data for the given IP/hostname.
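A MaxMind database essentially maps IP networks to location data, and a lookup returns the data for the most specific network containing the address. The Python sketch below imitates that with the standard ipaddress module. The network ranges (RFC 5737 documentation blocks) and their locations are made up; a real deployment reads GeoLite2-City.mmdb through a MaxMind reader instead. Hostnames are resolved first, via an injectable resolver that defaults to socket.gethostbyname.

```python
import ipaddress
import socket
from typing import Callable, Dict, Optional

# Hypothetical network -> location table using documentation-only ranges.
NETWORKS: Dict[str, Dict[str, str]] = {
    "192.0.2.0/24": {"subdivision": "California", "isoCode": "CA"},
    "192.0.2.128/25": {"subdivision": "Nevada", "isoCode": "NV"},
    "198.51.100.0/24": {"subdivision": "Massachusetts", "isoCode": "MA"},
}
_PARSED = [(ipaddress.ip_network(cidr), geo) for cidr, geo in NETWORKS.items()]


def geo_lookup(ip_or_host: str,
               resolve: Callable[[str], str] = socket.gethostbyname) -> Optional[Dict[str, str]]:
    """Return the location of the most specific network containing the address."""
    try:
        addr = ipaddress.ip_address(ip_or_host)
    except ValueError:
        # Not an IP literal: treat it as a hostname and resolve it first
        # (a failed resolution raises socket.gaierror with the default resolver).
        addr = ipaddress.ip_address(resolve(ip_or_host))
    best_geo: Optional[Dict[str, str]] = None
    best_len = -1
    for network, geo in _PARSED:
        # Longest-prefix match: prefer the narrowest containing network.
        if addr in network and network.prefixlen > best_len:
            best_geo, best_len = geo, network.prefixlen
    return best_geo
```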
Start the LookupRecord processor:
Since we are only interested in matching records, we connect the "matched" relationship to the next processor.
Looking at the contents of a flowfile, confirm that the "enrichment" field has been added:
For this flowfile, the enrichment data shows California (www.google.com). Selecting another shows Massachusetts (www.toyota.jp):
Partition by State (PartitionRecord Processor)
The next processor, PartitionRecord, separates the incoming flowfiles into groups of "like" records by evaluating a user-supplied RecordPath against each record. Looking at the configuration:
The JsonTreeReader and JsonRecordSetWriter controller services are reused as the record reader and writer.
A custom record path property, "state", is used to divide the records into groups based on the field "./isoCode". PartitionRecord also adds an attribute named "state" to each outgoing flowfile, holding the value that the records in that flowfile share.
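The grouping step can be sketched in Python as follows. It is a simplification that assumes records are dicts and replaces the RecordPath with a hypothetical accessor (state_of) that reads the first subdivision's isoCode from the enrichment data. Each output group carries an attribute named after the property ("state"), as PartitionRecord does.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Record = Dict[str, Any]
Partition = Tuple[Dict[str, Optional[str]], List[Record]]


def state_of(record: Record) -> Optional[str]:
    """Hypothetical accessor: isoCode of the first subdivision, if present."""
    geo = (record.get("enrichment") or {}).get("geo") or {}
    subdivisions = geo.get("subdivisions") or []
    return subdivisions[0].get("isoCode") if subdivisions else None


def partition_records(records: List[Record],
                      key: Callable[[Record], Optional[str]] = state_of,
                      attribute: str = "state") -> List[Partition]:
    """Group records sharing a key value; each group becomes one 'flowfile'."""
    groups: Dict[Optional[str], List[Record]] = {}  # dicts keep insertion order
    for record in records:
        groups.setdefault(key(record), []).append(record)
    # Expose the shared key value as an attribute named after the property.
    return [({attribute: value}, members) for value, members in groups.items()]
```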
Start the PartitionRecord processor:
Check if California (RouteOnAttribute Processor)
A RouteOnAttribute processor is next in the flow. Looking at its properties, this processor routes a flowfile onward only if its "state" attribute is CA.
Run the RouteOnAttribute processor to see this in action:
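A minimal Python sketch of the routing decision, assuming each flowfile is modeled as an (attributes, records) pair whose attributes include the "state" set by PartitionRecord. In NiFi the rule would typically be an Expression Language property such as ${state:equals('CA')} (an assumption; the exact property is not reproduced here), and flowfiles matching no rule go to "unmatched".

```python
from typing import Any, Callable, Dict, List, Tuple

FlowFile = Tuple[Dict[str, str], List[Dict[str, Any]]]
Rule = Callable[[Dict[str, str]], bool]


def route_on_attribute(flowfiles: List[FlowFile],
                       rules: Dict[str, Rule]) -> Dict[str, List[FlowFile]]:
    """Send each flowfile to every rule it satisfies, else to 'unmatched'."""
    routes: Dict[str, List[FlowFile]] = {name: [] for name in rules}
    routes["unmatched"] = []
    for ff in flowfiles:
        attributes = ff[0]
        hits = [name for name, rule in rules.items() if rule(attributes)]
        for name in hits:
            routes[name].append(ff)
        if not hits:
            routes["unmatched"].append(ff)
    return routes
```

For this flow, the rule set would be a single entry such as {"CA": lambda a: a.get("state") == "CA"}, with only the "CA" route connected to the Kafka publisher.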
PublishKafka_0_10 Processor
The final processor in the flow is PublishKafka_0_10, configured to publish to the Kafka topic "California".
Run the processor to see the records published in that topic:
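To reproduce the last step outside NiFi, here is a hedged Python sketch of publishing records to the "California" topic. It assumes the kafka-python client, whose KafkaProducer provides send(topic, value=...) and flush(). The producer is passed in so the function can be exercised with a stand-in object. By default the whole record set goes out as one message, similar to PublishKafka_0_10 sending a flowfile's entire content as a single message when no Message Demarcator is set; per_record=True is an illustrative alternative.

```python
import json
from typing import Any, Dict, List


def publish_records(producer: Any, topic: str, records: List[Dict[str, Any]],
                    per_record: bool = False) -> int:
    """Publish records to Kafka and return the number of messages sent."""
    if per_record:
        # One JSON message per record.
        payloads = [json.dumps(r).encode("utf-8") for r in records]
    else:
        # The whole record set as a single JSON array message.
        payloads = [json.dumps(records).encode("utf-8")]
    for payload in payloads:
        producer.send(topic, value=payload)
    # Block until buffered messages have been sent (or have failed).
    producer.flush()
    return len(payloads)
```

With a local broker (address assumed), usage would look like: from kafka import KafkaProducer; publish_records(KafkaProducer(bootstrap_servers="localhost:9092"), "California", records).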
I hope this tutorial was helpful and provided insight into how to use the LookupRecord processor! Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi: