Member since 04-05-2016 | 139 Posts | 144 Kudos Received | 16 Solutions
10-04-2017 01:52 PM | 4 Kudos
Objective
This tutorial walks you through a NiFi flow that:
Uses the LookupRecord processor to parse NiFi provenance events in JSON format and add geolocation data
Uses the PartitionRecord processor to group like records by State
Publishes records originating from California to Kafka
This article is the first of a two-part series. We will set up the demo environment, including flows, controller services and reporting tasks. The second article will walk through the main flow step by step.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Apache Kafka 0.10.2.1
Environment Configuration
Kafka Setup
In the
bin directory of your Kafka install:
Start ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
Start Kafka:
./kafka-server-start.sh ../config/server.properties
Create Kafka Topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic California
Start Kafka Consumer:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic California --from-beginning
NiFi Instances Configuration
For this tutorial, you need two NiFi instances running. One instance generates and sends provenance data to the other via the SiteToSiteProvenanceReportingTask.
Instructions on how to set up both instances can be found in the HCC article
"Extracting NiFi Provenance Data using SiteToSiteProvenanceReportingTask".
Main Dataflow Instance Setup
In the instance that will use the provenance data (http://localhost:8088/nifi), import the following template:
lookuprecord-geoenrich.xml
You should see the following flow on your NiFi canvas:
First, let's get the
MaxMind Database file that is used to enrich the data. This is done by the flow contained within the "Gather Enrichment Data" process group.
Run the flow and the file GeoLite2-City.mmdb should be downloaded locally into a directory named "enrichment-db" within your NiFi installation.
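If you want to double-check the download outside of NiFi, a quick look from the NiFi installation directory should show the file (a hedged example, assuming the enrichment-db location described above):
ls -lh enrichment-db/GeoLite2-City.mmdb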
Now, let's enable the flow's controller services. Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the JsonTreeReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services as well as the IPLookupService controller service. All the controller services should be enabled at this point:
We will step through the main flow in detail in the second article. For now, start only the "Provenance In" input port.
Provenance Event Generating Instance
In the instance that generates provenance data (http://localhost:8080/nifi), import the following template:
fetchsites.xml
The following flow should be on the NiFi canvas:
The two GetHTTP processors are configured as follows:
The UpdateAttribute processor is configured with all default settings:
Now, let's create the SiteToSiteProvenance reporting task.
Select the Global menu and choose "Controller Settings":
Select the Reporting Tasks tab and click the "+" icon:
Select SiteToSiteProvenanceReportingTask and click the "Add" button:
Configure the reporting task as follows:
On the Settings tab, set "Run Schedule" to 5 seconds:
(Note: Some of these settings are for demo purposes only and may need to be adjusted if run in a production environment.)
Start the reporting task:
Return to the NiFi canvas and start the flow to generate provenance data:
Run the flow for 30 seconds or so to generate sufficient provenance events. Stop the flow.
Switch to your other NiFi instance. You should see flowfiles queued after the "Provenance In" input port:
We are now ready to geo enrich the provenance data.
Continue to the second article for a detailed walk through of the LookupRecord flow.
09-25-2017 06:25 PM
Objective
This tutorial demonstrates how to upgrade HDF from version 3.0.0 to 3.0.1.1 via the Ambari-managed Express Upgrade.
Note: Additional documentation for HDF 3.0.1.1 upgrade can be found here.
Environment
This tutorial was tested using the following environment and components:
Ambari 2.5.1
HDF 3.0.0
Initial Configuration
Prerequisites
This tutorial assumes your HDF cluster is in a healthy state and that all
prerequisites for Ambari upgrade have been met.
Upgrade Steps
Register Target Version
1. Select "Manage Ambari" from the Admin tab: 2. Select "Versions" from the Clusters section:
3. Select the "Register Version" button:
4. Select "HDF-3.0" from the listed versions:
5. Enter the proper extension in the Name field, "HDF-3.0.1.1". Remove any OSes that are not relevant for your cluster. For your OS, add the proper HDF-3.0.1.1 base URL. For my CentOS 7 environment, it was: http://public-repo-1.hortonworks.com/HDF/centos7/3.x/updates/3.0.1.1/tars/hdf_ambari_mp/hdf-ambari-mpack-3.0.1.1-5.tar.gz which was found in the
3.0.1.1 Release Notes.
6. Select "Save". HDF-3.0.1.1 is now registered but not yet installed.
Install HDF 3.0.1.1
7. Select "Install on ".
8. The versions being managed are displayed.
9. Select the "Install" button for HDF-3.0.1.1.
10. Select "OK" to confirm installation.
11. Allow for some install processing:
12. Then select "Upgrade":
Upgrade to HDF 3.0.1.1
13. You will be presented with two options: Rolling Upgrade and Express Upgrade. Rolling Upgrade is not selectable.
14. Express Upgrade may have some Checks to resolve. In my environment, there were 2 Required and 1 Warning:
15. Follow the instructions to resolve the required checks. The "Proceed" button can now be selected:
16. When prompted to confirm, select "Yes" to proceed with the upgrade:
17. The upgrade process begins:
18. The upgrade pauses to confirm whether you are ready to finalize the upgrade. When ready, select "Finalize".
19. The upgrade completes.
09-15-2017 06:41 PM | 3 Kudos
Objective
This tutorial demonstrates how to use the
PutElasticsearchHttpRecord processor to easily put data from a CSV file into Elasticsearch.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Elasticsearch 2.3.3
PutElasticsearchHttpRecord (CSVReader)
Demo Configuration
Elasticsearch
For my environment, I had Elasticsearch 2.3.3 installed.
Start Elasticsearch and assign cluster and node names:
./elasticsearch --cluster.name elasticsearch --node.name hcc
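Before wiring up the flow, you can sanity-check that Elasticsearch is reachable on its default HTTP port (a hedged example; adjust the host/port if your install differs):
curl http://127.0.0.1:9200
curl http://127.0.0.1:9200/_cluster/health?pretty
The first call returns the node and cluster info, the second the cluster health.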
I like to use
Elastic HQ to manage/monitor my cluster:
Initial Flow
One of the great things about the record-oriented functionality in NiFi is the ability to re-use Record Readers and Writers. In conjunction with the Record processors, it is quick and easy to change data formats and data destinations.
For example, let's assume you have the flow working from the article
"Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter)".
Note: The template for that flow can be found in that article as well as step-by-step instructions on how to configure it.
As currently configured, the flow:
1. Pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website.
2. Unzips the file.
3. Sends only the movie title information on in the flow.
4. Adds Schema Name "movies" as an attribute to the flowfile.
5. Uses
PublishKafkaRecord_0_10 to convert the flowfile contents from CSV to JSON and publish to a Kafka topic.
Say instead of publishing that movie data to Kafka, you now want to put it in
Elasticsearch. The following steps will demonstrate how to do that quickly and simply by replacing the PublishKafkaRecord processor with a PutElasticsearchHttpRecord processor and re-using a CSVReader.
Elasticsearch Flow Setup
1. Delete the connection between the UpdateAttribute and PublishKafkaRecord_0_10 processors. Now delete the PublishKafkaRecord_0_10 processor or set it off to the side.
2. Add a
PutElasticsearchHttpRecord to the canvas.
3. Connect the UpdateAttribute processor to the PutElasticsearchHttpRecord processor:
4. Open the Configure dialog for the PutElasticsearchHttpRecord processor. On the Settings tab, auto-terminate the "success" relationship and, for the purposes of this demo, auto-terminate the "failure" relationship also.
5. On the canvas, make a "retry" relationship connection from the PutElasticsearchHttpRecord to itself.
6. On the Properties tab:
Add "http://127.0.0.1:9200" for the Elasticsearch URL property Add "movies" for the Index property Add "default" for the Type property Since it and its schema was already defined for the original PublishKafka flow, simply select "CSVReader" for the Record Reader property.
The flow is ready to run.
Flow Results
Start the flow.
(Note: If you had run the original PublishKafka flow previously, don't forget to clear the state of the GetHTTP processor so that the movie data zip is retrieved again.)
The movie data is now in Elasticsearch:
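You can also spot-check the indexed records from the command line (a hedged example, assuming the "movies" index and default port configured above):
curl 'http://127.0.0.1:9200/movies/_search?q=*:*&size=3&pretty'
This should return a few of the movie documents in JSON.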
Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Change Data Capture (CDC) with Apache NiFi
Convert CSV to JSON, Avro, XML using ConvertRecord
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files
09-11-2017 07:09 PM | 2 Kudos
Objective
This tutorial walks you through a NiFi flow that utilizes the
PublishKafkaRecord_0_10 processor to easily convert a CSV file into JSON and then publish to Kafka. The tutorial is based on the blog "Integrating Apache Nifi with Apache Kafka", updated with the more recent record-oriented processors and controller services available in NiFi.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Apache Kafka 0.10.2.1
PublishKafkaRecord_0_10 (CSV to JSON)
Support Files
Here is a template of the flow discussed in this tutorial:
publishkafkarecord.xml
Demo Configuration
Kafka Download & Install
The flow in this demo utilizes the PublishKafkaRecord_0_10 processor, which as the name implies, utilizes the Kafka 0.10.x Producer API. As a result, a 0.10.x version of Kafka is required for this tutorial. For my environment, I downloaded and installed
Kafka 0.10.2.1 (Scala 2.11).
Kafka Configuration & Startup
In the
bin directory of your Kafka install:
Start ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
Start Kafka:
./kafka-server-start.sh ../config/server.properties
Create Kafka Topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Movies
Start Kafka Consumer:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic Movies --from-beginning
Import Template
Start NiFi. Import the provided template and add it to the canvas. You should see the following flow on your NiFi canvas:
Enable Controller Services
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
The flow is now ready to run.
Flow Overview
Here is a quick overview of the flow:
1. GetHTTP pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website
2. UnpackContent unzips the file
3. RouteOnAttribute sends just the movie title information on in the flow
4. UpdateAttribute adds Schema Name "movies" as an attribute to the flowfile
5. PublishKafkaRecord_0_10:
Converts the flowfile contents from CSV to JSON
Publishes the JSON data to the Kafka topic "Movies"
Flow Details
Let's look at each of the processors in the flow in detail:
Get Movie Data (GetHTTP Processor)
This processor pulls a zip file from the website
MovieLens, a movie recommendation service. The dataset (ml-20m.zip) contains 20,000,263 ratings and 465,564 tag applications across 27,278 movies.
Looking at the processor's configuration:
Start the processor to retrieve the file:
Note: On the Scheduling tab, the Run Schedule is set to 10 minutes instead of the default 0 secs, so the processor only periodically checks whether the data file has been updated instead of checking constantly:
Unzip (UnpackContent Processor)
The next processor is UnpackContent which unzips the "ml-20m.zip" file:
Running the processor unzips the file into 7 separate files (movies.csv, ratings.csv, tags.csv, links.csv, genome-scores.csv, genome-tags.csv, and README.txt):
RouteOnAttribute Processor
RouteOnAttribute is next. Looking at its configuration:
the processor routes the flowfiles to different connections depending on the file name (movies.csv, ratings.csv, tags.csv).
For the purposes of this demo, we are only interested in publishing the movie title data to Kafka. As such, we make the connection to the next processor (UpdateAttribute) using the "movies" relationship and auto-terminate the others:
Run the RouteOnAttribute processor to send only the movie title data:
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "movies" to the flowfile:
Start the processor, and view the attributes of the flowfile to confirm this:
You can also confirm that the contents of the flowfile are in CSV format at this point in the flow:
Publish to "Movies" Topic (PublishKafkaRecord_0_10 Processor)
The final processor is PublishKafkaRecord_0_10. Looking at its configuration:
Kafka Brokers property is set to "localhost:9092" and Topic Name property is set to "Movies". Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON.
CSVReader Controller Service
Select the arrow icon next to the "CSV Reader" which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this flow is schema.name. The Schema Registry property is set to the
AvroSchemaRegistry Controller Service which defines the "movies" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
The schema is defined as:
{
"type": "record",
"name": "MoviesRecord",
"fields" : [
{"name": "movieId", "type": "long"},
{"name": "title", "type": ["null", "string"]},
{"name": "genres", "type": ["null", "string"]}
]
}
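To make the conversion concrete, here is a hedged illustration of a single record before and after the flow (the exact rows in your MovieLens download may differ):
CSV in: 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
JSON out: {"movieId":1,"title":"Toy Story (1995)","genres":"Adventure|Animation|Children|Comedy|Fantasy"}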
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
See JSON in Kafka
Start the PublishKafkaRecord_0_10 processor and you will see the JSON movie data in your Kafka Consumer window:
Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Change Data Capture (CDC) with Apache NiFi
Convert CSV to JSON, Avro, XML using ConvertRecord
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files
09-05-2017 06:12 PM | 3 Kudos
Objective
Apache NiFi provides the option of starting an embedded ZooKeeper server. However, NiFi can also be configured to run with an external ZooKeeper server. This article describes how to install and configure a 3-host ZooKeeper ensemble to work with a 2-node NiFi cluster.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache ZooKeeper 3.4.6
Apache NiFi 1.3.0
ZooKeeper
ZooKeeper Version
The version of ZooKeeper chosen for this tutorial is
Release 3.4.6.
Note: ZooKeeper 3.4.6 is the version supported by the latest and previous versions of Hortonworks HDF as shown in the "Component Availability In HDF" table of the HDF 3.0.1.1 Release Notes.
ZooKeeper Download
Go to
http://www.apache.org/dyn/closer.cgi/zookeeper/ to determine the best Apache mirror site to download a stable ZooKeeper distribution. From that mirror site, select the zookeeper-3.4.6 directory and download the zookeeper-3.4.6.tar.gz file.
Unzip the tar.gz file and create 3 copies of the distribution directory, one for each host in the ZooKeeper ensemble. For example:
/zookeeper-1
/zookeeper-2
/zookeeper-3
Note: In this tutorial, we are running multiple servers on the same machine.
ZooKeeper Configuration
"zoo.cfg" file
Next we need to create three config files. In the
conf directory of zookeeper-1, create a zoo.cfg file with the following contents:
tickTime=2000
dataDir=/usr/local/zookeeper1
clientPort=2181
initLimit=5
syncLimit=2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
Because we are running multiple ZooKeeper servers on a single machine, we specified the server name as localhost with unique quorum & leader election ports (i.e. 2888:3888, 2889:3889, 2890:3890) for each server.X.
Create similar
zoo.cfg files in the conf directories of zookeeper-2 and zookeeper-3, with modified values for the dataDir and clientPort properties, since separate dataDirs and distinct clientPorts are necessary.
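For example, the zoo.cfg for zookeeper-2 might look like this (a sketch; only dataDir and clientPort change, matching the dataDir and connect string values used later in this article):
tickTime=2000
dataDir=/usr/local/zookeeper2
clientPort=2182
initLimit=5
syncLimit=2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890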
"myid" file
Every machine that is part of the ZooKeeper ensemble needs to know about every other machine in the ensemble. As such, we need to attribute a server id to each machine by creating a file named
myid, one for each server, which resides in that server's data directory, as specified by the configuration file parameter dataDir.
For example, create a
myid file in /usr/local/zookeeper1 that consists of a single line with the text "1" and nothing else. Create the other myid files in the /usr/local/zookeeper2 and /usr/local/zookeeper3 directories with the contents of "2" and "3" respectively.
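A quick way to create these files from the command line (assuming the dataDir values above):
echo "1" > /usr/local/zookeeper1/myid
echo "2" > /usr/local/zookeeper2/myid
echo "3" > /usr/local/zookeeper3/myid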
Note: More information about ZooKeeper configuration settings can be found in the ZooKeeper Getting Started Guide.
ZooKeeper Startup
Start up each ZooKeeper host by navigating to the /bin directory of each and running the following command:
./zkServer.sh start
NiFi
NiFi Configuration
For a two-node NiFi cluster, modify the following properties in the nifi.properties file in each node's conf directory:
nifi.state.management.embedded.zookeeper.start=false
nifi.zookeeper.connect.string=localhost:2181,localhost:2182,localhost:2183
The first property configures NiFi to not use its embedded ZooKeeper. As a result, the
zookeeper.properties and state-management.xml files in the conf directory are ignored. The second property must be specified to join the cluster as it lists all the ZooKeeper instances in the ensemble.
NiFi Startup
You can now start up each NiFi node. When the UI is available, create or upload a flow that has processors that capture state information. For example, import and set up the flow from the
Change Data Capture (CDC) with Apache NiFi series:
In addition to the other setup steps from the CDC article, since this environment is a cluster, for the CaptureChangeMySQL processor, go to the Scheduling tab on the Configure Processor dialog. Change the Execution setting to "Primary node" from "All nodes":
Run the flow and select "View State" from the CaptureChangeMySQL and/or EnforceOrder processors to verify that state information is managed properly by the external ZooKeeper ensemble:
08-28-2017 08:20 PM | 5 Kudos
Objective
This tutorial walks you through a NiFi flow that utilizes the
PartitionRecord processor with GrokReader/JSONWriter controller services to parse the NiFi app log in Grok format, convert to JSON and then group the output by log level (INFO, WARN, ERROR).
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
PartitionRecord Flow
Template
Import the template:
partitionrecord-groktojson.xml
The flow should appear as follows on your NiFi canvas:
Enable Controller Services
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the GrokReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
The flow is now ready to run.
Flow Overview
Here is a quick overview of the main flow:
1. TailFile tails the nifi-app.log
2. UpdateAttribute adds Schema Name "nifi-logs" as an attribute to the flowfile
3. PartitionRecord:
Uses a GrokReader controller service to parse the log data in Grok format. The GrokReader references the AvroSchemaRegistry controller service.
The AvroSchemaRegistry contains a "nifi-logs" schema which defines information about each record (field names, field ids, field types)
Uses a JsonRecordSetWriter controller service to write the records in JSON format. The JsonRecordSetWriter references the same AvroSchemaRegistry.
Groups the records by log level (INFO, WARN, ERROR)
4. RouteOnAttribute sends the data to different connections based on the log level
Flow Details
Generate Warning & Errors
Start the "Generate Warnings & Errors" process group to create sample WARN and ERROR logs.
Set schema.name = nifi-logs (TailFile Processor)
This processor is configured to tail the nifi-app.log file:
Start the processor and let it run until multiple flowfiles are generated:
Check to see that flowfiles were generated for info, warning and error logs. Right click on the connection between the TailFile Processor and the UpdateAttribute Processor. In the context menu, select "List Queue" and click the View Details button ("i" icon):
On the Details tab, select the View button:
to see the contents of one of the flowfiles:
(Note: Both the "Generate Warnings & Errors" process group and TailFile processors can be stopped at this point since the sample data needed to demonstrate the flow has been generated.)
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "nifi-logs" to the flowfile:
Start the processor, and view the attributes of one of the flowfiles to confirm this:
PartitionRecord Processor
The next processor, PartitionRecord, separates the incoming flowfiles into groups of ‘like’ records by evaluating a user-supplied record path against each record. Looking at the configuration:
Record Reader is set to "GrokReader" and Record Writer is set to "JsonRecordSetWriter". The "GrokReader" controller service parses the log data in Grok format and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. More details about these controller services can be found below.
A custom record path property, log_level, is used to divide the records into groups based on the field ‘level’.
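In other words, the processor carries a single user-added (dynamic) property whose name becomes the grouping attribute and whose value is a record path; a hedged sketch of that property as configured here:
log_level = /level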
GrokReader Controller Service
Select the arrow icon next to the "GrokReader" which opens the Controller Services list in the NiFi Flow Configuration. "GrokReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this example is schema.name. The Schema Registry property is set to the AvroSchemaRegistry Controller Service. Grok Expression specifies the format of the log line in Grok format, specifically:
%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{DATA:class} %{GREEDYDATA:message}
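As a rough illustration (a hypothetical log line, not taken from the template), an entry such as:
2017-08-28 10:15:22,407 WARN [Timer-Driven Process Thread-4] o.a.n.processors.standard.LogAttribute LogAttribute[id=1234] sample warning message
would be parsed by this expression into a record with timestamp "2017-08-28 10:15:22,407", level "WARN", thread "Timer-Driven Process Thread-4", class "o.a.n.processors.standard.LogAttribute", and the remainder of the line in the message field.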
AvroSchemaRegistry Controller Service
The AvroSchemaRegistry defines the "nifi-logs" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
The schema is defined as:
{
"type": "record",
"name": "nifi_logs",
"fields": [
{ "name": "timestamp", "type": "string" },
{ "name": "level", "type": "string" },
{ "name": "thread", "type": "string" },
{ "name": "class", "type": "string" },
{ "name": "message", "type": "string" },
{ "name": "stackTrace", "type": "string" }
]
}
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
Start the PartitionRecord processor. Looking at the contents of a flowfile, confirm that it only contains logs of one log level. For example, here is a flowfile containing only warnings:
RouteOnAttribute Processor
A RouteOnAttribute processor is next in the flow. Looking at the properties:
this processor routes the flowfiles to different connections depending on the log_level (INFO, WARN, ERROR).
Run the RouteOnAttribute processor to see this in action:
Helpful Links
Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord (Apache NiFi 1.2+)
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor (Apache NiFi 1.2+)
08-11-2017 02:13 PM | 4 Kudos
Objective
This tutorial walks you through a NiFi flow that utilizes the QueryRecord processor and Record Reader/Writer controller services to convert a CSV file into JSON format and then query the data using SQL.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
QueryRecord Flow
Support Files
Here is a template of the flow discussed in this tutorial:
queryrecord-csvtojson.xml
Here is the CSV file used in the flow:
users.txt (Change the extension from .txt to .csv after downloading)
Note: The CSV data originated from the site RandomUser. This useful site provides a free API to pull down randomly generated user data. For example: https://randomuser.me/api/0.6/?results=10&format=SQL.
Demo Configuration
Input Directory
Create a local input directory. Place the "users.csv" file in the input directory.
Import Template
Start NiFi. Import the provided template and add it to the canvas.
You should see the following flow on your NiFi canvas:
Enable Controller Services
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
Update Directory Path in GetFile Processor
Change the Input Directory path in the GetFile processor to point to your local input directory:
The flow is now ready to run.
Flow Overview
Here is a quick overview of the flow:
1. GetFile ingests a CSV file of user data from a local directory
2. UpdateAttribute adds Schema Name "users" as an attribute to the flowfile
3. QueryRecord converts the flowfile contents from CSV to JSON by:
Using a CSVReader controller service that references a schema in an AvroSchemaRegistry controller service
The AvroSchemaRegistry contains a "users" schema which defines information about each record (field names, field ids, field types)
Using a JsonRecordSetWriter controller service that references the same AvroSchemaRegistry schema
4. QueryRecord queries the flowfile using SQL:
The "female" property uses SQL to select rows from the data where gender = 'F' The "male" property uses SQL to selects rows from the data where gender = 'M'
5. The female and male user data are sent to UpdateAttribute processors to provide a simple place to hold the data.
Flow Details
Let's look at each of the processors in the flow in detail:
Get CSV File (GetFile Processor)
FlowFiles are generated from the users.csv file in the local directory. Start the processor:
One flowfile is generated with the CSV data as the contents. Right click on the connection between the GetFile Processor and the UpdateAttribute Processor. In the context menu, select "List Queue" and click the View Details button ("i" icon):
From the FlowFile window that opens, select the "View" button from the Details tab:
to view the CSV contents of the flowfile:
Note that there are 10 total users.
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "users" to the flowfile:
Start the processor, and view the attributes of the flowfile to confirm this:
QueryRecord Processor
The next processor, QueryRecord, allows users to write SQL SELECT statements to run over their data as it streams through the system. Each FlowFile is treated as if it were a database table named FLOWFILE. Multiple SQL queries can be added to the processor. Looking at the configuration:
Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. More details about these controller services can be found below.
In order to distinguish the results of each query and route the data appropriately, the name of the property is the name of the Relationship that data matching the query should be routed to. The first added property is named "female" and will include user data where the gender of the user is "F":
SELECT *
FROM FLOWFILE
WHERE gender = 'F'
(NOTE: When entering a value for a property in NiFi, you can use Shift + Enter to insert a newline in your value.)
The second property is named "male" and will include user data where the gender of the user is "M":
SELECT *
FROM FLOWFILE
WHERE gender = 'M'
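Beyond these simple filters, QueryRecord also accepts aggregate queries against the same FLOWFILE table. A hedged example (not part of the template) that counts users per gender:
SELECT gender, COUNT(*) AS user_count
FROM FLOWFILE
GROUP BY gender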
CSVReader Controller Service
Select the arrow icon next to the "CSV Reader" which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this flow is schema.name. The Schema Registry property is set to the
AvroSchemaRegistry Controller Service which defines the "users" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
Start the QueryRecord processor. The flowfile is now split into two flowfiles:
Looking at the contents of the "female" connection confirms the expected results of 6 female users in JSON format:
Looking at the contents of the "male" connection confirms the expected results of 4 male users in JSON format: The SQL queries in the flow are very basic for the purposes of this tutorial. The beauty of the QueryRecord processor is that it supports the SQL for more advanced queries and operations, such as filtering specific columns/rows/fields from your data, renaming those columns/rows/fields, performing calculations and aggregations on the data. Enjoy exploring the capabilities of this new functionality! Helpful Links
Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord (Apache NiFi 1.2+)
Installing a local Hortonworks Registry to use with Apache NiFi
Real-Time SQL On Event Streams
08-03-2017 06:16 PM | 7 Kudos
Objective
This tutorial walks you through how to install and set up a local Hortonworks Registry to interact with Apache NiFi.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
MySQL 5.7.13
Apache NiFi 1.3.0
Hortonworks Registry 0.2.1
Note: The record-oriented processors and controller services used in the demo flow of this tutorial were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later. Currently, Hortonworks Registry 0.2.1 is the version compatible with NiFi 1.3.0.
Environment Configuration
Hortonworks Registry Installation
Download the 0.2.1 Registry release:
hortonworks-registry-0.2.1.tar.gz
Extract the tar:
tar xzvf hortonworks-registry-0.2.1.tar.gz
MySQL Database Setup
Login to your MySQL instance and create the schema registry database and necessary users and privileges:
unix> mysql -u root -p
unix> Enter password:<enter>
mysql> create database schema_registry;
mysql> CREATE USER 'registry_user'@'localhost' IDENTIFIED BY 'registry_password';
mysql> GRANT ALL PRIVILEGES ON schema_registry.* TO 'registry_user'@'localhost' WITH GRANT OPTION;
mysql> commit;
Configure registry.yaml
In the conf directory of the Registry, there is an example MySQL yaml file that we can repurpose:
cd hortonworks-registry-0.2.1
cp conf/registry.yaml.mysql.example conf/registry.yaml
Edit the following section in the yaml file to add appropriate database and user settings:
storageProviderConfiguration:
providerClass: "com.hortonworks.registries.storage.impl.jdbc.JdbcStorageManager"
properties:
db.type: "mysql"
queryTimeoutInSecs: 30
db.properties:
dataSourceClassName: "org.mariadb.jdbc.MariaDbDataSource"
dataSource.url: "jdbc:mysql://localhost/schema_registry"
dataSource.user: "registry_user"
dataSource.password: "registry_password"
Note: For my environment (with MySQL installed via Homebrew), I did not need to change these default values.
Run Bootstrap Scripts
./bootstrap/bootstrap-storage.sh
Start the Registry Server
./bin/registry-server-start.sh ./conf/registry.yaml
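Once the server is up, you can optionally confirm the REST API responds before moving on (a hedged check, assuming the default port and the /api/v1 root used later in the NiFi configuration; the UI described next is the simpler verification):
curl http://localhost:9090/api/v1/schemaregistry/schemas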
Open Registry UI
Navigate to the registry UI in your browser: http://localhost:9090
Schema Creation
Select the "+" button to add a schema to the registry:
Configure the schema as follows:
The schema text is:
{
"type": "record",
"name": "UserRecord",
"fields" : [
{"name": "id", "type": "long"},
{"name": "title", "type": ["null", "string"]},
{"name": "first", "type": ["null", "string"]},
{"name": "last", "type": ["null", "string"]},
{"name": "street", "type": ["null", "string"]},
{"name": "city", "type": ["null", "string"]},
{"name": "state", "type": ["null", "string"]},
{"name": "zip", "type": ["null", "string"]},
{"name": "gender", "type": ["null", "string"]},
{"name": "email", "type": ["null", "string"]},
{"name": "username", "type": ["null", "string"]},
{"name": "password", "type": ["null", "string"]},
{"name": "phone", "type": ["null", "string"]},
{"name": "cell", "type": ["null", "string"]},
{"name": "ssn", "type": ["null", "string"]},
{"name": "date_of_birth", "type": ["null", "string"]},
{"name": "reg_date", "type": ["null", "string"]},
{"name": "large", "type": ["null", "string"]},
{"name": "medium", "type": ["null", "string"]},
{"name": "thumbnail", "type": ["null", "string"]},
{"name": "version", "type": ["null", "string"]},
{"name": "nationality", "type": ["null", "string"]}
]
}
Save the schema:
NiFi Configuration
NiFi Template & CSV File
The flow we are going to use for this tutorial is the same one used in the article
Convert CSV to JSON, Avro, XML using ConvertRecord. However, we are going to modify the flow to use a schema in our local Hortonworks Registry instead of a local Avro Schema Registry.
The template can be downloaded here:
convert-csv-to-json.xml
The CSV file used by the flow can be downloaded here:
users.txt
Note: Change the extension from .txt to .csv after downloading.
NiFi Flow Configuration
Input and Output
Create two local directories: one input directory and one for the JSON output. Place the "users.csv" file in the input directory.
Import Template
Start NiFi. Import the provided template and add it to the canvas:
Update Directory Paths in GetFile and PutFile Processors
Change the Input Directory path in the GetFile processor to point to your local input directory:
Change the Directory path in the PutFile processor to point to your local output directory:
Edit and Enable Controller Services
Now all that remains to run the flow is to modify the schema registry that is used by the record reader and writer controller services. The template is configured to use a local AvroSchemaRegistry controller service. We will change it to use the HortonworksSchemaRegistry.
Select the root process group "NiFi Flow" by clicking an empty area of the canvas. Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab and click the "+" button to create a new controller service.
Select HortonworksSchemaRegistry from the list and click "Add":
Select the Edit button ("pencil" icon) next to the HortonworksSchemaRegistry controller service. Configure it to point to the local Hortonworks Schema Registry instance by adding
http://localhost:9090/api/v1 as the value for the "Schema Registry URL" property:
Select the Edit button ("pencil" icon) next to the CSVReader controller service. Change the "Schema Registry" property value from AvroSchemaRegistry to now point to HortonworksSchemaRegistry:
Select the Edit button ("pencil" icon) next to the JsonRecordSetWriter controller service. Change the "Schema Registry" property value from AvroSchemaRegistry to now point to HortonworksSchemaRegistry:
Enable HortonworksSchemaRegistry controller service by selecting the lightning bolt icon. This will then allow you to enable the CSVReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the necessary controller services should be enabled at this point:
Note: The AvroSchemaRegistry controller service is no longer used by the flow and can remain disabled.
Run the Flow
The flow can now be started:
When run successfully, the JSON formatted file is placed in the local directory we specified earlier in the PutFile processor:
To learn more about the flow with more detailed explanations of the record-oriented processors and controller services in NiFi, see
Convert CSV to JSON, Avro, XML using ConvertRecord.
07-31-2017 05:58 PM | 10 Kudos
Objective
This tutorial walks you through a NiFi flow that utilizes the ConvertRecord processor and Record Reader/Writer controller services to easily convert a CSV file into JSON format. Additionally, the flow is modified to also convert the CSV file to Avro and XML formats.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Convert CSV to JSON
Support Files
Here is a template of the flow discussed in this tutorial:
convert-csv-to-json.xml
Here is the CSV file used in the flow:
users.txt (Change the extension from .txt to .csv after downloading)
Note: The CSV data originated from the site RandomUser. This useful site provides a free API to pull down randomly generated user data. For example: https://randomuser.me/api/0.6/?results=10&format=SQL.
Demo Configuration
Input and Output
Create two local directories: one input directory and one for the JSON output. Place the "users.csv" file in the input directory.
Import Template
Start NiFi. Import the provided template and add it to the canvas.
You should see the following flow on your NiFi canvas:
Enable Controller Services
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
Update Directory Path in GetFile Processor
Change the Input Directory path in the GetFile processor to point to your local input directory:
Update Directory Path in PutFile Processor
Change the Directory path in the PutFile processor to point to your local output directory:
The flow is now ready to run.
Flow Overview
Here is a quick overview of the flow:
1. GetFile ingests a CSV file of user data from a local directory
2. UpdateAttribute adds Schema Name "users" as an attribute to the flowfile
3. ConvertRecord converts the flowfile contents from CSV to JSON by:
Using a CSVReader controller service that references a schema in an AvroSchemaRegistry controller service
The AvroSchemaRegistry contains a "users" schema which defines information about each record (field names, field ids, field types)
Using a JsonRecordSetWriter controller service that references the same AvroSchemaRegistry schema
4. UpdateAttribute adds the file name with the JSON extension as an attribute to the flowfile
5. PutFile writes the contents of the flowfile to a local directory
Note: Currently, there are two Schema Registry implementations: the local Avro-based Schema Registry controller service utilized in this demo flow and an external client, the Hortonworks Schema Registry.
Flow Details
Let's look at each of the processors in the flow in detail:
Get CSV File (GetFile Processor)
FlowFiles are generated from the users.csv file in the local directory. All of the properties are set to default values, except for Input Directory, which we edited earlier.
Start the processor:
One flowfile is generated with the CSV data as the contents. Right click on the connection between the GetFile Processor and the UpdateAttribute Processor. In the context menu, select "List Queue" and click the View Details button ("i" icon):
From the FlowFile window that opens, select the "View" button from the Details tab:
to view the CSV contents of the flowfile:
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "users" to the flowfile:
Start the processor, and view the attributes of the flowfile to confirm this:
ConvertRecord - CSVtoJSON (ConvertRecord Processor)
The next processor is ConvertRecord. Looking at its configuration, there are only two properties:
Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON.
CSVReader Controller Service
Select the arrow icon next to the "CSV Reader" which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this flow is schema.name. The Schema Registry property is set to the
AvroSchemaRegistry Controller Service which defines the "users" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
Start the ConvertRecord processor. The contents of the flowfile are now JSON:
Add JSON File Name Extension (UpdateAttribute Processor)
The next processor is another UpdateAttribute, which simply adds a JSON extension to the name of the original CSV file:
Start the processor, and view the details of the flowfile to confirm this:
PutFile Processor
The last processor places the JSON format file in a local directory as described earlier in the configuration section of this article.
Start the processor and confirm the file has been saved:
Convert CSV to Avro & XML
Now that you have a flow that converts CSV files to JSON using the record-oriented processors and controller services, it is very easy to modify the ConvertRecord processor to convert the CSV to other data formats.
CSV to Avro: AvroRecordSetWriter
The ConvertRecord processor will continue to use the CSVReader, but now an AvroRecordSetWriter will be used for the Record Writer:
Setting up this controller service is very similar to the JSONRecordSetWriter. Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry:
Running the processor now produces the flowfile with Avro contents:
CSV to XML: ScriptedRecordSetWriter
Again, the ConvertRecord processor will continue to use the CSVReader, but now a ScriptedRecordSetWriter will be used for the Record Writer:
Looking at the properties for this controller service:
The Script Engine is Groovy. The Script Body is:
import groovy.xml.MarkupBuilder
import java.io.IOException
import java.io.InputStream
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream
class GroovyRecordSetWriter implements RecordSetWriter {
    private int recordCount = 0;
    private final OutputStream out;
    public GroovyRecordSetWriter(final OutputStream out) {
        this.out = out;
    }
    @Override
    WriteResult write(Record r) throws IOException {
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            new MarkupBuilder(osw).record {
                r.schema.fieldNames.each { fieldName ->
                    "$fieldName" r.getValue(fieldName)
                }
            }
        }
        recordCount++;
        WriteResult.of(1, [:])
    }
    @Override
    String getMimeType() {
        return 'application/xml'
    }
    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            new MarkupBuilder(osw).recordSet {
                Record r
                while (r = rs.next()) {
                    count++
                    record {
                        rs.schema.fieldNames.each { fieldName ->
                            "$fieldName" r.getValue(fieldName)
                        }
                    }
                }
            }
        }
        WriteResult.of(count, [:])
    }
    public void beginRecordSet() throws IOException {
    }
    @Override
    public WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:]);
    }
    @Override
    public void close() throws IOException {
    }
    @Override
    public void flush() throws IOException {
    }
}
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
    @Override
    public RecordSchema getSchema(FlowFile flowFile, InputStream inStream) throws SchemaNotFoundException, IOException {
        return null;
    }
    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out)
    }
}
writer = new GroovyRecordSetWriterFactory()
Note: The script body was taken from https://github.com/apache/nifi/blob/rel/nifi-1.3.0/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
Running the processor now produces the flowfile with XML contents:
Here is a template that converts the CSV file to JSON, Avro and XML:
convert-csv-to-json-avro-xml.xml
Note: After importing this template, make sure the directory paths for the GetFile and PutFile processors exist, confirm users.csv is in the input directory and remember to enable all Controller Services before running the flow.
Helpful Links
Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
https://community.hortonworks.com/content/kbentry/106450/record-oriented-data-with-nifi.html
http://bryanbende.com/development/2017/06/20/apache-nifi-records-and-schema-registries
07-18-2017 02:30 PM | 7 Kudos
Objective
This tutorial is the second article of a three-part series. We will walk through an Apache NiFi CDC flow which uses MySQL bin logs to create a copy of a table and keep the copy in sync with row-level changes to the source. The last article in the series will delve into the finer details of the flow configuration, suggest best practices and highlight potential trouble spots.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
MySQL 5.7.13
Apache NiFi 1.3.0
NiFi CDC Flow Walk Through
Prerequisites
The first article in this tutorial series has been completed and the finalized CDC flow is on your NiFi canvas.
Your MySQL instance (with binary logging enabled) is running.
A 'users' table exists in your "source" database.
Your "copy" database is empty.
All controller services are enabled.
Flow Overview
Here is a quick overview of the CDC flow:
1. CaptureChangeMySQL reads the bin logs to generate FlowFiles (JSON)
2. FlowFiles are routed based on event type:
Begin/Commit events: JSON manipulated by JoltTransformJSON
DDL events: schema (QUERY) and statement type (SQL) attributes are added
Delete/Insert/Update events: table name, schema (USERS) and statement type attributes are added, JSON manipulated
3. EnforceOrder ensures all change events are processed in the correct order
4. PutDatabaseRecord uses a RecordReader to input multiple records from the incoming flow files. These records are translated to SQL statements and executed as a single batch.
Flow Details
Looking at the flow in detail:
CaptureChangeMySQL Processor
Select the first processor in the flow, CaptureChangeMySQL. Right-click to see the context menu and select "Configuration" to view the Configure Processor window. Select the "Properties" tab.
MySQL Hosts, MySQL Driver Class Name, MySQL Driver Location(s), Username, and Password are configured to point to the "source" MySQL database.
"Server ID" is not specified, which defaults the value to 65535. It should not match the server_id in the my.cnf file that we configured in the first article of this tutorial series.
"Distributed Map Cache Client" is set to the DistributedMapCache controller service "CDC MapCache" to store state information and keep it up to date.
"Include Begin/Commit" Events and "Include DDL Events" are set to "true" for completeness, to demonstrate how the CaptureChangeMySql processor can handle events in the bin log in addition to Delete/Insert/Update events. The DDL events are also needed to ultimately generate the SQL for the `users` table creation in the "copy" database.
Close the Configuration Window. With the processor still highlighted, right-click to see the context menu and select "Start" to run this single processor. 13 FlowFiles should be generated.
The 13 flowfiles are for the 13 events in the bin log: 1 DDL event for the creation of the 'users' table, 1 begin event, 10 insert events for the rows added to the table, and 1 commit event. The contents of these flowfiles can be seen if you right-click on the connection where the 13 flowfiles are queued and select "List queue". Select the View Details button ("i" icon) next to any row to see the details and attributes:
Selecting the "VIEW" button on the Details tab displays the contents of the flowfile. Here are the contents of the DDL event flowfile:
RouteOnAttribute Processor
The next processor in the flow is RouteOnAttribute. Looking at the properties:
this processor routes the flowfiles depending on the event type. Run this processor and we see the expected results of 10 insert event flowfiles sent to the "delete, insert, update" connection, 1 ddl event flowfile sent to the "ddl" connection and 2 total flowfiles (1 begin event, 1 commit event) sent to the "begin, commit" connection.
At this point, the flow has multiple branches. Let's look at the BEGIN/COMMIT and DDL paths of the flow first.
Begin/Commit Path
The begin/commit flowfiles are sent to a JoltTransformJSON processor which uses a Shift Jolt transformation DSL to put the begin/commit events in the "query" field of the JSON:
Here is an example of the JSON from the incoming begin event flowfile:
{
"type" : "begin",
"timestamp" : 1499702324000,
"binlog_filename" : "delta.000001",
"binlog_position" : 1070,
"database" : "source"
}
Then after running the processor:
here is the flattened result:
{"query":"begin"}
DDL Path
For the DDL path, the contents of the DDL flowfile already has a "query" field, so no JSON transformation is needed:
{
"type" : "ddl",
"timestamp" : 1499701819000,
"binlog_filename" : "delta.000001",
"binlog_position" : 384,
"database" : "source",
"table_name" : null,
"table_id" : null,
"query" : "create table `users` (\n `id` mediumint(9) not null auto_increment primary key,\n `title` text,\n `first` text,\n `last` text,\n `street` text,\n `city` text,\n `state` text,\n `zip` text,\n `gender` text,\n `email` text,\n `username` text,\n `password` text,\n `phone` text,\n `cell` text,\n `ssn` text,\n `date_of_birth` timestamp null default null,\n `reg_date` timestamp null default null,\n `large` text,\n `medium` text,\n `thumbnail` text,\n `version` text,\n `nationality` text\n) engine=innodb auto_increment=1 default charset=latin1"
}
Schema=query & StmtType=SQL (UpdateAttribute Processor)
The begin/commit and ddl flowfiles are now both sent to an UpdateAttribute processor which adds the schema name and statement type attributes with the values of "query" and "SQL" respectively.
After starting the processor:
Here is an example of these attributes added to the flowfile:
Delete, Insert, Update Path
Now let's look at the DELETE/INSERT/UPDATE path of the flow.
Get Table Name (EvaluateJsonPath Processor)
This processor adds the attribute tableName to each of the flowfiles:
with a value of "users" from the flowfile contents via expression language.
After starting the processor:
here is an example of the new tableName attribute and value added to one of the flowfiles:
Transform to Flat JSON (JoltTransformJSON Processor)
The next processor, JoltTransformJSON, simplifies and formats the JSON in the flowfiles so that these insert events can be applied as SQL statements to the `users` table in the copy db (applied by the PutDatabaseRecord processor later in the flow). Looking at the configuration for this processor:
As shown, a Chain Jolt transformation DSL is used in conjunction with a Jolt "shift" specification to perform the flattening.
Here is an example of the JSON from an incoming flowfile:
{
"type" : "insert",
"timestamp" : 1499702324000,
"binlog_filename" : "delta.000001",
"binlog_position" : 1246,
"database" : "source",
"table_name" : "users",
"table_id" : 108,
"columns" : [ {
"id" : 1,
"name" : "id",
"column_type" : 4,
"value" : 1
}, {
"id" : 2,
"name" : "title",
"column_type" : -1,
"value" : "miss"
}, {
"id" : 3,
"name" : "first",
"column_type" : -1,
"value" : "marlene"
}, {
"id" : 4,
"name" : "last",
"column_type" : -1,
"value" : "shaw"
}, {
"id" : 5,
"name" : "street",
"column_type" : -1,
"value" : "3450 w belt line rd"
}, {
"id" : 6,
"name" : "city",
"column_type" : -1,
"value" : "abilene"
}, {
"id" : 7,
"name" : "state",
"column_type" : -1,
"value" : "florida"
}, {
"id" : 8,
"name" : "zip",
"column_type" : -1,
"value" : "31995"
}, {
"id" : 9,
"name" : "gender",
"column_type" : -1,
"value" : "F"
}, {
"id" : 10,
"name" : "email",
"column_type" : -1,
"value" : "marlene.shaw75@example.com"
}, {
"id" : 11,
"name" : "username",
"column_type" : -1,
"value" : "goldenpanda70"
}, {
"id" : 12,
"name" : "password",
"column_type" : -1,
"value" : "naughty"
}, {
"id" : 13,
"name" : "phone",
"column_type" : -1,
"value" : "(176)-908-6931"
}, {
"id" : 14,
"name" : "cell",
"column_type" : -1,
"value" : "(711)-565-2194"
}, {
"id" : 15,
"name" : "ssn",
"column_type" : -1,
"value" : "800-71-1872"
}, {
"id" : 16,
"name" : "date_of_birth",
"column_type" : 93,
"value" : "1991-10-07 00:22:53.0"
}, {
"id" : 17,
"name" : "reg_date",
"column_type" : 93,
"value" : "2004-01-29 16:19:10.0"
}, {
"id" : 18,
"name" : "large",
"column_type" : -1,
"value" : "http://api.randomuser.me/portraits/women/67.jpg"
}, {
"id" : 19,
"name" : "medium",
"column_type" : -1,
"value" : "http://api.randomuser.me/portraits/med/women/67.jpg"
}, {
"id" : 20,
"name" : "thumbnail",
"column_type" : -1,
"value" : "http://api.randomuser.me/portraits/thumb/women/67.jpg"
}, {
"id" : 21,
"name" : "version",
"column_type" : -1,
"value" : "0.6"
}, {
"id" : 22,
"name" : "nationality",
"column_type" : -1,
"value" : "US"
} ]
}
Then after running the processor:
here is the flattened result:
[ {
"id" : 1,
"title" : "miss",
"first" : "marlene",
"last" : "shaw",
"street" : "3450 w belt line rd",
"city" : "abilene",
"state" : "florida",
"zip" : "31995",
"gender" : "F",
"email" : "marlene.shaw75@example.com",
"username" : "goldenpanda70",
"password" : "naughty",
"phone" : "(176)-908-6931",
"cell" : "(711)-565-2194",
"ssn" : "800-71-1872",
"date_of_birth" : "1991-10-07 00:22:53.0",
"reg_date" : "2004-01-29 16:19:10.0",
"large" : "http://api.randomuser.me/portraits/women/67.jpg",
"medium" : "http://api.randomuser.me/portraits/med/women/67.jpg",
"thumbnail" : "http://api.randomuser.me/portraits/thumb/women/67.jpg",
"version" : "0.6",
"nationality" : "US"
} ]
Schema=users & Statement Type (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema name and statement type attributes to the flowfiles.
After starting the processor:
Here is an example of these attributes added to one of the insert flowfiles:
EnforceOrder Processor
The two flow paths converge at an EnforceOrder processor:
This processor guarantees ordered delivery of the CDC events based on the sequence id, which was generated by the CaptureChangeMySQL processor at the beginning of the flow (the cdc.sequence.id attribute).
The connection that follows EnforceOrder is configured to prioritize flowfiles by "first in, first out":
Running the processor results in the following:
Note: While the number of files that are queued after the processor is 13 as expected, 24 total flowfiles were processed In/Out. This is because some flowfiles went through the "wait" connection. This is confirmed if you right-click on the wait connection and select "Status History":
PutDatabaseRecord Processor
With the order of the CDC events ensured, the next and final processor is PutDatabaseRecord. Looking at its configuration:
the Record Reader property points to the "JsonPathReader" controller service, which parses the incoming data and determines the data's schema (more on this later). The Statement Type property specifies the type of SQL statement to generate. In this flow it is set to "Use statement.type Attribute", which is necessary for the flow to handle more than individual INSERT, UPDATE, DELETE types. In conjunction with the Field Containing SQL property set to "query", the processor: 1) looks at the statement.type attribute from each Delete/Insert/Update flowfile to apply the appropriate delete/insert/update SQL, and 2) assumes the value of the "query" field is valid SQL and applies it as-is for the Begin/Commit/DDL flowfiles. The Table Name property is set to the "tableName" attribute via Expression Language, which is "users" for the flowfiles. The Database Connection Pooling Service is set to the MYSQL CDC Backup controller service. This DBCPConnectionPool controller service points to the target "copy" database:
JsonPathReader Record Reader
As mentioned earlier, the Record Reader used in the processor is the "JsonPathReader" controller service. JsonPathReader dynamically reads JSON files that have any schema. The reader specifies the schema expected in an attribute, which in this flow is schema.name.
The Schema Registry property is set to the AvroSchemaRegistry controller service, which defines the "query" and "users" schemas.
Note: For more information on Record Readers/Writers, see https://community.hortonworks.com/content/kbentry/106450/record-oriented-data-with-nifi.html.
Starting the PutDatabaseRecord processor should result in 13 flowfiles routed to Success.
As a result, a `users` table now exists in the "copy" database which is equivalent to the original `users` table in the "source" database.
Table Synchronization
With all of the processors in the flow running (except for the LogAttribute ones), any row-level changes to the original `users` table in the source database will be applied in real-time to the `users` table in the copy database.
Delete, add or modify some rows and you will see these change events processed by the flow to keep the tables in sync.
Review
This tutorial walked you through a sample NiFi CDC flow, examining each component in the flow in detail including the critical processors CaptureChangeMySQL, EnforceOrder and PutDatabaseRecord. Running the flow created a `users` table in the copy database that is identical to the original `users` table in the source database. With these processors in the flow running, any changes in the original `users` table are also applied to the replicated `users` table. Continue to the third article in this tutorial series to delve into more details of the flow configuration and learn about best practices and potential trouble spots.