Created on 11-07-2017 07:26 PM - edited 08-17-2019 10:15 AM
This tutorial consists of two articles. The first walks you through a NiFI flow that utilizes the ValidateRecord processor and Record Reader/Writer controller services to:
The second article, modifies the flow to demonstrated the effects of enabling/disabling the "Strict Type Checking" property of the ValidateRecord processor.
Note: The ValidateRecord processor was introduced in NiFi 1.4.0.
This tutorial was tested using the following environment and components:
Here is a template of the flow discussed in this tutorial: validaterecord.xml
In the spirit of Halloween last week, let's ingest some data that ranks the most popular candies: candy-data-dirty.txt (Change the extension from .txt to .csv after downloading)
Note: The CSV data originated from the site, Kaggle. Values in the data were then added/modified/deleted for the purposes of this tutorial.
Create a local input directory. Place the "candy-data-dirty.csv" file in the input directory.
Start NiFi. Import the provided template and add it to the canvas:
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
Change the Input Directory path in the GetFile processor to point to your local input directory:
The flow is now ready to run.
Here is a quick overview of the flow:
1. GetFile ingests a CSV file of candy data from a local directory
2. UpdateAttribute adds Schema Name "candy" as an attribute to the flowfile
3. ValidateRecord converts the flowfile contents from CSV to JSON by:
4. ValidateRecord then writes the data to:
5. Both valid and invalid data are sent to LogAttribute processors in order to investigate the contents and provenance of the queued flowfiles.
Let's look at each of the processors in the flow in detail:
Get Data (GetFile Processor)
FlowFiles are generated from the candy-data-dirty.csv file in the local directory. Start the processor:
One flowfile is generated with the CSV data as the contents. Right click on the connection between the GetFile Processor and the UpdateAttribute Processor. In the context menu, select "List queue" and click the View Details button ("i" icon):
From the FlowFile window that opens, select the "View" button from the Details tab:
to view the CSV contents of the flowfile:
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "candy" to the flowfile:
Start the processor and view the attributes of the flowfile to confirm this:
ValidateRecord Processor
The next processor, ValidateRecord, creates record objects from the flowfile. Then it reads the records one at a time and validates each against the given schema. The records are then written to either a 'valid' relationship or a 'invalid' relationship.
Looking at the configuration:
Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. Schema Registry is set to "AvroSchemaRegistry". More details about these controller services can be found below.
Allow Extra Fields property is set to "false" to demonstrate how records that have additional fields can be sent to the "invalid" relationship. Strict Type Checking is set to "true", to demonstrate how data with incorrect field types can be routed to the "invalid" relationship (described in more detail in the second article).
CSVReader Controller Service
Select the arrow icon next to the "CSV Reader" which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Configuration button (gear icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service which defines the "candy" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Configuration button (gear icon) to see its properties:
The candy schema is as follows:
{
  "type": "record",
  "name": "CandyRecord",
  "fields" : [
    {"name": "competitorname", "type": "string"},
    {"name": "chocolate", "type": "int"},
    {"name": "fruity", "type": "int"},
    {"name": "caramel", "type": "int"},
    {"name": "peanutyalmondy", "type": "int"},
    {"name": "nougat", "type": "int"},
    {"name": "crispedricewafer", "type": "int"},
    {"name": "hard", "type": "int"},
    {"name": "bar", "type": "int"},
    {"name": "pluribus", "type": "int"},
    {"name": "sugarpercent", "type": "double"},
    {"name": "pricepercent", "type": "double"},
    {"name": "winpercent", "type": "double"}
  ]
}
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Configuration button (gear icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
Start the ValidateRecord processor. The flowfile is now split into two flowfiles:
Looking at the contents of the "invalid" connection shows that there were 3 records (now in JSON) that did not match the schema:
[ {
  "competitorname" : "Boston Baked Beans",
  "chocolate" : 0,
  "fruity" : 0,
  "caramel" : 0,
  "peanutyalmondy" : 1,
  "nougat" : 0,
  "crispedricewafer" : 0,
  "hard" : 0,
  "bar" : 0,
  "pluribus" : 1,
  "sugarpercent" : 0.31299999,
  "pricepercent" : 0.51099998,
  "winpercent" : 23.417824,
  "unknown_field_index_13" : "3.14159"
}, {
  "competitorname" : "Nestle Butterfinger",
  "chocolate" : 1,
  "fruity" : 0,
  "caramel" : 0,
  "peanutyalmondy" : 1,
  "nougat" : 0,
  "crispedricewafer" : 0,
  "hard" : 0,
  "bar" : 1,
  "pluribus" : 0,
  "sugarpercent" : 0.60399997,
  "pricepercent" : 0.76700002,
  "winpercent" : "hello"
}, {
  "competitorname" : "Strawberry bon bons",
  "chocolate" : null,
  "fruity" : 1,
  "caramel" : 0,
  "peanutyalmondy" : 0,
  "nougat" : 0,
  "crispedricewafer" : 0,
  "hard" : 1,
  "bar" : 0,
  "pluribus" : 1,
  "sugarpercent" : 0.56900001,
  "pricepercent" : 0.057999998,
  "winpercent" : 34.578991
} ]
To see why these records were invalid, select the Provenance icon:
Then select the View Details button ("i" icon) for the ROUTE event:
On the Provenance Event window that opens, scroll down to the bottom so see the Details:
You can look at the contents of the "valid" connection to see the records that did match the schema. A total of 82 records were valid, which is displayed by the record.count attribute:
In preparation for the next article of the tutorial, start the two LogAttribute processors to clear the connection queues. Then stop all processors.
Created on 02-24-2021 08:23 AM
How can i convert some of the fields based on this validation?. Can i perform validation based on a particular data type alone?. Thanks
