In the spirit of Halloween last week, let's ingest some data that ranks the most popular candies: candy-data-dirty.txt (Change the extension from .txt to .csv after downloading)
Note: The CSV data originated from the site, Kaggle.
Values in the data were then added/modified/deleted for the purposes of this tutorial.
Create a local input directory. Place the "candy-data-dirty.csv" file in the input directory.
Start NiFi. Import the provided template and add it to the canvas:
Enable Controller Services
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
Update Directory Path in GetFile Processor
Change the Input Directory path in the GetFile processor to point to your local input directory:
The flow is now ready to run.
Here is a quick overview of the flow:
1. GetFile ingests a CSV file of candy data from a local directory
2. UpdateAttribute adds Schema Name "candy" as an attribute to the flowfile
3. ValidateRecord converts the flowfile contents from CSV to JSON by:
Using a CSVReader controller service that references a schema in an AvroSchemaRegistry controller service
The AvroSchemaRegistry contains a "candy" schema which defines information about each record (field names, field ids, field types)
Using a JsonRecordSetWriter controller service that references the same AvroSchemaRegistry schema
4. ValidateRecord then writes the data to:
The "valid" relationship for records that adhere to the schema
The "invalid" relationship for records that do not adhere to the schema
5. Both valid and invalid data are sent to LogAttribute processors in order to investigate the contents and provenance of the queued flowfiles.
Let's look at each of the processors in the flow in detail:
Get Data (GetFile Processor)
FlowFiles are generated from the candy-data-dirty.csv file in the local directory. Start the processor:
One flowfile is generated with the CSV data as the contents. Right click on the connection between the GetFile Processor and the UpdateAttribute Processor. In the context menu, select "List queue" and click the View Details button ("i" icon):
From the FlowFile window that opens, select the "View" button from the Details tab:
to view the CSV contents of the flowfile:
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "candy" to the flowfile:
Start the processor and view the attributes of the flowfile to confirm this:
The next processor, ValidateRecord, creates record objects from the flowfile. Then it reads the records one at a time and validates each against the given schema. The records are then written to either a 'valid' relationship or a 'invalid' relationship.
Looking at the configuration:
Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. Schema Registry is set to "AvroSchemaRegistry". More details about these controller services can be found below.
Allow Extra Fields property is set to "false" to demonstrate how records that have additional fields can be sent to the "invalid" relationship. Strict Type Checking is set to "true", to demonstrate how data with incorrect field types can be routed to the "invalid" relationship (described in more detail in the second article).
CSVReader Controller Service
Select the arrow icon next to the "CSV Reader" which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Configuration button (gear icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service which defines the "candy" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Configuration button (gear icon) to see its properties: