Member since
04-05-2016
139
Posts
144
Kudos Received
16
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 50648 | 02-14-2019 02:53 PM |
01-19-2018
08:13 PM
4 Kudos
Objective
This tutorial walks you through how to install and setup a local Apache NiFi Registry to integrate with Apache NiFi and start using versioned NiFi dataflows. It assumes basic experience with NiFi but little to no experience with NiFi Registry. A video version of this tutorial can be seen here: https://youtu.be/X_qhRVChjZY Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.5.0
Apache NiFi Registry 0.1.0
Note: Apache NiFi 1.5.0 is the first NiFi release to support integration with the NiFi Registry. Nifi Registry 0.1.0 is the first and currently only version of the application. Apache NiFi Registry Configuration Registry Installation
Download the tarball of the 0.1.0 Registry release:
nifi-registry-0.1.0-bin.tar.gz
Extract the tar:
tar xzvf nifi-registry-0.1.0-bin.tar.gz
Start the Registry In a terminal window, navigate to the directory where NiFi Registry was installed. Run:
bin/nifi-registry.sh start
Open Registry UI
Navigate to the registry UI in your browser:
http://localhost:18080/nifi-registry
Note:By default the registry is unsecured. The port can be changed by editing the nifi-registry.properties file in the NiFi Registry conf directory (the exact property to change is nifi.registry.web.http.port), but the default port is 18080. Bucket Creation
A Bucket is a container that stores and organizes flows in the Registry. The Registry is empty as there are no buckets/flows yet.
To create a bucket, select the Settings icon ( ) in the top right corner of the screen. In the Buckets window that appears, select the "New Bucket" button.
Enter the bucket name "Test" and select the "Create" button.
The "Test" bucket is created:
There are no permissions configured by default, so anyone is able to view, create and modify buckets in this instance. For information on securing the Registry, see the NiFi Registry System Administrator’s Guide. Apache NiFi Configuration Connect NiFi to the Registry
With the Registry is running, we can tell NiFi about it.
In NiFi, select "Controller Settings" from the top-right Global menu:
Select the Registry Clients tab and the "+" button to add a new Registry Client. Enter a name and the URL of the Registry instance (http://localhost:18080): Versioned DataFlows Start Version Control on a Process Group
NiFi can now place a process group under version control which saves it as a flow resource in the Registry.
Right-click on a process group and select "Version→Start version control" from the context menu:
The local registry instance and "Test" bucket are chosen by default to store your flow since they are the only registry connected and bucket available. Enter a flow name, flow description, comments and select "Save":
As indicated by the Version State icon ( ) in the top left corner of the component, the process group is now saved as a versioned flow in the registry.
Go back to the Registry UI and return to the main page to see the versioned flow you just saved (a refresh may be required): Save Changes to a Versioned Flow
Changes made to the versioned process group can be reviewed, reverted or saved.
For example, if changes are made to the ABCD flow, the Version State changes to "Locally modified" ( ). The right-click menu will now show the options "Commit local changes", "Show local changes" or "Revert local changes":
Select "Show local changes" to see the details of the changes made:
Return to the context menu and select "Commit local changes". Enter comments and select "Save" to save the changes:
Version 2 of the flow is saved:
Note: Some actions made to the versioned process group are not considered local changes. More information can be found in the
Managing Local Changes section of the NiFi User Guide. Import a Versioned Flow
With a flow existing in the Registry, we can use it to illustrate how to import a versioned process group.
In NiFi, select Process Group from the Components toolbar and drag it onto the canvas:
Instead of entering a name, click the Import link:
Choose the version of the flow you want imported and select "Import":
A second identical PG is now added: Help
To learn more about NiFi Registry functionality and working with versioned flows in NiFi, see the following links:
Apache NiFi Registry User Guide
Apache NiFi Registry System Administrator's Guide
Versioning a DataFlow (Apache NiFi User Guide)
Apache NiFi - How do I deploy my flow?
... View more
Labels:
01-19-2018
08:05 PM
If you are using Apache NiFi 1.5.0, the script body used for the CSV to XML conversion needs to be updated/corrected with the following: import groovy.xml.MarkupBuilder
import org.apache.nifi.controller.AbstractControllerServiceimport org.apache.nifi.flowfile.FlowFileimport org.apache.nifi.logging.ComponentLogimport org.apache.nifi.schema.access.SchemaNotFoundExceptionimport org.apache.nifi.serialization.RecordSetWriterimport org.apache.nifi.serialization.RecordSetWriterFactoryimport org.apache.nifi.serialization.WriteResultimport org.apache.nifi.serialization.record.Recordimport org.apache.nifi.serialization.record.RecordSchemaimport org.apache.nifi.serialization.record.RecordSetimport org.apache.nifi.stream.io.NonCloseableOutputStream
class GroovyRecordSetWriter implements RecordSetWriter { private int recordCount = 0; private final OutputStream out; public GroovyRecordSetWriter(final OutputStream out) { this.out = out; } @Override WriteResult write(Record r) throws IOException { new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> new MarkupBuilder(osw).record { r.schema.fieldNames.each {fieldName -> "$fieldName" r.getValue(fieldName) } } } recordCount++; WriteResult.of(1, [:]) }
@Override String getMimeType() { return 'application/xml' }
@Override WriteResult write(final RecordSet rs) throws IOException { int count = 0
new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> new MarkupBuilder(osw).recordSet {
Record r while (r = rs.next()) { count++
record { rs.schema.fieldNames.each {fieldName -> "$fieldName" r.getValue(fieldName) } } } } } WriteResult.of(count, [:]) } public void beginRecordSet() throws IOException { } @Override public WriteResult finishRecordSet() throws IOException { return WriteResult.of(recordCount, [:]); } @Override public void close() throws IOException { } @Override public void flush() throws IOException { }}
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
@Override RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException { return null }
@Override RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException { return new GroovyRecordSetWriter(out) } }
writer = new GroovyRecordSetWriterFactory()
... View more
01-19-2018
07:59 PM
Hi @Surendra Shringi, Probably best to post a new question on HCC for your issue. I'm not sure how to address it and the entire HCC community would be aware of your problem to help out.
... View more
11-07-2017
08:03 PM
4 Kudos
Objective
This is the second of a two article series on the ValidateRecord processor. The first walks you through a NiFI flow that converts a CVS file into JSON format and validates the data against a given schema.
This article discusses the effects of enabling/disabling the "Strict Type Checking" property of the ValidateRecord processor.
Note: The ValidateRecord processor was introduced in NiFi 1.4.0. Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6 Apache NiFi 1.4.0 Strict Type Checking Property
A useful property of the ValidateRecord processor is "Strict Type Checking". If the incoming data has a Record where a field is not of the correct type, this property determines how to handle the Record. If set to "true", the Record will be considered invalid. If set to "false", the Record will be considered valid.
To demonstrate both cases, we need to ingest data that can distinguish between different types (which our CSV data from the first article could not). Let's grab a snippet of the JSON candy data and make some changes. Specifically let's put a string value for the "chocolate" field (which is of type int) and let's put a decimal value for the "competitorname" field (which is of type string😞
[ {
"competitorname" : "One dime",
"chocolate" : "0",
"fruity" : 0,
"caramel" : 0,
"peanutyalmondy" : 0,
"nougat" : 0,
"crispedricewafer" : 0,
"hard" : 0,
"bar" : 0,
"pluribus" : 0,
"sugarpercent" : 0.011,
"pricepercent" : 0.116,
"winpercent" : 32.261086
}, {
"competitorname" : 3.14159,
"chocolate" : 1,
"fruity" : 0,
"caramel" : 0,
"peanutyalmondy" : 0,
"nougat" : 0,
"crispedricewafer" : 1,
"hard" : 0,
"bar" : 0,
"pluribus" : 1,
"sugarpercent" : 0.87199998,
"pricepercent" : 0.84799999,
"winpercent" : 49.524113
} ]
Here is the JSON file: type-checking.txt (Change the extension from .txt to .json after downloading)
Place the type-checking.json file in your input directory: In order to process the JSON file, the ValidateRecord processor needs to use a JSON Record Reader. Go to the configuration window for the processor and select "Create new service..." for the Record Reader:
Select JSONTreeReader, then "Create": and then select the Arrow icon next to the reader:
Save the changes made before going to the Controller Service.
Go to the configuration window of the JsonTreeReader controller service, select "AvroSchemaRegistry" for the Schema Registy and then select Apply: Enable the JsonTreeReader service. The flow is ready to run.
Start the GetFile, UpdateAtttribute and ValidateRecord processors. With "Strict Type Checking" set to "true", the 2 records are considered invalid and are routed to that connection:
Start the LogAttribute processor to clear the queue. Stop all processors. Place the type-checking.json file in your input directory again.
Now let's change the Strict Type Checking property to "false":
Running the flow this time, the 2 records are considered valid and are routed to that connection:
Note: The documentation for the Strict Type Checking property states that when set to false, the relevant record fields will be coerced into the correct type. This functionality is currently broken (see NIFI-4579).
... View more
Labels:
11-07-2017
07:26 PM
4 Kudos
Objective
This tutorial consists of two articles. The first walks you through a NiFI flow that utilizes the
ValidateRecord processor and Record Reader/Writer controller services to:
Convert a CVS file into JSON format
Validate the data against a given schema
Write the JSON data to either a 'valid' relationship or 'invalid' relationship
The second article, modifies the flow to demonstrated the effects of enabling/disabling the "Strict Type Checking" property of the ValidateRecord processor.
Note: The ValidateRecord processor was introduced in NiFi 1.4.0. Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.4.0 ValidateRecord Flow Support Files
Here is a template of the flow discussed in this tutorial: validaterecord.xml
In the spirit of Halloween last week, let's ingest some data that ranks the most popular candies: candy-data-dirty.txt (Change the extension from .txt to .csv after downloading)
Note: The CSV data originated from the site, Kaggle.
Values in the data were then added/modified/deleted for the purposes of this tutorial. Demo Configuration Input Directory
Create a local input directory. Place the "candy-data-dirty.csv" file in the input directory. Import Template
Start NiFi. Import the provided template and add it to the canvas: Enable Controller Services
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point: Update Directory Path in GetFile Processor
Change the Input Directory path in the GetFile processor to point to your local input directory:
The flow is now ready to run. Flow Overview
Here is a quick overview of the flow:
1. GetFile ingests a CSV file of candy data from a local directory
2. UpdateAttribute adds Schema Name "candy" as an attribute to the flowfile
3. ValidateRecord converts the flowfile contents from CSV to JSON by:
Using a CSVReader controller service that references a schema in an AvroSchemaRegistry controller service
The AvroSchemaRegistry contains a "candy" schema which defines information about each record (field names, field ids, field types)
Using a JsonRecordSetWriter controller service that references the same AvroSchemaRegistry schema
4. ValidateRecord then writes the data to:
The "valid" relationship for records that adhere to the schema
The "invalid" relationship for records that do not adhere to the schema
5. Both valid and invalid data are sent to LogAttribute processors in order to investigate the contents and provenance of the queued flowfiles. Flow Details
Let's look at each of the processors in the flow in detail:
Get Data (GetFile Processor)
FlowFiles are generated from the candy-data-dirty.csv file in the local directory. Start the processor:
One flowfile is generated with the CSV data as the contents. Right click on the connection between the GetFile Processor and the UpdateAttribute Processor. In the context menu, select "List queue" and click the View Details button ("i" icon):
From the FlowFile window that opens, select the "View" button from the Details tab:
to view the CSV contents of the flowfile:
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "candy" to the flowfile:
Start the processor and view the attributes of the flowfile to confirm this:
ValidateRecord Processor
The next processor, ValidateRecord, creates record objects from the flowfile. Then it reads the records one at a time and validates each against the given schema. The records are then written to either a 'valid' relationship or a 'invalid' relationship. Looking at the configuration:
Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. Schema Registry is set to "AvroSchemaRegistry". More details about these controller services can be found below.
Allow Extra Fields property is set to "false" to demonstrate how records that have additional fields can be sent to the "invalid" relationship. Strict Type Checking is set to "true", to demonstrate how data with incorrect field types can be routed to the "invalid" relationship (described in more detail in the second article).
CSVReader Controller Service
Select the arrow icon next to the "CSV Reader" which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Configuration button (gear icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service which defines the "candy" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Configuration button (gear icon) to see its properties:
The candy schema is as follows:
{
"type": "record",
"name": "CandyRecord",
"fields" : [
{"name": "competitorname", "type": "string"},
{"name": "chocolate", "type": "int"},
{"name": "fruity", "type": "int"},
{"name": "caramel", "type": "int"},
{"name": "peanutyalmondy", "type": "int"},
{"name": "nougat", "type": "int"},
{"name": "crispedricewafer", "type": "int"},
{"name": "hard", "type": "int"},
{"name": "bar", "type": "int"},
{"name": "pluribus", "type": "int"},
{"name": "sugarpercent", "type": "double"},
{"name": "pricepercent", "type": "double"},
{"name": "winpercent", "type": "double"}
]
}
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Configuration button (gear icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
Start the ValidateRecord processor. The flowfile is now split into two flowfiles:
Looking at the contents of the "invalid" connection shows that there were 3 records (now in JSON) that did not match the schema:
[ {
"competitorname" : "Boston Baked Beans",
"chocolate" : 0,
"fruity" : 0,
"caramel" : 0,
"peanutyalmondy" : 1,
"nougat" : 0,
"crispedricewafer" : 0,
"hard" : 0,
"bar" : 0,
"pluribus" : 1,
"sugarpercent" : 0.31299999,
"pricepercent" : 0.51099998,
"winpercent" : 23.417824,
"unknown_field_index_13" : "3.14159"
}, {
"competitorname" : "Nestle Butterfinger",
"chocolate" : 1,
"fruity" : 0,
"caramel" : 0,
"peanutyalmondy" : 1,
"nougat" : 0,
"crispedricewafer" : 0,
"hard" : 0,
"bar" : 1,
"pluribus" : 0,
"sugarpercent" : 0.60399997,
"pricepercent" : 0.76700002,
"winpercent" : "hello"
}, {
"competitorname" : "Strawberry bon bons",
"chocolate" : null,
"fruity" : 1,
"caramel" : 0,
"peanutyalmondy" : 0,
"nougat" : 0,
"crispedricewafer" : 0,
"hard" : 1,
"bar" : 0,
"pluribus" : 1,
"sugarpercent" : 0.56900001,
"pricepercent" : 0.057999998,
"winpercent" : 34.578991
} ]
To see why these records were invalid, select the Provenance icon:
Then select the View Details button ("i" icon) for the ROUTE event:
On the Provenance Event window that opens, scroll down to the bottom so see the Details:
You can look at the contents of the "valid" connection to see the records that did match the schema. A total of 82 records were valid, which is displayed by the record.count attribute:
In preparation for the next article of the tutorial, start the two LogAttribute processors to clear the connection queues. Then stop all processors.
... View more
Labels:
10-23-2017
07:01 PM
2 Kudos
Objective
This tutorial demonstrates how to use the
QueryDatabaseTable and PutKudu processors to read data from a MySQL database and put into Kudu. Thanks to @Cam Mach for his assistance with this article.
Note: The PutKudu processor was introduced in NiFi 1.4.0. Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.4.0
Apache Kudu 1.5.0
MySQL 5.7.13 PutKudu (AvroReader) Demo Configuration MySQL Setup
In your MySQL instance, choose a database ("nifi_db" in my instance) and create the table "users":
unix> mysql -u root -p
unix> Enter password:<enter>
mysql> use nifi_db;
mysql>CREATE TABLE `users` (
`id` mediumint(9) NOT NULL AUTO_INCREMENT,
`title` text,
`first_name` text,
`last_name` text,
`street` text,
`city` text,
`state` text,
`zip` text,
`gender` text,
`email` text,
`username` text,
`password` text,
`phone` text,
`cell` text,
`ssn` text,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=103 DEFAULT CHARSET=latin1;
Add data to the "users" table:
mysql>INSERT INTO `users` (`id`, `title`, `first_name`, `last_name`, `street`, `city`, `state`, `zip`, `gender`, `email`, `username`, `password`, `phone`, `cell`, `ssn`)
VALUES (1, 'miss', 'marlene', 'shaw', '3450 w belt line rd', 'abilene', 'florida', '31995', 'F', 'marlene.shaw75@example.com', 'goldenpanda70', 'naughty', '(176)-908-6931', '(711)-565-2194', '800-71-1872'),
(2, 'ms', 'letitia', 'jordan', '2974 mockingbird hill', 'irvine', 'new jersey', '64361', 'F', 'letitia.jordan64@example.com', 'lazytiger614', 'aaaaa1', '(860)-602-3314', '(724)-685-3472', '548-93-7031'),
(3, 'mr', 'todd', 'graham', '5760 spring hill rd', 'garden grove', 'north carolina', '81790', 'M', 'todd.graham39@example.com', 'purplekoala484', 'paintball', '(230)-874-6532', '(186)-529-4912', '362-31-5248'),
(4, 'mr', 'seth', 'martinez', '4377 fincher rd', 'chandler', 'south carolina', '73651', 'M', 'seth.martinez82@example.com', 'bigbutterfly149', 'navy', '(122)-782-5822', '(720)-778-8541', '200-80-9087'),
(5, 'mr', 'guy', 'mckinney', '4524 hogan st', 'iowa park', 'ohio', '24140', 'M', 'guy.mckinney53@example.com', 'blueduck623', 'office', '(309)-556-7859', '(856)-764-9146', '973-37-9077'),
(6, 'ms', 'anna', 'smith', '5047 cackson st', 'rancho cucamonga', 'pennsylvania', '56486', 'F', 'anna.smith74@example.com', 'goldenfish121', 'albion', '(335)-388-7351', '(485)-150-6348', '680-20-6440'),
(7, 'mr', 'johnny', 'johnson', '7250 bruce st', 'gresham', 'new mexico', '83973', 'M', 'johnny.johnson73@example.com', 'crazyduck127', 'toast', '(142)-971-3099', '(991)-131-1582', '683-26-4133'),
(8, 'mrs', 'robin', 'white', '7882 northaven rd', 'orlando', 'connecticut', '40452', 'F', 'robin.white46@example.com', 'whitetiger371', 'elizabeth', '(311)-659-3812', '(689)-468-6420', '960-70-3399'),
(9, 'miss', 'allison', 'williams', '7648 edwards rd', 'edison', 'louisiana', '52040', 'F', 'allison.williams82@example.com', 'beautifulfish354', 'sanfran', '(328)-592-3520', '(550)-172-4018', '164-78-8160');
Kudu Setup
For my setup, I followed the
Apache Kudu Quickstart instructions to easily set up and run a Kudu VM.
To check that your VM is running:
unix> VBoxManage list runningvms
"kudu-demo" {b39279b5-3dd6-478a-ac9d-2204bf88e7b9}
To see what IP Kudu is running on:
unix> VBoxManage guestproperty get kudu-demo /VirtualBox/GuestInfo/Net/0/V4/IP
Value: 192.168.58.100
The Kudu web client runs on port 8051:
Create a table in Kudu by first connecting to Impala in the virtual machine:
unix> ssh demo@quickstart.cloudera -t impala-shell
demo@quickstart.cloudera's password:
[quickstart.cloudera:21000] >
(
Note: The username and password for the Quickstart VM is "demo".)
Create the Kudu table with the same columns and data types as the MySQL table:
[quickstart.cloudera:21000] > CREATE TABLE users_kudu
(
id BIGINT,
title STRING,
first_name STRING,
last_name STRING,
street STRING,
city STRING,
state STRING,
zip STRING,
gender STRING,
email STRING,
username STRING,
password STRING,
cell STRING,
ssn STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
NiFi Flow Setup
Follow the following detailed instructions to set up the flow. Alternatively, a template of the flow can be downloaded here: putkudu-querydatabasetable.xml
1. Start NiFi. Two controller services are needed for the flow. Click the "Configuration" button (gear icon) from the Operate palette:
This opens the NiFi Flow Configuration window. Select the "Controller Services" tab. Click the "+" button and add a DBCPConnectionPool controller service:
Configure the controller service as follows (adjusting the property values to match your own MySQL instance and environment):
Next, add an AvroReader controller service:
Apply the default configuration:
Select the "lightning bolt" icon for each controller service to enable them:
2. Return to the NiFi canvas. Add a QueryDatabaseTable processor:
Configure the processor as follows:
where:
The DBCPConnectionPool controller service created earlier is selected for Database Connection Pooling Service
"users" is entered for the Table Name
"id" is entered for the Maximum-value Columns
3. Add a PutKudu processor and connect the two processors:
Configure the PuKudu processor as follows:
where:
"192.168.58.100:7051" is entered for the Kudu Masters IP and port (7051 is the default port)
"impala::default.users_kudu" is entered for the Table Name
Skip head line property is set to "false"
The AvroReader controller service created earlier is selected for Record Reader
Auto-terminate the Success relationship:
On the canvas, make a "failure" relationship connection from the PutKudu processor to itself:
4. The flow is ready to run. Run Flow
Start the QueryDatabaseTable processor.
Looking at the contents of the FlowFile in the queue, the data from the MySQL table has been ingested and converted to Avro format:
Start the PutKudu processor to put the data into Kudu:
This can be confirmed via a Select query:
With the flow still running, add another row of data to the Mysql "users" table:
The flow processes this data and the new row appears in Kudu:
Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor
Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+
Using PutElasticsearchHttpRecord (CSVReader)
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files
Geo Enrich NiFi Provenance Event Data using LookupRecord
Using PutMongoRecord to put CSV into MongoDB
... View more
Labels:
10-16-2017
03:48 PM
@Casel Chen, Thanks for your response. Hoping your original issue is resolved if you setup the Distributed Map Cache Server as detailed in the first article of this tutorial. My flow template (attached with the tutorial) has the Distributed Map Cache Client controller service. PutDatabaseRecord does not support Hive or HBase. For Hive, use a PutHiveStreaming processor instead. PutHiveStreaming expects Avro format. You can use the ConvertRecord processor to convert to Avro.
... View more
10-13-2017
03:41 PM
Hi @Casel Chen, Just to confirm: Did you use a Distributed Map Cache Client/Server as discussed in the first article? There is a Jira that is possibly related to what you are seeing if not: https://issues.apache.org/jira/browse/NIFI-3902 Also, are you using a MySQL? Some issues have been reported if using MariaDB. Thanks!
... View more
10-13-2017
03:34 PM
3 Kudos
Objective
This tutorial demonstrates how to use the
PutMongoRecord processor to easily put data from a CSV file into MongoDB.
Note: The PutMongoRecord processor was introduced in NiFi 1.4.0. As such, the tutorial needs to be done running Version 1.4.0. Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.4.0
MongoDB 3.4.9 PutMongoRecord (CSVReader) Demo Configuration MongoDB
For my environment, I had a local MongoDB 3.4.9 instance installed.
Start MongoDB and create a database "hcc" and a collection "movies" for use with this tutorial:
>use hcc
switched to db hcc
> db.createCollection("movies")
{ "ok" : 1 }
> show collections
movies
I like to use
Robot 3T to manage/monitor my MongoDB instance: Initial Flow
A powerful feature about the record-oriented functionality in NiFi is the ability to re-use Record Readers and Writers. In conjunction with the Record processors, it is quick and easy to change data formats and data destinations.
For example, let's assume you have the flow working from the article
"Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter)" with all of the necesary controller services enabled.
(Note: The template for this flow can be found in the article as well as step-by-step instructions on how to configure it.)
As currently configured, the flow:
1. Pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website.
2. Unzips the file.
3. Sends only the movie title information on in the flow.
4. Adds Schema Name "movies" as an attribute to the flowfile.
5. Uses
PublishKafkaRecord_0_10 to convert the flowfile contents from CSV to JSON and publish to a Kafka topic.
Instead of publishing that movie data to Kafka, we now want to put it in
MongoDB. The following steps will demonstrate how to do that quickly and simply by replacing the PublishKafkaRecord processor with a PutMongoRecord processor and re-using the CSVReader that references an Avro Schema Registry where the movies schema is defined. PutMongoRecord Flow Setup
1. Delete the connection between the UpdateAttribute and PublishKafkaRecord_0_10 processors. Now delete the PublishKafkaRecord_0_10 processor or set it off to the side.
2. Add a
PutMongoRecord to the canvas.
3. Connect the UpdateAttribute processor to the PutMongoRecord processor:
4. Open the Configure dialog for the PutMongoRecord process. On the Settings tab, auto-terminate the "success" relationship.
5. On the canvas, make a "failure" relationship connection from the PutMongoRecord to itself.
6. On the Properties tab:
Add "mongodb://localhost:27017" for the Mongo URI property
Add "hcc" for the Mongo Database Name property
Add "movies" for the Mongo Collection Name property
Since it and related schema were already defined for the original PublishKafka flow, simply select "CSVReader" for the Record Reader property.
Select "Apply".
The flow is ready to run. Flow Results
Start the flow.
(Note: If you had run the original PublishKafka flow previously, don't forget to clear the state of the GetHTTP processor so that the movie data zip is retrieved again.)
The movie data is now in MongoDB: Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord (Apache NiFi 1.2+)
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor (Apache NiFi 1.2+)
Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+
Using PutElasticsearchHttpRecord (CSVReader) in Apache NiFi 1.2+
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files (Apache NiFi 1.2+)
Geo Enrich NiFi Provenance Event Data using LookupRecord
... View more
Labels:
10-04-2017
03:11 PM
3 Kudos
Objective
This tutorial is the second article of a two part series. We will walk-through in detail a flow which:
Uses the LookupRecord processor to parse NiFi provenance events in JSON format and add geolocation data
Uses the PartitionRecord processor to group like records by State
Publishes records originating from California to Kafka
The first article can be found
here.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Apache Kafka 0.10.2.1
NiFi LookupRecord Flow Walk Through
Prerequisites
The first article in this tutorial series has been completed.
The file GeoLite2-City.mmdb has been downloaded locally into a directory named "enrichment-db" within your NiFi installation
Provenance data has been generated and received by the "Provenance In" input port.
All necessary controller services and reporting tasks are enabled/running.
Kafka is running and a topic "California" has been created.
Flow Overview
Here is a quick overview of the flow:
1. "Provenance In" input port receives the provenance events from the SiteToSiteProvenanceReportingTask
2. UpdateAttribute adds Schema Name "provenance-event" as an attribute to the flowfile
3. LookupRecord adds geo enrichment data to the flowfile
4. PartitionRecord groups the records by State
5. RouteOnAttribute filters records to those that originate from California
6. PublishKafka_0_10 publishes the California records to Kafka
Flow Details
UpdateAttribute Processor
Once the provenance event data is received by the "Provenance In" input port, the first processor in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "provenance-event" to the flowfile. Looking at its configuration:
Start the UpdateAttribute processor and view the attributes of one of the flowfiles to confirm this:
Geo Enrich (LookupRecord Processor)
The next processor, LookupRecord, looks up geo enrichment data by IP/hostname and adds this information to an "enrichment" field in the flowfile. Looking at the configuration:
Here is a breakdown of what the processor is configured to do:
Record Reader is set to "JsonTreeReader" and Record Writer is set to "JsonRecordSetWriter". The "JsonTreeReader" controller service parses the event data in JSON format and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON.
The custom property "ip" uses regex to extract the hostname from the "transitUri" field in the record.
LookupService is set to the "IPLookupService" controller service. The ip/hostnames from the previous step are looked up in the MaxMind database to determine if enrichment data exists for that hostname.
Result Recordpath is set to "/enrichment", which is the field added to the record if enrichment data is returned from the Lookup Service.
Routing Strategy is set to "Route to 'matched' or 'unmatched'". Records are routed to these relationships depending on whether or not there was a match in the IPLookupService.
Let's look at the mentioned controller services in detail.
JsonTreeReader Controller Service
Select the arrow icon next to the "JsonTreeReader" which opens the Controller Services list in the NiFi Flow Configuration. "JsonTreeReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this example is schema.name. The Schema Registry property is set to the AvroSchemaRegistry Controller Service.
AvroSchemaRegistry Controller Service
The AvroSchemaRegistry defines the "provenance-event" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
The schema is defined as:
{
"namespace": "nifi",
"name": "provenanceEvent",
"type": "record",
"fields": [
{ "name": "eventId", "type": "string" },
{ "name": "eventOrdinal", "type": "long" },
{ "name": "eventType", "type": "string" },
{ "name": "timestampMillis", "type": {
"type": "long",
"logicalType": "timestamp-millis"
} },
{ "name": "durationMillis", "type": "long" },
{ "name": "lineageStart", "type": {
"type": "long",
"logicalType": "timestamp-millis"
} },
{ "name": "details", "type": "string" },
{ "name": "componentId", "type": "string" },
{ "name": "componentType", "type": "string" },
{ "name": "entityId", "type": "string" },
{ "name": "entityType", "type": "string" },
{ "name": "entitySize", "type": ["null", "long"] },
{ "name": "previousEntitySize", "type": ["null", "long"] },
{ "name": "updatedAttributes", "type": {
"type": "map",
"values": "string"
} },
{ "name": "previousAttributes", "type": {
"type": "map",
"values": "string"
} },
{ "name": "actorHostname", "type": "string" },
{ "name": "contentURI", "type": "string" },
{ "name": "previousContentURI", "type": "string" },
{ "name": "parentIds", "type": {
"type": "array",
"items": "string"
} },
{ "name": "childIds", "type": {
"type": "array",
"items": "string"
} },
{ "name": "platform", "type": "string" },
{ "name": "application", "type": "string" },
{ "name": "transitUri", "type": ["null", "string"] },
{ "name": "remoteHost", "type": ["null", "string"] },
{ "name": "enrichment", "type": ["null",
{
"name": "enrichmentRecord",
"namespace": "nifi",
"type": "record",
"fields": [
{
"name": "geo",
"type": ["null", {
"name": "cityGeo",
"type": "record",
"fields": [
{ "name": "city", "type": ["null", "string"] },
{ "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
{ "name": "metroCode", "type": ["null", "int"] },
{ "name": "timeZone", "type": ["null", "string"] },
{ "name": "latitude", "type": ["null", "double"] },
{ "name": "longitude", "type": ["null", "double"] },
{ "name": "country", "type": ["null", {
"type": "record",
"name": "country",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "isoCode", "type": "string" }
]
}] },
{ "name": "subdivisions", "type": {
"type": "array",
"items": {
"type": "record",
"name": "subdivision",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "isoCode", "type": "string" }
]
}
}
},
{ "name": "continent", "type": ["null", "string"] },
{ "name": "postalCode", "type": ["null", "string"] }
]
}]
},
{
"name": "isp",
"type": ["null", {
"name": "ispEnrich",
"type": "record",
"fields": [
{ "name": "name", "type": ["null", "string"] },
{ "name": "organization", "type": ["null", "string"] },
{ "name": "asn", "type": ["null", "int"] },
{ "name": "asnOrganization", "type": ["null", "string"] }
]
}]
},
{
"name": "domainName",
"type": ["null", "string"]
},
{
"name": "connectionType",
"type": ["null", "string"],
"doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
},
{
"name": "anonymousIp",
"type": ["null", {
"name": "anonymousIpType",
"type": "record",
"fields": [
{ "name": "anonymous", "type": "boolean" },
{ "name": "anonymousVpn", "type": "boolean" },
{ "name": "hostingProvider", "type": "boolean" },
{ "name": "publicProxy", "type": "boolean" },
{ "name": "torExitNode", "type": "boolean" }
]
}]
}
]
}
]}
]
}
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
IPLookupService Controller Service
Close the window for JsonRecordSetWriter. Select the View Details button ("i" icon) next to the "IPLookupService" controller service to see its properties:
This service references the MaxMind Database file "GeoLite2-City.mmb" that we retrieved in the
first tutorial article to look for enrichment data matching the given ip/hostname.
Start the LookupRecord processor:
Since we are only interested in matching records, we connect the "matched" relationship to the next processor.
Looking at the contents of a flowfile, confirm that the "enrichment" field has been added:
For this flowfile, the enrichment data shows California (www.google.com). Selecting another shows Massachusetts (www.toyota.jp):
Partition by State (PartitionRecord Processor)
The next processor, PartitionRecord, separates the incoming flowfiles into groups of ‘like’ records by evaluating a user-supplied records path against each record. Looking at the configuration:
The "JsonTreeReader" and "JsonRecordSetWriter" record reader/writers are reused.
A custom record path property, "state", is used to divide the records into groups based on the field ‘./isoCode’.
Start the PartitionRecord processor:
Check if California (RouteOnAttribute Processor)
A RouteOnAttribute processor is next in the flow. Looking at the properties:
this processor routes flowfiles if state is CA.
Run the RouteOnAttributeProcessor to see this in action:
PublishKafka_0_10 Processor
The final processor in the flow is PublishKafka_0_10. Looking at the properties:
Run the processor to see the records published in the Kafka topic "California":
I hope this tutorial was helpful and provided insight into how to use the LookupRecord processor! Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord (Apache NiFi 1.2+)
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor (Apache NiFi 1.2+)
Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+
Using PutElasticsearchHttpRecord (CSVReader) in Apache NiFi 1.2+
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files (Apache NiFi 1.2+)
... View more
Labels: