Member since: 04-05-2016
Posts: 139
Kudos Received: 143
Solutions: 16
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 33216 | 02-14-2019 02:53 PM
 | 2533 | 01-04-2019 08:39 PM
 | 10801 | 11-05-2018 03:38 PM
 | 5077 | 09-27-2018 04:21 PM
 | 2751 | 07-05-2018 02:56 PM
10-20-2017
02:54 PM
@Andrew Lim thanks for clarifying further.
01-29-2018
06:53 PM
Hi Andrew, I have used PutMongoRecord for a nested JSON object. The moment my JSON has a nested structure, it fails with an org.bson.codecs.configuration.CodecConfigurationException error and the insertion into MongoDB fails. Please find the Avro schema in the attached image; I was able to validate the JSON against the schema correctly. I have attached the stack trace for the error as well. Am I missing anything in the AvroSchemaRegistry config or any other config?
10-04-2017
03:11 PM
3 Kudos
Objective
This tutorial is the second article of a two-part series. We will walk through, in detail, a flow that:
Uses the LookupRecord processor to parse NiFi provenance events in JSON format and add geolocation data
Uses the PartitionRecord processor to group like records by State
Publishes records originating from California to Kafka
The first article can be found
here.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Apache Kafka 0.10.2.1
NiFi LookupRecord Flow Walk Through
Prerequisites
The first article in this tutorial series has been completed.
The file GeoLite2-City.mmdb has been downloaded locally into a directory named "enrichment-db" within your NiFi installation.
Provenance data has been generated and received by the "Provenance In" input port.
All necessary controller services and reporting tasks are enabled/running.
Kafka is running and a topic "California" has been created (a quick check is sketched just below).
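If you want to double-check those last two prerequisites, the topic list can be queried from the bin directory of your Kafka install; this assumes the same local ZooKeeper used in the first article:
./kafka-topics.sh --list --zookeeper localhost:2181
The output should include the "California" topic.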
Flow Overview
Here is a quick overview of the flow:
1. "Provenance In" input port receives the provenance events from the SiteToSiteProvenanceReportingTask
2. UpdateAttribute adds Schema Name "provenance-event" as an attribute to the flowfile
3. LookupRecord adds geo enrichment data to the flowfile
4. PartitionRecord groups the records by State
5. RouteOnAttribute filters records to those that originate from California
6. PublishKafka_0_10 publishes the California records to Kafka
Flow Details
UpdateAttribute Processor
Once the provenance event data is received by the "Provenance In" input port, the first processor in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "provenance-event" to the flowfile. Looking at its configuration:
Start the UpdateAttribute processor and view the attributes of one of the flowfiles to confirm this:
Geo Enrich (LookupRecord Processor)
The next processor, LookupRecord, looks up geo enrichment data by IP/hostname and adds this information to an "enrichment" field in the flowfile. Looking at the configuration:
Here is a breakdown of what the processor is configured to do:
Record Reader is set to "JsonTreeReader" and Record Writer is set to "JsonRecordSetWriter". The "JsonTreeReader" controller service parses the event data in JSON format and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON.
The custom property "ip" uses regex to extract the hostname from the "transitUri" field in the record (a rough illustration of the idea follows this list).
LookupService is set to the "IPLookupService" controller service. The ip/hostnames from the previous step are looked up in the MaxMind database to determine if enrichment data exists for that hostname.
Result RecordPath is set to "/enrichment", which is the field added to the record if enrichment data is returned from the Lookup Service.
Routing Strategy is set to "Route to 'matched' or 'unmatched'". Records are routed to these relationships depending on whether or not there was a match in the IPLookupService.
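The exact expression is only visible in the screenshot, but the idea is to capture the host portion of a URI such as a transitUri value. As a rough illustration of the same pattern outside of NiFi (the sample URI and the sed invocation below are purely illustrative, not the processor's actual property value):
echo "http://www.google.com:80/robots.txt" | sed -E 's|^.*://([^:/]*).*$|\1|'
This prints www.google.com, which is the kind of value handed to the lookup service.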
Let's look at the mentioned controller services in detail.
JsonTreeReader Controller Service
Select the arrow icon next to the "JsonTreeReader" which opens the Controller Services list in the NiFi Flow Configuration. "JsonTreeReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this example is schema.name. The Schema Registry property is set to the AvroSchemaRegistry Controller Service.
AvroSchemaRegistry Controller Service
The AvroSchemaRegistry defines the "provenance-event" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
The schema is defined as:
{
"namespace": "nifi",
"name": "provenanceEvent",
"type": "record",
"fields": [
{ "name": "eventId", "type": "string" },
{ "name": "eventOrdinal", "type": "long" },
{ "name": "eventType", "type": "string" },
{ "name": "timestampMillis", "type": {
"type": "long",
"logicalType": "timestamp-millis"
} },
{ "name": "durationMillis", "type": "long" },
{ "name": "lineageStart", "type": {
"type": "long",
"logicalType": "timestamp-millis"
} },
{ "name": "details", "type": "string" },
{ "name": "componentId", "type": "string" },
{ "name": "componentType", "type": "string" },
{ "name": "entityId", "type": "string" },
{ "name": "entityType", "type": "string" },
{ "name": "entitySize", "type": ["null", "long"] },
{ "name": "previousEntitySize", "type": ["null", "long"] },
{ "name": "updatedAttributes", "type": {
"type": "map",
"values": "string"
} },
{ "name": "previousAttributes", "type": {
"type": "map",
"values": "string"
} },
{ "name": "actorHostname", "type": "string" },
{ "name": "contentURI", "type": "string" },
{ "name": "previousContentURI", "type": "string" },
{ "name": "parentIds", "type": {
"type": "array",
"items": "string"
} },
{ "name": "childIds", "type": {
"type": "array",
"items": "string"
} },
{ "name": "platform", "type": "string" },
{ "name": "application", "type": "string" },
{ "name": "transitUri", "type": ["null", "string"] },
{ "name": "remoteHost", "type": ["null", "string"] },
{ "name": "enrichment", "type": ["null",
{
"name": "enrichmentRecord",
"namespace": "nifi",
"type": "record",
"fields": [
{
"name": "geo",
"type": ["null", {
"name": "cityGeo",
"type": "record",
"fields": [
{ "name": "city", "type": ["null", "string"] },
{ "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
{ "name": "metroCode", "type": ["null", "int"] },
{ "name": "timeZone", "type": ["null", "string"] },
{ "name": "latitude", "type": ["null", "double"] },
{ "name": "longitude", "type": ["null", "double"] },
{ "name": "country", "type": ["null", {
"type": "record",
"name": "country",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "isoCode", "type": "string" }
]
}] },
{ "name": "subdivisions", "type": {
"type": "array",
"items": {
"type": "record",
"name": "subdivision",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "isoCode", "type": "string" }
]
}
}
},
{ "name": "continent", "type": ["null", "string"] },
{ "name": "postalCode", "type": ["null", "string"] }
]
}]
},
{
"name": "isp",
"type": ["null", {
"name": "ispEnrich",
"type": "record",
"fields": [
{ "name": "name", "type": ["null", "string"] },
{ "name": "organization", "type": ["null", "string"] },
{ "name": "asn", "type": ["null", "int"] },
{ "name": "asnOrganization", "type": ["null", "string"] }
]
}]
},
{
"name": "domainName",
"type": ["null", "string"]
},
{
"name": "connectionType",
"type": ["null", "string"],
"doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
},
{
"name": "anonymousIp",
"type": ["null", {
"name": "anonymousIpType",
"type": "record",
"fields": [
{ "name": "anonymous", "type": "boolean" },
{ "name": "anonymousVpn", "type": "boolean" },
{ "name": "hostingProvider", "type": "boolean" },
{ "name": "publicProxy", "type": "boolean" },
{ "name": "torExitNode", "type": "boolean" }
]
}]
}
]
}
]}
]
}
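If you edit this schema before pasting it into the AvroSchemaRegistry, a quick way to catch a stray comma or bracket is to run it through jq, which checks that the text is well-formed JSON (it does not validate it as an Avro schema; the file name below is just an example):
jq . provenance-event-schema.json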
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
IPLookupService Controller Service
Close the window for JsonRecordSetWriter. Select the View Details button ("i" icon) next to the "IPLookupService" controller service to see its properties:
This service references the MaxMind Database file "GeoLite2-City.mmdb" that we retrieved in the
first tutorial article to look for enrichment data matching the given ip/hostname.
Start the LookupRecord processor:
Since we are only interested in matching records, we connect the "matched" relationship to the next processor.
Looking at the contents of a flowfile, confirm that the "enrichment" field has been added:
For this flowfile, the enrichment data shows California (www.google.com). Selecting another shows Massachusetts (www.toyota.jp):
Partition by State (PartitionRecord Processor)
The next processor, PartitionRecord, separates the incoming flowfiles into groups of ‘like’ records by evaluating a user-supplied record path against each record. Looking at the configuration:
The "JsonTreeReader" and "JsonRecordSetWriter" record reader/writers are reused.
A custom record path property, "state", is used to divide the records into groups based on the field ‘./isoCode’.
Start the PartitionRecord processor:
Check if California (RouteOnAttribute Processor)
A RouteOnAttribute processor is next in the flow. Looking at the properties:
this processor routes flowfiles whose "state" attribute is "CA".
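The property values themselves are only shown in the screenshot, but a typical setup here is a single routing property (for example, one named "California") whose value tests the attribute written by PartitionRecord using NiFi Expression Language, along the lines of ${state:equals('CA')}. Treat that exact expression as an assumption and adjust it to the attribute name used in your own flow.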
Run the RouteOnAttribute processor to see this in action:
PublishKafka_0_10 Processor
The final processor in the flow is PublishKafka_0_10. Looking at the properties:
Run the processor to see the records published in the Kafka topic "California":
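If you left the console consumer from the first article running, the enriched JSON records should start appearing there; otherwise, you can start a consumer again from the bin directory of your Kafka install:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic California --from-beginning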
I hope this tutorial was helpful and provided insight into how to use the LookupRecord processor! Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord (Apache NiFi 1.2+)
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor (Apache NiFi 1.2+)
Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+
Using PutElasticsearchHttpRecord (CSVReader) in Apache NiFi 1.2+
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files (Apache NiFi 1.2+)
10-04-2017
01:52 PM
4 Kudos
Objective
This tutorial walks you through a NiFi flow that:
Uses the LookupRecord processor to parse NiFi provenance events in JSON format and add geolocation data
Uses the PartitionRecord processor to group like records by State
Publishes records originating from California to Kafka
This article is the first of a two-part series. We will set up the demo environment, including flows, controller services and reporting tasks. The second article will walk through the main flow step by step.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Apache Kafka 0.10.2.1
Environment Configuration
Kafka Setup
In the
bin directory of your Kafka install:
Start ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
Start Kafka:
./kafka-server-start.sh ../config/server.properties
Create Kafka Topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic California
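Optionally, confirm the topic exists before wiring it into the NiFi flow:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic California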
Start Kafka Consumer:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic California --from-beginning
NiFi Instances Configuration
For this tutorial, you need two NiFi instances running. One instance generates and sends provenance data to the other via the SiteToSiteProvenanceReportingTask.
Instructions on how to setup both instances can be found in the HCC article
"Extracting NiFi Provenance Data using SiteToSiteProvenanceReportingTask". Main Dataflow Instance Setup
In the instance that will use the provenance data (http://localhost:8088/nifi), import the following template:
lookuprecord-geoenrich.xml
You should see the following flow on your NiFi canvas:
First, let's get the
MaxMind Database file that is used to enrich the data. This is done by the flow contained within the "Gather Enrichment Data" process group.
Run the flow and the file GeoLite2-City.mmdb should be downloaded locally into a directory named "enrichment-db" within your NiFi installation.
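A quick way to confirm the download worked is to list the file from your NiFi installation directory (same relative path as above):
ls -lh enrichment-db/GeoLite2-City.mmdb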
Now, let's enable the flow's controller services. Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the JsonTreeReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services as well as the IPLookupService controller service. All the controller services should be enabled at this point:
We will step through the main flow in detail in the second article. For now, start only the "Provenance In" input port.
Provenance Event Generating Instance
In the instance that generates provenance data (http://localhost:8080/nifi), import the following template:
fetchsites.xml
The following flow should be on the NiFi canvas:
The two GetHTTP processors are configured as follows:
The UpdateAttribute processor is configured with all default settings:
Now, let's create the SiteToSiteProvenance reporting task.
Select the Global menu and choose "Controller Settings":
Select the Reporting Tasks tab and click the "+" icon:
Select SiteToSiteProvenanceReportingTask and click the "Add" button:
Configure the reporting task as follows:
On the Settings tab, set "Run Schedule" to 5 seconds:
(Note: Some of these settings are for demo purposes only and may need to be adjusted if run in a production environment.)
Start the reporting task:
Return to the NiFi canvas and start the flow to generate provenance data:
Run the flow for 30 secs or so to generate sufficient provenance events. Stop the flow.
Switch to your other NiFi instance. You should see flowfiles queued after the "Provenance In" input port:
We are now ready to geo enrich the provenance data.
Continue to the second article for a detailed walk through of the LookupRecord flow.
09-25-2017
06:25 PM
Objective
This tutorial demonstrates how to upgrade HDF from version 3.0.0 to 3.0.1.1 via the Ambari-managed Express Upgrade.
Note: Additional documentation for HDF 3.0.1.1 upgrade can be found here.
Environment
This tutorial was tested using the following environment and components:
Ambari 2.5.1
HDF 3.0.0
Initial Configuration
Prerequisites
This tutorial assumes your HDF cluster is in a healthy state and that all
prerequisites for Ambari upgrade have been met.
Upgrade Steps
Register Target Version
1. Select "Manage Ambari" from the Admin tab: 2. Select "Versions" from the Clusters section:
3. Select the "Register Version" button:
4. Select "HDF-3.0" from the listed versions:
5. Enter the proper extension in the Name field, "HDF-3.0.1.1". Remove any OSs that are not relevant for your cluster. For your OS, add the proper HDF-3.0.1.1 base URL. For my CentOS 7 environment, it was: http://public-repo-1.hortonworks.com/HDF/centos7/3.x/updates/3.0.1.1/tars/hdf_ambari_mp/hdf-ambari-mpack-3.0.1.1-5.tar.gz which was found in the
3.0.1.1 Release Notes.
6. Select "Save". HDF-3.0.1.1 is now registered but not yet installed.
Install HDF 3.0.1.1
7. Select "Install on ".
8. The versions being managed are displayed.
9. Select the "Install" button for HDF-3.0.1.1.
10. Select "OK" to confirm installation.
11. Allow for some install processing:
12. Then select "Upgrade":
Upgrade to HDF 3.0.1.1
13. You will be presented with two options: Rolling Upgrade and Express Upgrade. Rolling Upgrade is not selectable.
14. Express Upgrade may have some Checks to resolve. In my environment, there were 2 Required and 1 Warning:
15. Follow the instructions to resolve the required checks. The "Proceed" button can now be selected:
16. When prompted to confirm, select "Yes" to proceed with the upgrade:
17. The upgrade process begins:
18. The upgrade pauses to confirm whether you are ready to finalize the upgrade. When ready, select "Finalize".
19. The upgrade completes.
09-22-2017
07:22 PM
We do not have access to the vendor code to see how it is really generated. There are options to generate a .xls or .xlsx file, so I believe it should be .xlsx. I do not know what version of Excel they use either. The application is a black box.
09-15-2017
06:41 PM
3 Kudos
Objective
This tutorial demonstrates how to use the
PutElasticsearchHttpRecord processor to easily put data from a CSV file into Elasticsearch.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Elasticsearch 2.3.3
PutElasticsearchHttpRecord (CSVReader) Demo Configuration
Elasticsearch
For my environment, I had Elasticsearch 2.3.3 installed.
Start Elasticsearch and assign cluster and node names:
./elasticsearch --cluster.name elasticsearch --node.name hcc
I like to use
Elastic HQ to manage/monitor my cluster:
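Before building the flow, it can also be handy to confirm the node is up from the command line; a quick check against the default HTTP port (the same 127.0.0.1:9200 endpoint used later in the flow):
curl 'http://127.0.0.1:9200/_cluster/health?pretty'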
Initial Flow
One of the great things about the record-oriented functionality in NiFi is the ability to re-use Record Readers and Writers. In conjunction with the Record processors, it is quick and easy to change data formats and data destinations.
For example, let's assume you have the flow working from the article
"Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter)".
Note: The template for that flow can be found in that article as well as step-by-step instructions on how to configure it.
As currently configured, the flow:
1. Pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website.
2. Unzips the file.
3. Sends only the movie title information on in the flow.
4. Adds Schema Name "movies" as an attribute to the flowfile.
5. Uses
PublishKafkaRecord_0_10 to convert the flowfile contents from CSV to JSON and publish to a Kafka topic.
Say instead of publishing that movie data to Kafka, you now want to put it in
Elasticsearch. The following steps will demonstrate how to do that quickly and simply by replacing the PublishKafkaRecord_0_10 processor with a PutElasticsearchHttpRecord processor and re-using a CSVReader.
Elasticsearch Flow Setup
1. Delete the connection between the UpdateAttribute and PublishKafkaRecord_0_10 processors. Now delete the PublishKafkaRecord_0_10 processor or set it off to the side.
2. Add a
PutElasticsearchHttpRecord to the canvas.
3. Connect the UpdateAttribute processor to the PutElasticsearchHttpRecord processor:
4. Open the Configure dialog for the PutElasticsearchHttpRecord processor. On the Settings tab, auto-terminate the "success" relationship and, for the purposes of this demo, auto-terminate the "failure" relationship as well.
5. On the canvas, make a "retry" relationship connection from the PutElasticsearchHttpRecord to itself.
6. On the Properties tab:
Add "http://127.0.0.1:9200" for the Elasticsearch URL property Add "movies" for the Index property Add "default" for the Type property Since it and its schema was already defined for the original PublishKafka flow, simply select "CSVReader" for the Record Reader property.
The flow is ready to run.
Flow Results
Start the flow.
(Note: If you had run the original PublishKafka flow previously, don't forget to clear the state of the GetHTTP processor so that the movie data zip is retrieved again.)
The movie data is now in Elasticsearch:
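Besides Elastic HQ, you can spot-check the index from the command line; the index name matches the "movies" Index property set above, and the query itself is just an example:
curl 'http://127.0.0.1:9200/movies/_search?q=*:*&size=3&pretty'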
Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Change Data Capture (CDC) with Apache NiFi
Convert CSV to JSON, Avro, XML using ConvertRecord
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files
03-12-2018
04:17 AM
@Andrew Lim 1) My requirement: we have CSV data on a remote machine, and every hour a large amount of CSV data is generated there, so I used SFTP to fetch it. From there I need to put the data into Kafka topics, so I used PublishKafkaRecord. I have not used the JSON conversion described in the above article, but I am going to use it. 2) Yes, I got the flow from your article. I have missed the schema.name attribute and schema registry because I don't understand what to put in them. Do I need to mention the column names that are in the input file? 3) I have just started learning Kafka. I created partitions in one of the topics, but I have no idea how to load data into specific partitions using NiFi. If you have any suggestion better than this, can you guide me?
09-11-2017
02:40 PM
Hi, I tried to implement this using the ConvertRecord processor and am getting the error below: SchemaNotFoundException: Flow File did not contain appropriate attributes to determine Schema Name.
08-28-2017
08:20 PM
5 Kudos
Objective
This tutorial walks you through a NiFi flow that utilizes the
PartitionRecord processor with GrokReader/JSONWriter controller services to parse the NiFi app log in Grok format, convert to JSON and then group the output by log level (INFO, WARN, ERROR).
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later.
Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
PartitionRecord Flow Template
Import the template:
partitionrecord-groktojson.xml
The flow should appear as follows on your NiFi canvas:
Enable Controller Services
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the GrokReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
The flow is now ready to run.
Flow Overview
Here is a quick overview of the main flow:
1. TailFile tails the nifi-app.log
2. UpdateAttribute adds Schema Name "nifi-logs" as an attribute to the flowfile
3. PartitionRecord:
Uses a GrokReader controller service to parse the log data in Grok format. The GrokReader references the AvroSchemaRegistry controller service.
The AvroSchemaRegistry contains a "nifi-logs" schema which defines information about each record (field names, field ids, field types)
Uses a JsonRecordSetWriter controller service to write the records in JSON format. The JsonRecordSetWriter references the same AvroSchemaRegistry.
Groups the records by log level (INFO, WARN, ERROR)
4. RouteOnAttribute sends the data to different connections based on the log level
Flow Details
Generate Warning & Errors
Start the "Generate Warnings & Errors" process group to create sample WARN and ERROR logs.
Tail the nifi-app.log (TailFile Processor)
This processor is configured to tail the nifi-app.log file:
Start the processor and let it run until multiple flowfiles are generated:
Check to see that flowfiles were generated for info, warning and error logs. Right click on the connection between the TailFile Processor and the UpdateAttribute Processor. In the context menu, select "List Queue" and click the View Details button ("i" icon):
On the Details tab, select the View button:
to see the contents of one of the flowfiles:
(Note: Both the "Generate Warnings & Errors" process group and TailFile processors can be stopped at this point since the sample data needed to demonstrate the flow has been generated.)
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "nifi-logs" to the flowfile:
Start the processor, and view the attributes of one of the flowfiles to confirm this:
PartitionRecord Processor
The next processor, PartitionRecord, separates the incoming flowfiles into groups of ‘like’ records by evaluating a user-supplied record path against each record. Looking at the configuration:
Record Reader is set to "GrokReader" and Record Writer is set to "JsonRecordSetWriter". The "GrokReader" controller service parses the log data in Grok format and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. More details about these controller services can be found below.
A custom record path property, log_level, is used to divide the records into groups based on the field ‘level’.
GrokReader Controller Service
Select the arrow icon next to the "GrokReader" which opens the Controller Services list in the NiFi Flow Configuration. "GrokReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this example is schema.name. The Schema Registry property is set to the AvroSchemaRegistry Controller Service. Grok Expression specifies the format of the log line in Grok format, specifically:
%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{DATA:class} %{GREEDYDATA:message}
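If you want to see the kind of lines this expression is parsing, you can pull a few WARN/ERROR entries straight from the log being tailed; this assumes the default logs/nifi-app.log location inside your NiFi installation:
grep -E ' (WARN|ERROR) ' logs/nifi-app.log | head -3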
AvroSchemaRegistry Controller Service
The AvroSchemaRegistry defines the "nifi-logs" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
The schema is defined as:
{
"type": "record",
"name": "nifi_logs",
"fields": [
{ "name": "timestamp", "type": "string" },
{ "name": "level", "type": "string" },
{ "name": "thread", "type": "string" },
{ "name": "class", "type": "string" },
{ "name": "message", "type": "string" },
{ "name": "stackTrace", "type": "string" }
]
}
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
Start the PartitionRecord processor. Looking at the contents of a flowfile, confirm that it only contains logs of one log level. For example, here is a flowfile containing only warnings:
RouteOnAttribute Processor
A RouteOnAttribute processor is next in the flow. Looking at the properties:
this processor routes the flowfiles to different connections depending on the log_level (INFO, WARN, ERROR).
Run the RouteOnAttribute processor to see this in action:
Helpful Links
Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
Convert CSV to JSON, Avro, XML using ConvertRecord (Apache NiFi 1.2+)
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor (Apache NiFi 1.2+)