Objective

This tutorial consists of two articles. The first walks you through a NiFi flow that uses the ValidateRecord processor and Record Reader/Writer controller services to:

  • Convert a CSV file into JSON format
  • Validate the data against a given schema
  • Write the JSON data to either a 'valid' relationship or 'invalid' relationship

The second article modifies the flow to demonstrate the effects of enabling/disabling the "Strict Type Checking" property of the ValidateRecord processor.

Note: The ValidateRecord processor was introduced in NiFi 1.4.0.

Environment

This tutorial was tested using the following environment and components:

  • Mac OS X 10.11.6
  • Apache NiFi 1.4.0

ValidateRecord Flow

Support Files

Here is a template of the flow discussed in this tutorial: validaterecord.xml

In the spirit of Halloween last week, let's ingest some data that ranks the most popular candies: candy-data-dirty.txt (Change the extension from .txt to .csv after downloading)

Note: The CSV data originated from Kaggle. Values in the data were then added, modified or deleted for the purposes of this tutorial.

Demo Configuration

Input Directory

Create a local input directory. Place the "candy-data-dirty.csv" file in the input directory.

43466-1-input-directory.png
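
If you would rather script this step, here is a minimal sketch that creates the input directory and copies the downloaded file into it with a .csv extension. The function name and any paths you pass in are placeholders chosen for illustration, not values from the tutorial.

```python
from pathlib import Path
import shutil


def stage_input_file(downloaded: Path, input_dir: Path) -> Path:
    """Copy the downloaded .txt file into input_dir, renamed to .csv."""
    # Create the input directory if it does not exist yet.
    input_dir.mkdir(parents=True, exist_ok=True)
    # Same base name, but with the .csv extension the flow expects.
    target = input_dir / downloaded.with_suffix(".csv").name
    shutil.copyfile(downloaded, target)
    return target
```

For example, stage_input_file(Path("candy-data-dirty.txt"), Path("nifi-input")) would leave the original download untouched and place candy-data-dirty.csv in a nifi-input directory (both paths are hypothetical).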

Import Template

Start NiFi. Import the provided template and add it to the canvas:

43467-2-template-import.png

Enable Controller Services

Select the gear icon from the Operate Palette:

43468-3-flow-configuration.png

This opens the NiFi Flow Configuration window. Select the Controller Services tab:

43469-4-controller-services-disabled.png

Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JsonRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:

43471-5-controller-services-enabled.png

Update Directory Path in GetFile Processor

Change the Input Directory path in the GetFile processor to point to your local input directory:

43472-6-getfile-directory.png

The flow is now ready to run.

Flow Overview

Here is a quick overview of the flow:

1. GetFile ingests a CSV file of candy data from a local directory

2. UpdateAttribute adds the schema.name attribute with the value "candy" to the flowfile

3. ValidateRecord converts the flowfile contents from CSV to JSON by:

  • Using a CSVReader controller service that references a schema in an AvroSchemaRegistry controller service. The AvroSchemaRegistry contains a "candy" schema, which defines each record's field names and field types
  • Using a JsonRecordSetWriter controller service that references the same AvroSchemaRegistry schema

4. ValidateRecord then writes the data to:

  • The "valid" relationship for records that adhere to the schema
  • The "invalid" relationship for records that do not adhere to the schema

5. Both valid and invalid data are sent to LogAttribute processors in order to investigate the contents and provenance of the queued flowfiles.
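
To make the shape of this flow concrete, here is a rough Python sketch of the same idea: read CSV rows into records keyed by the schema's field names, split them into valid and invalid groups, and write each group as JSON. This is not how NiFi is implemented. The function names, the three-field subset of the schema and the sample rows are all made up for illustration, and type conversion is left out to keep it short. The generated name for extra columns mirrors the "unknown_field_index_13" key that appears in the flow's output later in this article.

```python
import csv
import io
import json


def csv_to_records(csv_text, field_names):
    """Turn CSV rows (after a header line) into dicts keyed by schema field names."""
    rows = csv.reader(io.StringIO(csv_text))
    next(rows, None)  # skip the header line
    records = []
    for row in rows:
        record = {}
        for index, value in enumerate(row):
            if index < len(field_names):
                record[field_names[index]] = value
            else:
                # Columns beyond the schema get a generated name.
                record[f"unknown_field_index_{index}"] = value
        records.append(record)
    return records


def route(records, is_valid):
    """Split records into a 'valid' and an 'invalid' JSON document."""
    valid = [r for r in records if is_valid(r)]
    invalid = [r for r in records if not is_valid(r)]
    return json.dumps(valid), json.dumps(invalid)


# Hypothetical subset of the schema and made-up sample rows.
FIELDS = ["competitorname", "chocolate", "winpercent"]
SAMPLE = (
    "competitorname,chocolate,winpercent\n"
    "Sample Candy A,1,50.0\n"
    "Sample Candy B,0,25.0,3.14159\n"
)

valid_json, invalid_json = route(
    csv_to_records(SAMPLE, FIELDS),
    lambda record: set(record) == set(FIELDS),  # simplistic rule: no extra or missing fields
)
print(valid_json)
print(invalid_json)
```

The second sample row has one column too many, so it lands in the invalid document with an extra "unknown_field_index_3" key.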

Flow Details

Let's look at each of the processors in the flow in detail:

Get Data (GetFile Processor)

FlowFiles are generated from the candy-data-dirty.csv file in the local directory. Start the processor:

43473-7-getfile-start.png

One flowfile is generated with the CSV data as the contents. Right-click on the connection between the GetFile processor and the UpdateAttribute processor. In the context menu, select "List queue" and click the View Details button ("i" icon):

43474-8-listqueue-details.png

From the FlowFile window that opens, select the "View" button from the Details tab:

43475-9-flowfile-view.png

to view the CSV contents of the flowfile:

43476-10-flowfile-contents.png

Add Schema Name Attribute (UpdateAttribute Processor)

The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "candy" to the flowfile:

43477-11-updateattribute-users-schema.png

Start the processor and view the attributes of the flowfile to confirm this:

43478-12-flowfile-schema-name-attribute.png

ValidateRecord Processor

The next processor, ValidateRecord, creates record objects from the flowfile. Then it reads the records one at a time and validates each against the given schema. The records are then written to either a 'valid' relationship or an 'invalid' relationship.

Looking at the configuration:

43479-13-validaterecord-properties.png

Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. Schema Registry is set to "AvroSchemaRegistry". More details about these controller services can be found below.

The Allow Extra Fields property is set to "false" to demonstrate how records that have additional fields can be sent to the "invalid" relationship. Strict Type Checking is set to "true" to demonstrate how data with incorrect field types can be routed to the "invalid" relationship (described in more detail in the second article).
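
As a rough mental model of these two properties, consider a validator that takes both settings as flags. This is a simplified sketch, not NiFi's implementation: the function name, the two-field schema and the coercion rule are assumptions for illustration. It assumes that with Strict Type Checking disabled, a value of the wrong type is still accepted when it can be converted to the expected type.

```python
# Hypothetical two-field schema: field name -> expected Python type.
SCHEMA = {"name": str, "score": float}


def is_valid(record, schema, allow_extra_fields, strict_type_checking):
    """Return True if the record passes under the given settings."""
    # Fields that are not in the schema are only tolerated when allowed.
    if not allow_extra_fields and set(record) - set(schema):
        return False
    for field, expected in schema.items():
        value = record.get(field)
        if value is None:
            return False  # missing or null value for a required field
        if isinstance(value, expected):
            continue
        if strict_type_checking:
            return False  # wrong type, no coercion attempted
        try:
            expected(value)  # lenient mode: accept if the value converts
        except (TypeError, ValueError):
            return False
    return True


print(is_valid({"name": "a", "score": "1.5"}, SCHEMA, False, True))    # False
print(is_valid({"name": "a", "score": "1.5"}, SCHEMA, False, False))   # True
print(is_valid({"name": "a", "score": 1.5, "x": 1}, SCHEMA, False, True))  # False
```

With the settings used in this flow (extra fields not allowed, strict type checking on), both an unexpected field and a value of the wrong type are enough to fail a record.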

CSVReader Controller Service

Select the arrow icon next to "CSVReader", which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Configuration button (gear icon) to see the properties:

43480-14-csvreader-properties.png

With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader looks up its schema by the name stored in a flowfile attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service, which defines the "candy" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Configuration button (gear icon) to see its properties:

43481-15-avroschemaregistry-properties.png

The candy schema is as follows:

{
  "type": "record",
  "name": "CandyRecord",
  "fields" : [
    {"name": "competitorname", "type": "string"},
    {"name": "chocolate", "type": "int"},
    {"name": "fruity", "type": "int"},
    {"name": "caramel", "type": "int"},
    {"name": "peanutyalmondy", "type": "int"},
    {"name": "nougat", "type": "int"},
    {"name": "crispedricewafer", "type": "int"},
    {"name": "hard", "type": "int"},
    {"name": "bar", "type": "int"},
    {"name": "pluribus", "type": "int"},
    {"name": "sugarpercent", "type": "double"},
    {"name": "pricepercent", "type": "double"},
    {"name": "winpercent", "type": "double"}
  ]
}
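
To see how a schema name carried in a flowfile attribute can resolve to the schema text above, here is a small sketch. The dictionary standing in for the AvroSchemaRegistry and the resolve_schema function are assumptions for illustration. Only Python's standard json module is used, not an Avro library, so the schema is read as plain JSON.

```python
import json
from collections import Counter

CANDY_SCHEMA = """
{
  "type": "record",
  "name": "CandyRecord",
  "fields" : [
    {"name": "competitorname", "type": "string"},
    {"name": "chocolate", "type": "int"},
    {"name": "fruity", "type": "int"},
    {"name": "caramel", "type": "int"},
    {"name": "peanutyalmondy", "type": "int"},
    {"name": "nougat", "type": "int"},
    {"name": "crispedricewafer", "type": "int"},
    {"name": "hard", "type": "int"},
    {"name": "bar", "type": "int"},
    {"name": "pluribus", "type": "int"},
    {"name": "sugarpercent", "type": "double"},
    {"name": "pricepercent", "type": "double"},
    {"name": "winpercent", "type": "double"}
  ]
}
"""

# Stand-in for the AvroSchemaRegistry: schema name -> schema text.
REGISTRY = {"candy": CANDY_SCHEMA}


def resolve_schema(attributes):
    """Find the schema named by the flowfile's schema.name attribute."""
    return json.loads(REGISTRY[attributes["schema.name"]])


schema = resolve_schema({"schema.name": "candy"})
type_counts = Counter(field["type"] for field in schema["fields"])
print(len(schema["fields"]), dict(type_counts))
# prints: 13 {'string': 1, 'int': 9, 'double': 3}
```

The schema has 13 fields, at positions 0 through 12. None of the types is declared as a union with "null", so every field is required to hold a value.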

JsonRecordSetWriter Controller Service

Close the window for the AvroSchemaRegistry. Select the View Configuration button (gear icon) next to the "JsonRecordSetWriter" controller service to see its properties:

43482-16-jsonrecordsetwriter-properties.png

Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.

Start the ValidateRecord processor. The flowfile is now split into two flowfiles:

43483-17-validaterecord-start.png

Looking at the contents of the "invalid" connection shows that there were 3 records (now in JSON) that did not match the schema:

[ {
  "competitorname" : "Boston Baked Beans",
  "chocolate" : 0,
  "fruity" : 0,
  "caramel" : 0,
  "peanutyalmondy" : 1,
  "nougat" : 0,
  "crispedricewafer" : 0,
  "hard" : 0,
  "bar" : 0,
  "pluribus" : 1,
  "sugarpercent" : 0.31299999,
  "pricepercent" : 0.51099998,
  "winpercent" : 23.417824,
  "unknown_field_index_13" : "3.14159"
}, {
  "competitorname" : "Nestle Butterfinger",
  "chocolate" : 1,
  "fruity" : 0,
  "caramel" : 0,
  "peanutyalmondy" : 1,
  "nougat" : 0,
  "crispedricewafer" : 0,
  "hard" : 0,
  "bar" : 1,
  "pluribus" : 0,
  "sugarpercent" : 0.60399997,
  "pricepercent" : 0.76700002,
  "winpercent" : "hello"
}, {
  "competitorname" : "Strawberry bon bons",
  "chocolate" : null,
  "fruity" : 1,
  "caramel" : 0,
  "peanutyalmondy" : 0,
  "nougat" : 0,
  "crispedricewafer" : 0,
  "hard" : 1,
  "bar" : 0,
  "pluribus" : 1,
  "sugarpercent" : 0.56900001,
  "pricepercent" : 0.057999998,
  "winpercent" : 34.578991
} ]
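
Comparing these three records with the candy schema suggests one likely cause for each. The sketch below is an inference from the data shown here, not output from NiFi, and the provenance details described next remain the authoritative source. To keep it short, the records and schema are trimmed to three of the thirteen fields, and the explain function is a made-up helper.

```python
# Trimmed to three of the thirteen schema fields; Avro "string", "int"
# and "double" are approximated by Python's str, int and float.
SCHEMA_TYPES = {"competitorname": str, "chocolate": int, "winpercent": float}

INVALID_RECORDS = [
    {"competitorname": "Boston Baked Beans", "chocolate": 0,
     "winpercent": 23.417824, "unknown_field_index_13": "3.14159"},
    {"competitorname": "Nestle Butterfinger", "chocolate": 1,
     "winpercent": "hello"},
    {"competitorname": "Strawberry bon bons", "chocolate": None,
     "winpercent": 34.578991},
]


def explain(record):
    """List the ways a record departs from SCHEMA_TYPES."""
    reasons = []
    for name in record:
        if name not in SCHEMA_TYPES:
            reasons.append(f"{name}: field is not in the schema")
    for name, expected in SCHEMA_TYPES.items():
        value = record.get(name)
        if value is None:
            reasons.append(f"{name}: null is not allowed by the field type")
        elif not isinstance(value, expected):
            reasons.append(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return reasons


for record in INVALID_RECORDS:
    print(record["competitorname"], "->", explain(record))
```

In other words, the first record carries a fourteenth column that the schema does not define, the second has text where a double is expected, and the third has no value for a field whose type does not permit null.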

To see why these records were invalid, select the Provenance icon:

43484-18-invalid-provenance.png

Then select the View Details button ("i" icon) for the ROUTE event:

43485-19-detail-route-event.png

On the Provenance Event window that opens, scroll down to the bottom to see the Details:

43486-20-invalid-provenance-details.png

You can look at the contents of the "valid" connection to see the records that did match the schema. A total of 82 records were valid, which is displayed by the record.count attribute:

43487-21-record-count-valid.png

In preparation for the next article of the tutorial, start the two LogAttribute processors to clear the connection queues. Then stop all processors.

Comments

How can I convert some of the fields based on this validation? Can I perform validation based on a particular data type alone? Thanks