11-07-2017
07:26 PM
4 Kudos
Objective
This tutorial consists of two articles. The first walks you through a NiFi flow that uses the ValidateRecord processor and Record Reader/Writer controller services to:
Convert a CSV file into JSON format
Validate the data against a given schema
Write the JSON data to either a 'valid' relationship or an 'invalid' relationship
The second article modifies the flow to demonstrate the effects of enabling/disabling the "Strict Type Checking" property of the ValidateRecord processor.
Note: The ValidateRecord processor was introduced in NiFi 1.4.0.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.4.0
ValidateRecord Flow
Support Files
Here is a template of the flow discussed in this tutorial: validaterecord.xml
In the spirit of Halloween last week, let's ingest some data that ranks the most popular candies: candy-data-dirty.txt (Change the extension from .txt to .csv after downloading)
Note: The CSV data originated from the site Kaggle. Values in the data were then added/modified/deleted for the purposes of this tutorial.
Demo Configuration
Input Directory
Create a local input directory. Place the "candy-data-dirty.csv" file in the input directory.
Import Template
Start NiFi. Import the provided template and add it to the canvas:
Enable Controller Services
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JsonRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
Update Directory Path in GetFile Processor
Change the Input Directory path in the GetFile processor to point to your local input directory:
The flow is now ready to run.
Flow Overview
Here is a quick overview of the flow:
1. GetFile ingests a CSV file of candy data from a local directory
2. UpdateAttribute adds Schema Name "candy" as an attribute to the flowfile
3. ValidateRecord converts the flowfile contents from CSV to JSON by:
Using a CSVReader controller service that references a schema in an AvroSchemaRegistry controller service
The AvroSchemaRegistry contains a "candy" schema which defines information about each record (field names, field ids, field types)
Using a JsonRecordSetWriter controller service that references the same AvroSchemaRegistry schema
4. ValidateRecord then writes the data to:
The "valid" relationship for records that adhere to the schema
The "invalid" relationship for records that do not adhere to the schema
5. Both valid and invalid data are sent to LogAttribute processors in order to investigate the contents and provenance of the queued flowfiles.
Flow Details
Let's look at each of the processors in the flow in detail:
Get Data (GetFile Processor)
FlowFiles are generated from the candy-data-dirty.csv file in the local directory. Start the processor:
One flowfile is generated with the CSV data as the contents. Right click on the connection between the GetFile Processor and the UpdateAttribute Processor. In the context menu, select "List queue" and click the View Details button ("i" icon):
From the FlowFile window that opens, select the "View" button from the Details tab:
to view the CSV contents of the flowfile:
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "candy" to the flowfile:
Start the processor and view the attributes of the flowfile to confirm this:
ValidateRecord Processor
The next processor, ValidateRecord, creates record objects from the flowfile. It then reads the records one at a time and validates each against the given schema. The records are then written to either a 'valid' relationship or an 'invalid' relationship. Looking at the configuration:
Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. Schema Registry is set to "AvroSchemaRegistry". More details about these controller services can be found below.
Allow Extra Fields property is set to "false" to demonstrate how records that have additional fields can be sent to the "invalid" relationship. Strict Type Checking is set to "true", to demonstrate how data with incorrect field types can be routed to the "invalid" relationship (described in more detail in the second article).
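To make the routing rules above concrete, here is a minimal Python sketch of the same idea. It is only an illustration and not NiFi's implementation: the shortened schema, the `validate` and `route` helpers, and the "Example Bar" record are assumptions made for this example. It models Allow Extra Fields set to "false" with Strict Type Checking set to "true".

```python
# Simplified stand-in for ValidateRecord; not NiFi's actual implementation.
# Map Avro primitive type names (as used in the candy schema) to Python types.
AVRO_TO_PYTHON = {"string": str, "int": int, "double": float}

# Hypothetical, shortened schema: field name -> Avro type name.
SCHEMA = {"competitorname": "string", "chocolate": "int", "winpercent": "double"}


def validate(record, schema, allow_extra_fields=False):
    """Return a list of problems; an empty list means the record is valid."""
    problems = []
    for name, avro_type in schema.items():
        value = record.get(name)
        expected = AVRO_TO_PYTHON[avro_type]
        if value is None:
            problems.append(f"{name}: missing or null")
        elif isinstance(value, bool) or not isinstance(value, expected):
            # Strict check: the value must already have the schema's type.
            problems.append(f"{name}: not a {avro_type}")
    if not allow_extra_fields:
        for name in record:
            if name not in schema:
                problems.append(f"{name}: not in schema")
    return problems


def route(records, schema, allow_extra_fields=False):
    """Split records into 'valid' and 'invalid' lists, like the two relationships."""
    routed = {"valid": [], "invalid": []}
    for record in records:
        problems = validate(record, schema, allow_extra_fields)
        routed["invalid" if problems else "valid"].append(record)
    return routed


RECORDS = [
    # Made-up record that satisfies the shortened schema.
    {"competitorname": "Example Bar", "chocolate": 1, "winpercent": 50.0},
    # Extra field that the schema does not define.
    {"competitorname": "Boston Baked Beans", "chocolate": 0,
     "winpercent": 23.417824, "unknown_field_index_13": "3.14159"},
    # String where a double is expected.
    {"competitorname": "Nestle Butterfinger", "chocolate": 1, "winpercent": "hello"},
    # Null in a field that is not nullable.
    {"competitorname": "Strawberry bon bons", "chocolate": None, "winpercent": 34.578991},
]

ROUTED = route(RECORDS, SCHEMA)
```

With these sample records, one record lands in `ROUTED["valid"]` and three in `ROUTED["invalid"]`, each for a different reason: an extra field, a wrong type, and a null value.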
CSVReader Controller Service
Select the arrow icon next to the "CSV Reader" which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Configuration button (gear icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service which defines the "candy" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Configuration button (gear icon) to see its properties:
The candy schema is as follows:
{
"type": "record",
"name": "CandyRecord",
"fields" : [
{"name": "competitorname", "type": "string"},
{"name": "chocolate", "type": "int"},
{"name": "fruity", "type": "int"},
{"name": "caramel", "type": "int"},
{"name": "peanutyalmondy", "type": "int"},
{"name": "nougat", "type": "int"},
{"name": "crispedricewafer", "type": "int"},
{"name": "hard", "type": "int"},
{"name": "bar", "type": "int"},
{"name": "pluribus", "type": "int"},
{"name": "sugarpercent", "type": "double"},
{"name": "pricepercent", "type": "double"},
{"name": "winpercent", "type": "double"}
]
}
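The "Use 'Schema Name' Property" strategy amounts to a lookup keyed by the flowfile's schema.name attribute. The sketch below shows that idea in Python under stated assumptions: the `REGISTRY` dictionary, `resolve_schema` and `field_types` are hypothetical names, and the schema is cut down to two of the candy fields to keep it short.

```python
import json

# Hypothetical in-memory registry: schema name -> Avro schema text.
# Only two of the candy fields are included to keep the sketch short.
REGISTRY = {
    "candy": """
    {
      "type": "record",
      "name": "CandyRecord",
      "fields": [
        {"name": "competitorname", "type": "string"},
        {"name": "chocolate", "type": "int"}
      ]
    }
    """
}


def resolve_schema(attributes, registry):
    """Look up the schema named by the flowfile's schema.name attribute."""
    name = attributes.get("schema.name")
    if name is None:
        raise KeyError("flowfile has no schema.name attribute")
    if name not in registry:
        raise KeyError(f"no schema named {name!r} in the registry")
    return json.loads(registry[name])


def field_types(schema):
    """Return a mapping of field name -> declared Avro type."""
    return {field["name"]: field["type"] for field in schema["fields"]}


# Attributes as UpdateAttribute would leave them on the flowfile.
flowfile_attributes = {"schema.name": "candy"}
candy_schema = resolve_schema(flowfile_attributes, REGISTRY)
candy_types = field_types(candy_schema)
```

Because the reader and the writer both resolve the schema by name, the same "candy" entry serves both sides of the conversion.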
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Configuration button (gear icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
Start the ValidateRecord processor. The flowfile is now split into two flowfiles:
Looking at the contents of the "invalid" connection shows that there were 3 records (now in JSON) that did not match the schema:
[ {
"competitorname" : "Boston Baked Beans",
"chocolate" : 0,
"fruity" : 0,
"caramel" : 0,
"peanutyalmondy" : 1,
"nougat" : 0,
"crispedricewafer" : 0,
"hard" : 0,
"bar" : 0,
"pluribus" : 1,
"sugarpercent" : 0.31299999,
"pricepercent" : 0.51099998,
"winpercent" : 23.417824,
"unknown_field_index_13" : "3.14159"
}, {
"competitorname" : "Nestle Butterfinger",
"chocolate" : 1,
"fruity" : 0,
"caramel" : 0,
"peanutyalmondy" : 1,
"nougat" : 0,
"crispedricewafer" : 0,
"hard" : 0,
"bar" : 1,
"pluribus" : 0,
"sugarpercent" : 0.60399997,
"pricepercent" : 0.76700002,
"winpercent" : "hello"
}, {
"competitorname" : "Strawberry bon bons",
"chocolate" : null,
"fruity" : 1,
"caramel" : 0,
"peanutyalmondy" : 0,
"nougat" : 0,
"crispedricewafer" : 0,
"hard" : 1,
"bar" : 0,
"pluribus" : 1,
"sugarpercent" : 0.56900001,
"pricepercent" : 0.057999998,
"winpercent" : 34.578991
} ]
To see why these records were invalid, select the Provenance icon:
Then select the View Details button ("i" icon) for the ROUTE event:
In the Provenance Event window that opens, scroll down to the bottom to see the Details:
You can look at the contents of the "valid" connection to see the records that did match the schema. A total of 82 records were valid, which is displayed by the record.count attribute:
In preparation for the next article of the tutorial, start the two LogAttribute processors to clear the connection queues. Then stop all processors.
10-31-2017
02:17 PM
1 Kudo
A Kudu Processor was added to NiFi by @Cam Mach in NiFi 1.4.0. I wrote an HCC article about it: https://community.hortonworks.com/content/kbentry/144009/using-the-putkudu-processor-to-ingest-mysql-data-i.html
10-23-2017
07:01 PM
2 Kudos
Objective
This tutorial demonstrates how to use the QueryDatabaseTable and PutKudu processors to read data from a MySQL database and put it into Kudu. Thanks to @Cam Mach for his assistance with this article.
Note: The PutKudu processor was introduced in NiFi 1.4.0.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.4.0
Apache Kudu 1.5.0
MySQL 5.7.13
PutKudu (AvroReader)
Demo Configuration
MySQL Setup
In your MySQL instance, choose a database ("nifi_db" in my instance) and create the table "users":
unix> mysql -u root -p
unix> Enter password:<enter>
mysql> use nifi_db;
mysql>CREATE TABLE `users` (
`id` mediumint(9) NOT NULL AUTO_INCREMENT,
`title` text,
`first_name` text,
`last_name` text,
`street` text,
`city` text,
`state` text,
`zip` text,
`gender` text,
`email` text,
`username` text,
`password` text,
`phone` text,
`cell` text,
`ssn` text,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=103 DEFAULT CHARSET=latin1;
Add data to the "users" table:
mysql>INSERT INTO `users` (`id`, `title`, `first_name`, `last_name`, `street`, `city`, `state`, `zip`, `gender`, `email`, `username`, `password`, `phone`, `cell`, `ssn`)
VALUES (1, 'miss', 'marlene', 'shaw', '3450 w belt line rd', 'abilene', 'florida', '31995', 'F', 'marlene.shaw75@example.com', 'goldenpanda70', 'naughty', '(176)-908-6931', '(711)-565-2194', '800-71-1872'),
(2, 'ms', 'letitia', 'jordan', '2974 mockingbird hill', 'irvine', 'new jersey', '64361', 'F', 'letitia.jordan64@example.com', 'lazytiger614', 'aaaaa1', '(860)-602-3314', '(724)-685-3472', '548-93-7031'),
(3, 'mr', 'todd', 'graham', '5760 spring hill rd', 'garden grove', 'north carolina', '81790', 'M', 'todd.graham39@example.com', 'purplekoala484', 'paintball', '(230)-874-6532', '(186)-529-4912', '362-31-5248'),
(4, 'mr', 'seth', 'martinez', '4377 fincher rd', 'chandler', 'south carolina', '73651', 'M', 'seth.martinez82@example.com', 'bigbutterfly149', 'navy', '(122)-782-5822', '(720)-778-8541', '200-80-9087'),
(5, 'mr', 'guy', 'mckinney', '4524 hogan st', 'iowa park', 'ohio', '24140', 'M', 'guy.mckinney53@example.com', 'blueduck623', 'office', '(309)-556-7859', '(856)-764-9146', '973-37-9077'),
(6, 'ms', 'anna', 'smith', '5047 cackson st', 'rancho cucamonga', 'pennsylvania', '56486', 'F', 'anna.smith74@example.com', 'goldenfish121', 'albion', '(335)-388-7351', '(485)-150-6348', '680-20-6440'),
(7, 'mr', 'johnny', 'johnson', '7250 bruce st', 'gresham', 'new mexico', '83973', 'M', 'johnny.johnson73@example.com', 'crazyduck127', 'toast', '(142)-971-3099', '(991)-131-1582', '683-26-4133'),
(8, 'mrs', 'robin', 'white', '7882 northaven rd', 'orlando', 'connecticut', '40452', 'F', 'robin.white46@example.com', 'whitetiger371', 'elizabeth', '(311)-659-3812', '(689)-468-6420', '960-70-3399'),
(9, 'miss', 'allison', 'williams', '7648 edwards rd', 'edison', 'louisiana', '52040', 'F', 'allison.williams82@example.com', 'beautifulfish354', 'sanfran', '(328)-592-3520', '(550)-172-4018', '164-78-8160');
Kudu Setup
For my setup, I followed the Apache Kudu Quickstart instructions to easily set up and run a Kudu VM.
To check that your VM is running:
unix> VBoxManage list runningvms
"kudu-demo" {b39279b5-3dd6-478a-ac9d-2204bf88e7b9}
To see what IP Kudu is running on:
unix> VBoxManage guestproperty get kudu-demo /VirtualBox/GuestInfo/Net/0/V4/IP
Value: 192.168.58.100
The Kudu web client runs on port 8051:
Create a table in Kudu by first connecting to Impala in the virtual machine:
unix> ssh demo@quickstart.cloudera -t impala-shell
demo@quickstart.cloudera's password:
[quickstart.cloudera:21000] >
(Note: The username and password for the Quickstart VM are both "demo".)
Create the Kudu table with the same columns and data types as the MySQL table:
[quickstart.cloudera:21000] > CREATE TABLE users_kudu
(
id BIGINT,
title STRING,
first_name STRING,
last_name STRING,
street STRING,
city STRING,
state STRING,
zip STRING,
gender STRING,
email STRING,
username STRING,
password STRING,
cell STRING,
ssn STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
NiFi Flow Setup
Follow the detailed instructions below to set up the flow. Alternatively, a template of the flow can be downloaded here: putkudu-querydatabasetable.xml
1. Start NiFi. Two controller services are needed for the flow. Click the "Configuration" button (gear icon) from the Operate palette:
This opens the NiFi Flow Configuration window. Select the "Controller Services" tab. Click the "+" button and add a DBCPConnectionPool controller service:
Configure the controller service as follows (adjusting the property values to match your own MySQL instance and environment):
Next, add an AvroReader controller service:
Apply the default configuration:
Select the "lightning bolt" icon for each controller service to enable them:
2. Return to the NiFi canvas. Add a QueryDatabaseTable processor:
Configure the processor as follows:
where:
The DBCPConnectionPool controller service created earlier is selected for Database Connection Pooling Service
"users" is entered for the Table Name
"id" is entered for the Maximum-value Columns
3. Add a PutKudu processor and connect the two processors:
Configure the PutKudu processor as follows:
where:
"192.168.58.100:7051" is entered for the Kudu Masters IP and port (7051 is the default port)
"impala::default.users_kudu" is entered for the Table Name
Skip head line property is set to "false"
The AvroReader controller service created earlier is selected for Record Reader
Auto-terminate the Success relationship:
On the canvas, make a "failure" relationship connection from the PutKudu processor to itself:
4. The flow is ready to run.
Run Flow
Start the QueryDatabaseTable processor.
Looking at the contents of the FlowFile in the queue, the data from the MySQL table has been ingested and converted to Avro format:
Start the PutKudu processor to put the data into Kudu:
This can be confirmed via a Select query:
With the flow still running, add another row of data to the MySQL "users" table:
The flow processes this data and the new row appears in Kudu:
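Only the new row is fetched because QueryDatabaseTable keeps track of the largest value it has seen in the Maximum-value Columns ("id" here). The Python sketch below imitates that behavior with an in-memory SQLite table. It is an illustration rather than the processor's code: `fetch_new_rows`, the `state` dictionary and the two-column "users" table are assumptions made for this example.

```python
import sqlite3


def fetch_new_rows(conn, table, max_value_column, state):
    """Return rows added since the last call, tracking the largest value seen."""
    last_seen = state.get(max_value_column)
    if last_seen is None:
        # First run: nothing has been fetched yet, so read the whole table.
        cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {max_value_column}")
    else:
        cursor = conn.execute(
            f"SELECT * FROM {table} WHERE {max_value_column} > ? "
            f"ORDER BY {max_value_column}",
            (last_seen,),
        )
    rows = cursor.fetchall()
    if rows:
        # Remember the largest value so the next run skips these rows.
        state[max_value_column] = rows[-1][max_value_column]
    return rows


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # allows access to columns by name
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, first_name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "marlene"), (2, "letitia")])

state = {}
first_run = fetch_new_rows(conn, "users", "id", state)   # both existing rows
second_run = fetch_new_rows(conn, "users", "id", state)  # nothing new
conn.execute("INSERT INTO users VALUES (?, ?)", (3, "todd"))
third_run = fetch_new_rows(conn, "users", "id", state)   # only the new row
```

The table and column names are interpolated directly into the SQL text, which is acceptable for a sketch with fixed names but should not be done with untrusted input.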
Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor
Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+
Using PutElasticsearchHttpRecord (CSVReader)
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files
Geo Enrich NiFi Provenance Event Data using LookupRecord
Using PutMongoRecord to put CSV into MongoDB
10-20-2017
02:50 PM
1 Kudo
Services defined in the Controller Settings are available to Reporting Tasks and other services defined there. Services defined within a Process Group are available to descendant components. To create a service that is available to all components in the flow, define it in the root Process Group. Here is a helpful article on the scoping of controller services: https://community.hortonworks.com/content/kbentry/90259/understanding-controller-service-availability-in-a.html Improvements were also made to the documentation in version 1.3.0: https://issues.apache.org/jira/browse/NIFI-3911 In version 1.4.0, the tab was also renamed to the more explicit "Reporting Task Controller Services".
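The scoping rule described in this reply can be pictured as a walk up the Process Group tree. The following Python sketch is a simplified model, not NiFi code: the `ProcessGroup` class, the group names and `CONTROLLER_LEVEL_SERVICES` are all hypothetical.

```python
# Services created under Controller Settings; in this model they are kept
# apart from the Process Group tree and are never inherited by any group.
CONTROLLER_LEVEL_SERVICES = {"ReportingTaskSSLContext"}  # hypothetical name


class ProcessGroup:
    """Minimal model of a Process Group that can define controller services."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.services = set()

    def visible_services(self):
        """Services defined here plus those inherited from ancestor groups."""
        visible = set()
        group = self
        while group is not None:
            visible |= group.services
            group = group.parent
        return visible


root = ProcessGroup("root")
ingest = ProcessGroup("ingest", parent=root)
reports = ProcessGroup("reports", parent=root)

root.services.add("AvroSchemaRegistry")  # defined at the root: visible everywhere
ingest.services.add("CSVReader")         # defined in one group: visible there and below
```

In this model, `ingest` sees both services, while `reports` sees only the one defined in the root group.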
10-16-2017
03:48 PM
@Casel Chen, thanks for your response. Hopefully your original issue is resolved if you set up the Distributed Map Cache Server as detailed in the first article of this tutorial. My flow template (attached to the tutorial) has the Distributed Map Cache Client controller service. PutDatabaseRecord does not support Hive or HBase. For Hive, use a PutHiveStreaming processor instead. PutHiveStreaming expects Avro format. You can use the ConvertRecord processor to convert to Avro.
10-13-2017
03:41 PM
Hi @Casel Chen, Just to confirm: Did you use a Distributed Map Cache Client/Server as discussed in the first article? If not, there is a Jira that is possibly related to what you are seeing: https://issues.apache.org/jira/browse/NIFI-3902 Also, are you using MySQL? Some issues have been reported when using MariaDB. Thanks!
10-13-2017
03:34 PM
3 Kudos
Objective
This tutorial demonstrates how to use the PutMongoRecord processor to easily put data from a CSV file into MongoDB.
Note: The PutMongoRecord processor was introduced in NiFi 1.4.0, so this tutorial requires version 1.4.0 or later.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.4.0
MongoDB 3.4.9
PutMongoRecord (CSVReader)
Demo Configuration
MongoDB
For my environment, I had a local MongoDB 3.4.9 instance installed.
Start MongoDB and create a database "hcc" and a collection "movies" for use with this tutorial:
>use hcc
switched to db hcc
> db.createCollection("movies")
{ "ok" : 1 }
> show collections
movies
I like to use Robo 3T to manage/monitor my MongoDB instance:
Initial Flow
A powerful feature of the record-oriented functionality in NiFi is the ability to re-use Record Readers and Writers. In conjunction with the Record processors, it is quick and easy to change data formats and data destinations.
For example, let's assume you have the flow working from the article "Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter)" with all of the necessary controller services enabled.
(Note: The template for this flow can be found in the article as well as step-by-step instructions on how to configure it.)
As currently configured, the flow:
1. Pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website.
2. Unzips the file.
3. Sends only the movie title information on in the flow.
4. Adds Schema Name "movies" as an attribute to the flowfile.
5. Uses PublishKafkaRecord_0_10 to convert the flowfile contents from CSV to JSON and publish to a Kafka topic.
Instead of publishing that movie data to Kafka, we now want to put it in MongoDB. The following steps demonstrate how to do that quickly and simply by replacing the PublishKafkaRecord_0_10 processor with a PutMongoRecord processor and re-using the CSVReader that references an Avro Schema Registry where the movies schema is defined.
PutMongoRecord Flow Setup
1. Delete the connection between the UpdateAttribute and PublishKafkaRecord_0_10 processors. Now delete the PublishKafkaRecord_0_10 processor or set it off to the side.
2. Add a PutMongoRecord processor to the canvas.
3. Connect the UpdateAttribute processor to the PutMongoRecord processor:
4. Open the Configure dialog for the PutMongoRecord processor. On the Settings tab, auto-terminate the "success" relationship.
5. On the canvas, make a "failure" relationship connection from the PutMongoRecord to itself.
6. On the Properties tab:
Add "mongodb://localhost:27017" for the Mongo URI property
Add "hcc" for the Mongo Database Name property
Add "movies" for the Mongo Collection Name property
Since the CSVReader and its related schema were already defined for the original PublishKafka flow, simply select "CSVReader" for the Record Reader property.
Select "Apply".
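Conceptually, the configuration above reads CSV rows as typed records and inserts each record as a document into the "movies" collection. The Python sketch below illustrates that idea only. The sample CSV, the field names and types in `FIELD_TYPES`, and the `InMemoryCollection` stand-in are assumptions made for this example; the real flow uses the "movies" schema from the registry and a live MongoDB instance.

```python
import csv
import io

# Hypothetical sample; the real flow reads the movie titles CSV.
CSV_TEXT = (
    "movieId,title,genres\n"
    "1,Example Movie (1995),Comedy\n"
    "2,Another Example (1995),Drama\n"
)

# Assumed field types, standing in for the "movies" schema in the registry.
FIELD_TYPES = {"movieId": int, "title": str, "genres": str}


def csv_to_documents(text, field_types):
    """Parse CSV text into a list of dicts, converting each value to its type."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        {name: field_types[name](value) for name, value in row.items()}
        for row in reader
    ]


class InMemoryCollection:
    """Stand-in for a MongoDB collection; only insert_many is modelled."""

    def __init__(self):
        self.documents = []

    def insert_many(self, documents):
        self.documents.extend(documents)


def put_documents(collection, documents):
    """Insert all documents in one call and return how many were written."""
    if documents:
        collection.insert_many(documents)
    return len(documents)


movies = InMemoryCollection()
documents = csv_to_documents(CSV_TEXT, FIELD_TYPES)
written = put_documents(movies, documents)
```

With the pymongo driver installed, `MongoClient("mongodb://localhost:27017")["hcc"]["movies"]` could be passed to `put_documents` in place of the in-memory stand-in, since a pymongo collection also provides `insert_many`.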
The flow is ready to run.
Flow Results
Start the flow.
(Note: If you had run the original PublishKafka flow previously, don't forget to clear the state of the GetHTTP processor so that the movie data zip is retrieved again.)
The movie data is now in MongoDB:
Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord (Apache NiFi 1.2+)
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor (Apache NiFi 1.2+)
Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+
Using PutElasticsearchHttpRecord (CSVReader) in Apache NiFi 1.2+
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files (Apache NiFi 1.2+)
Geo Enrich NiFi Provenance Event Data using LookupRecord
10-04-2017
03:11 PM
3 Kudos
Objective
This tutorial is the second article of a two-part series. We will walk through in detail a flow which:
Uses the LookupRecord processor to parse NiFi provenance events in JSON format and add geolocation data
Uses the PartitionRecord processor to group like records by State
Publishes records originating from California to Kafka
The first article can be found here.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Apache Kafka 0.10.2.1
NiFi LookupRecord Flow Walk Through
Prerequisites
The first article in this tutorial series has been completed.
The file GeoLite2-City.mmdb has been downloaded locally into a directory named "enrichment-db" within your NiFi installation
Provenance data has been generated and received by the "Provenance In" input port.
All necessary controller services and reporting tasks are enabled/running.
Kafka is running and a topic "California" has been created.
Flow Overview
Here is a quick overview of the flow:
1. "Provenance In" input port receives the provenance events from the SiteToSiteProvenanceReportingTask
2. UpdateAttribute adds Schema Name "provenance-event" as an attribute to the flowfile
3. LookupRecord adds geo enrichment data to the flowfile
4. PartitionRecord groups the records by State
5. RouteOnAttribute filters records to those that originate from California
6. PublishKafka_0_10 publishes the California records to Kafka
Flow Details
UpdateAttribute Processor
Once the provenance event data is received by the "Provenance In" input port, the first processor in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "provenance-event" to the flowfile. Looking at its configuration:
Start the UpdateAttribute processor and view the attributes of one of the flowfiles to confirm this:
Geo Enrich (LookupRecord Processor)
The next processor, LookupRecord, looks up geo enrichment data by IP/hostname and adds this information to an "enrichment" field in the flowfile. Looking at the configuration:
Here is a breakdown of what the processor is configured to do:
Record Reader is set to "JsonTreeReader" and Record Writer is set to "JsonRecordSetWriter". The "JsonTreeReader" controller service parses the event data in JSON format and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON.
The custom property "ip" uses regex to extract the hostname from the "transitUri" field in the record.
LookupService is set to the "IPLookupService" controller service. The ip/hostnames from the previous step are looked up in the MaxMind database to determine if enrichment data exists for that hostname.
Result RecordPath is set to "/enrichment", which is the field added to the record if enrichment data is returned from the Lookup Service.
Routing Strategy is set to "Route to 'matched' or 'unmatched'". Records are routed to these relationships depending on whether or not there was a match in the IPLookupService.
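The breakdown above can be summarized as: extract a host from "transitUri", look it up, add an "enrichment" field on a hit, and route accordingly. The Python sketch below mirrors those steps in simplified form. It is not the processor's implementation: `GEO_DB` is a hypothetical dictionary standing in for the MaxMind database, and `HOST_PATTERN` is an assumed regular expression, not the one configured in the flow.

```python
import re

# Hypothetical lookup table standing in for the MaxMind database.
GEO_DB = {
    "www.google.com": {
        "geo": {"subdivisions": [{"name": "California", "isoCode": "CA"}]}
    },
}

# Assumed pattern: capture the host portion of a URI such as http://host/path.
HOST_PATTERN = re.compile(r"^[a-z][a-z0-9+.-]*://([^/:]+)", re.IGNORECASE)


def lookup_record(record, geo_db):
    """Return (relationship, record), adding an 'enrichment' field on a match."""
    transit_uri = record.get("transitUri") or ""
    match = HOST_PATTERN.match(transit_uri)
    enrichment = geo_db.get(match.group(1)) if match else None
    if enrichment is None:
        return "unmatched", record
    # Copy the record so the input is left untouched.
    return "matched", {**record, "enrichment": enrichment}


hit = lookup_record({"eventId": "1", "transitUri": "http://www.google.com/"}, GEO_DB)
miss = lookup_record({"eventId": "2", "transitUri": "http://unknown.example/"}, GEO_DB)
no_uri = lookup_record({"eventId": "3", "transitUri": None}, GEO_DB)
```

Records without a "transitUri", or with a host that is not in the lookup table, are returned unchanged on the "unmatched" side.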
Let's look at the mentioned controller services in detail.
JsonTreeReader Controller Service
Select the arrow icon next to the "JsonTreeReader" which opens the Controller Services list in the NiFi Flow Configuration. "JsonTreeReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this example is schema.name. The Schema Registry property is set to the AvroSchemaRegistry Controller Service.
AvroSchemaRegistry Controller Service
The AvroSchemaRegistry defines the "provenance-event" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
The schema is defined as:
{
"namespace": "nifi",
"name": "provenanceEvent",
"type": "record",
"fields": [
{ "name": "eventId", "type": "string" },
{ "name": "eventOrdinal", "type": "long" },
{ "name": "eventType", "type": "string" },
{ "name": "timestampMillis", "type": {
"type": "long",
"logicalType": "timestamp-millis"
} },
{ "name": "durationMillis", "type": "long" },
{ "name": "lineageStart", "type": {
"type": "long",
"logicalType": "timestamp-millis"
} },
{ "name": "details", "type": "string" },
{ "name": "componentId", "type": "string" },
{ "name": "componentType", "type": "string" },
{ "name": "entityId", "type": "string" },
{ "name": "entityType", "type": "string" },
{ "name": "entitySize", "type": ["null", "long"] },
{ "name": "previousEntitySize", "type": ["null", "long"] },
{ "name": "updatedAttributes", "type": {
"type": "map",
"values": "string"
} },
{ "name": "previousAttributes", "type": {
"type": "map",
"values": "string"
} },
{ "name": "actorHostname", "type": "string" },
{ "name": "contentURI", "type": "string" },
{ "name": "previousContentURI", "type": "string" },
{ "name": "parentIds", "type": {
"type": "array",
"items": "string"
} },
{ "name": "childIds", "type": {
"type": "array",
"items": "string"
} },
{ "name": "platform", "type": "string" },
{ "name": "application", "type": "string" },
{ "name": "transitUri", "type": ["null", "string"] },
{ "name": "remoteHost", "type": ["null", "string"] },
{ "name": "enrichment", "type": ["null",
{
"name": "enrichmentRecord",
"namespace": "nifi",
"type": "record",
"fields": [
{
"name": "geo",
"type": ["null", {
"name": "cityGeo",
"type": "record",
"fields": [
{ "name": "city", "type": ["null", "string"] },
{ "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
{ "name": "metroCode", "type": ["null", "int"] },
{ "name": "timeZone", "type": ["null", "string"] },
{ "name": "latitude", "type": ["null", "double"] },
{ "name": "longitude", "type": ["null", "double"] },
{ "name": "country", "type": ["null", {
"type": "record",
"name": "country",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "isoCode", "type": "string" }
]
}] },
{ "name": "subdivisions", "type": {
"type": "array",
"items": {
"type": "record",
"name": "subdivision",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "isoCode", "type": "string" }
]
}
}
},
{ "name": "continent", "type": ["null", "string"] },
{ "name": "postalCode", "type": ["null", "string"] }
]
}]
},
{
"name": "isp",
"type": ["null", {
"name": "ispEnrich",
"type": "record",
"fields": [
{ "name": "name", "type": ["null", "string"] },
{ "name": "organization", "type": ["null", "string"] },
{ "name": "asn", "type": ["null", "int"] },
{ "name": "asnOrganization", "type": ["null", "string"] }
]
}]
},
{
"name": "domainName",
"type": ["null", "string"]
},
{
"name": "connectionType",
"type": ["null", "string"],
"doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
},
{
"name": "anonymousIp",
"type": ["null", {
"name": "anonymousIpType",
"type": "record",
"fields": [
{ "name": "anonymous", "type": "boolean" },
{ "name": "anonymousVpn", "type": "boolean" },
{ "name": "hostingProvider", "type": "boolean" },
{ "name": "publicProxy", "type": "boolean" },
{ "name": "torExitNode", "type": "boolean" }
]
}]
}
]
}
]}
]
}
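Several fields in this schema, such as "entitySize" and "transitUri", use a union like ["null", "long"], which is how Avro marks a field as nullable. The short Python sketch below shows what that means when checking a value. It covers only the primitive types and is an illustration, not an Avro library; `PRIMITIVES` and `matches` are names invented for this example.

```python
# Avro primitive type names mapped to Python types (primitives only;
# records, maps and arrays are deliberately left out of this sketch).
PRIMITIVES = {
    "null": type(None),
    "string": str,
    "long": int,
    "int": int,
    "double": float,
    "boolean": bool,
}


def matches(value, avro_type):
    """True if value fits a primitive type name or a union (list) of them."""
    if isinstance(avro_type, list):
        # A union matches when any one of its branches matches.
        return any(matches(value, branch) for branch in avro_type)
    expected = PRIMITIVES[avro_type]
    if expected is int and isinstance(value, bool):
        return False  # bool is a subclass of int in Python
    return isinstance(value, expected)


nullable_long = ["null", "long"]   # as declared for "entitySize"
required_string = "string"         # as declared for "eventId"
```

For example, `matches(None, nullable_long)` is true, whereas `matches(None, required_string)` is false because "eventId" has no "null" branch.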
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
IPLookupService Controller Service
Close the window for JsonRecordSetWriter. Select the View Details button ("i" icon) next to the "IPLookupService" controller service to see its properties:
This service references the MaxMind Database file "GeoLite2-City.mmdb" that we retrieved in the first tutorial article to look for enrichment data matching the given ip/hostname.
Start the LookupRecord processor:
Since we are only interested in matching records, we connect the "matched" relationship to the next processor.
Looking at the contents of a flowfile, confirm that the "enrichment" field has been added:
For this flowfile, the enrichment data shows California (www.google.com). Selecting another shows Massachusetts (www.toyota.jp):
Partition by State (PartitionRecord Processor)
The next processor, PartitionRecord, separates the incoming flowfiles into groups of 'like' records by evaluating a user-supplied record path against each record. Looking at the configuration:
The "JsonTreeReader" and "JsonRecordSetWriter" record reader/writers are reused.
A custom record path property, "state", is used to divide the records into groups based on the field ‘./isoCode’.
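Grouping by a record path boils down to computing a key for each record and collecting records that share it. The Python sketch below illustrates this, together with the California check that follows in the flow. It is a simplified model: `state_code` assumes the state lives in the first subdivision's "isoCode", and the sample records and helper names are made up for this example.

```python
from collections import defaultdict


def state_code(record):
    """Assumed location of the state code: the first subdivision's isoCode."""
    try:
        return record["enrichment"]["geo"]["subdivisions"][0]["isoCode"]
    except (KeyError, IndexError, TypeError):
        return None


def partition_records(records, key):
    """Group records by key(record); each group stands for one output flowfile."""
    groups = defaultdict(list)
    for record in records:
        groups[key(record)].append(record)
    return dict(groups)


def make_record(event_id, iso_code):
    """Build a minimal, made-up enriched record for the example."""
    return {
        "eventId": event_id,
        "enrichment": {"geo": {"subdivisions": [{"isoCode": iso_code}]}},
    }


records = [make_record("1", "CA"), make_record("2", "MA"), make_record("3", "CA")]
groups = partition_records(records, state_code)

# The later RouteOnAttribute step keeps only the group whose state is CA.
california = groups.get("CA", [])
```

Records without any enrichment data end up in a group keyed by `None` in this sketch, so they are kept rather than dropped.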
Start the PartitionRecord processor:
Check if California (RouteOnAttribute Processor)
A RouteOnAttribute processor is next in the flow. Looking at the properties:
This processor routes flowfiles for which state is CA.
Run the RouteOnAttribute processor to see this in action:
PublishKafka_0_10 Processor
The final processor in the flow is PublishKafka_0_10. Looking at the properties:
Run the processor to see the records published in the Kafka topic "California":
I hope this tutorial was helpful and provided insight into how to use the LookupRecord processor! Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord (Apache NiFi 1.2+)
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor (Apache NiFi 1.2+)
Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+
Using PutElasticsearchHttpRecord (CSVReader) in Apache NiFi 1.2+
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files (Apache NiFi 1.2+)
10-04-2017
01:52 PM
4 Kudos
Objective
This tutorial walks you through a NiFi flow that:
Uses the LookupRecord processor to parse NiFi provenance events in JSON format and add geolocation data
Uses the PartitionRecord processor to group like records by State
Publishes records originating from California to Kafka
This article is the first of a two-part series. We will set up the demo environment, including flows, controller services and reporting tasks. The second article will walk through the main flow step by step.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Apache Kafka 0.10.2.1
Environment Configuration
Kafka Setup
In the bin directory of your Kafka install:
Start ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
Start Kafka:
./kafka-server-start.sh ../config/server.properties
Create Kafka Topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic California
Start Kafka Consumer:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic California --from-beginning
NiFi Instances Configuration
For this tutorial, you need two NiFi instances running. One instance generates and sends provenance data to the other via the SiteToSiteProvenanceReportingTask.
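A quick way to confirm that both instances are up is to test whether their web ports accept connections. The sketch below assumes the two ports used later in this article (8080 for the instance that generates provenance data, 8088 for the main dataflow instance); the function names are illustrative only. An open port shows that something is listening, not that the NiFi UI has finished loading.

```python
import socket


def is_listening(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports taken from the URLs used in this article; adjust for your setup.
NIFI_INSTANCES = {
    "provenance generating instance": 8080,
    "main dataflow instance": 8088,
}


def check_instances(instances, host="localhost"):
    """Map each instance name to True/False depending on whether its port is open."""
    return {name: is_listening(host, port) for name, port in instances.items()}


if __name__ == "__main__":
    for name, up in check_instances(NIFI_INSTANCES).items():
        print(f"{name}: {'listening' if up else 'not reachable'}")
```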
Instructions on how to set up both instances can be found in the HCC article "Extracting NiFi Provenance Data using SiteToSiteProvenanceReportingTask".
Main Dataflow Instance Setup
In the instance that will use the provenance data (http://localhost:8088/nifi), import the following template:
lookuprecord-geoenrich.xml
You should see the following flow on your NiFi canvas:
First, let's get the MaxMind Database file that is used to enrich the data. This is done by the flow contained within the "Gather Enrichment Data" process group.
Run the flow and the file GeoLite2-City.mmdb should be downloaded locally into a directory named "enrichment-db" within your NiFi installation.
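Before enabling the controller services, you may want to confirm that the download actually produced the file. Here is a minimal sketch of that check; the function name and the NiFi home path passed to it are placeholders, while the directory and file names are the ones given above.

```python
from pathlib import Path


def enrichment_db_ready(nifi_home):
    """Return True if enrichment-db/GeoLite2-City.mmdb exists under nifi_home and is non-empty."""
    db_file = Path(nifi_home) / "enrichment-db" / "GeoLite2-City.mmdb"
    return db_file.is_file() and db_file.stat().st_size > 0
```

For example, `enrichment_db_ready("/path/to/nifi")` should return `True` once the "Gather Enrichment Data" flow has finished, where `/path/to/nifi` stands in for your own installation directory.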
Now, let's enable the flow's controller services. Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the JsonTreeReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services as well as the IPLookupService controller service. All the controller services should be enabled at this point:
We will step through the main flow in detail in the second article. For now, start only the "Provenance In" input port.
Provenance Event Generating Instance
In the instance that generates provenance data (http://localhost:8080/nifi), import the following template:
fetchsites.xml
The following flow should be on the NiFi canvas:
The two GetHTTP processors are configured as follows:
The UpdateAttribute processor is configured with all default settings:
Now, let's create the SiteToSiteProvenance reporting task.
Select the Global menu and choose "Controller Settings":
Select the Reporting Tasks tab and click the "+" icon:
Select SiteToSiteProvenanceReportingTask and click the "Add" button:
Configure the reporting task as follows:
On the Settings tab, set "Run Schedule" to 5 seconds:
(Note: Some of these settings are for demo purposes only and may need to be adjusted if run in a production environment.)
Start the reporting task:
Return to the NiFi canvas and start the flow to generate provenance data:
Run the flow for 30 seconds or so to generate sufficient provenance events, then stop the flow.
Switch to your other NiFi instance. You should see flowfiles queued after the "Provenance In" input port:
We are now ready to geo-enrich the provenance data.
Continue to the second article for a detailed walkthrough of the LookupRecord flow.
09-25-2017
06:25 PM
Objective
This tutorial demonstrates how to upgrade HDF from version 3.0.0 to 3.0.1.1 via the Ambari-managed Express Upgrade.
Note: Additional documentation for the HDF 3.0.1.1 upgrade can be found here.
Environment
This tutorial was tested using the following environment and components:
Ambari 2.5.1
HDF 3.0.0
Initial Configuration
Prerequisites
This tutorial assumes your HDF cluster is in a healthy state and that all prerequisites for Ambari upgrade have been met.
Upgrade Steps
Register Target Version
1. Select "Manage Ambari" from the Admin tab:
2. Select "Versions" from the Clusters section:
3. Select the "Register Version" button:
4. Select "HDF-3.0" from the listed versions:
5. Enter the proper extension in the Name field, "HDF-3.0.1.1". Remove any OSes that are not relevant for your cluster. For your OS, add the proper HDF-3.0.1.1 base URL. For my CentOS 7 environment, it was: http://public-repo-1.hortonworks.com/HDF/centos7/3.x/updates/3.0.1.1/tars/hdf_ambari_mp/hdf-ambari-mpack-3.0.1.1-5.tar.gz which was found in the 3.0.1.1 Release Notes.
6. Select "Save". HDF-3.0.1.1 is now registered but not yet installed.
Install HDF 3.0.1.1
7. Select "Install on ".
8. The versions being managed are displayed.
9. Select the "Install" button for HDF-3.0.1.1.
10. Select "OK" to confirm installation.
11. Allow for some install processing:
12. Then select "Upgrade":
Upgrade to HDF 3.0.1.1
13. You will be presented with two options: Rolling Upgrade and Express Upgrade. Rolling Upgrade is not selectable.
14. Express Upgrade may have some Checks to resolve. In my environment, there were 2 Required and 1 Warning:
15. Follow the instructions to resolve the required checks. The "Proceed" button can now be selected:
16. When prompted to confirm, select "Yes" to proceed with the upgrade:
17. The upgrade process begins:
18. The upgrade pauses to confirm whether you are ready to finalize the upgrade. When ready, select "Finalize".
19. The upgrade completes.