Created on 08-11-2017 02:13 PM - edited 08-17-2019 11:36 AM
This tutorial walks you through a NiFi flow that uses the QueryRecord processor and Record Reader/Writer controller services to convert a CSV file into JSON format and then query the data using SQL.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0, so this tutorial requires NiFi 1.2.0 or later.
This tutorial was tested using the following environment and components:
Here is a template of the flow discussed in this tutorial: queryrecord-csvtojson.xml
Here is the CSV file used in the flow: users.txt (Change the extension from .txt to .csv after downloading)
Note: The CSV data originated from the site, RandomUser. This useful site provides a free API to pull down randomly generated user data. For example: https://randomuser.me/api/0.6/?results=10&format=SQL.
Create a local input directory. Place the "users.csv" file in the input directory.
Start NiFi. Import the provided template and add it to the canvas.
You should see the following flow on your NiFi canvas:
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JsonRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
Change the Input Directory path in the GetFile processor to point to your local input directory:
The flow is now ready to run.
Here is a quick overview of the flow:
1. GetFile ingests a CSV file of user data from a local directory
2. UpdateAttribute adds Schema Name "users" as an attribute to the flowfile
3. QueryRecord converts the flowfile contents from CSV to JSON by:
4. QueryRecord queries the flowfile using SQL:
5. The female and male user data are sent to UpdateAttribute processors to provide a simple place to hold the data.
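To make the steps above concrete, here is a rough plain-Python sketch of what the flow does to the data. It is not NiFi code: the run_flow function, the flowfile dictionary, and the three-column sample CSV are all assumptions made for illustration, and the real users.csv has more columns.

```python
import csv
import io
import json

# Hypothetical stand-in for users.csv; the real file has more columns.
SAMPLE_CSV = """id,first_name,gender
1,Ana,F
2,Ben,M
3,Cara,F
"""


def run_flow(csv_text):
    # Step 1 (GetFile): the flowfile content is the raw CSV text.
    flowfile = {"attributes": {}, "content": csv_text}
    # Step 2 (UpdateAttribute): tag the flowfile with the schema name.
    flowfile["attributes"]["schema.name"] = "users"
    # Step 3 (QueryRecord, reader side): parse the CSV into records.
    records = list(csv.DictReader(io.StringIO(flowfile["content"])))
    # Step 4 (QueryRecord, query side): one result set per named query.
    routed = {
        "female": [r for r in records if r["gender"] == "F"],
        "male": [r for r in records if r["gender"] == "M"],
    }
    # Step 3 (QueryRecord, writer side): serialize each result set as JSON.
    return {name: json.dumps(rows) for name, rows in routed.items()}


outputs = run_flow(SAMPLE_CSV)
for relationship, content in outputs.items():
    print(relationship, content)
```

Each key in the returned dictionary plays the role of one outgoing relationship, and each value is the JSON content a downstream processor would receive.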
Let's look at each of the processors in the flow in detail:
Get CSV File (GetFile Processor)
FlowFiles are generated from the users.csv file in the local directory. Start the processor:
One flowfile is generated with the CSV data as the contents. Right click on the connection between the GetFile Processor and the UpdateAttribute Processor. In the context menu, select "List Queue" and click the View Details button ("i" icon):
From the FlowFile window that opens, select the "View" button from the Details tab:
to view the CSV contents of the flowfile:
Note that there are 10 total users.
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "users" to the flowfile:
Start the processor, and view the attributes of the flowfile to confirm this:
QueryRecord Processor
The next processor, QueryRecord, allows users to write SQL SELECT statements to run over their data as it streams through the system. Each FlowFile is treated as if it were a database table named FLOWFILE. Multiple SQL queries can be added to the processor. Looking at the configuration:
Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. More details about these controller services can be found below.
In order to distinguish the results of each query and route the data appropriately, the name of the property is the name of the Relationship that data matching the query should be routed to. The first added property is named "female" and will include user data where the gender of the user is "F":
SELECT * FROM FLOWFILE WHERE gender = 'F'
(NOTE: When entering a value for a property in NiFi, you can use Shift + Enter to insert a newline in your value.)
The second property is named "male" and will include user data where the gender of the user is "M":
SELECT * FROM FLOWFILE WHERE gender = 'M'
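If you want to try the two queries outside of NiFi, the sketch below runs them with Python's built-in sqlite3 module against an in-memory table named FLOWFILE. This is only an approximation: QueryRecord does not use SQLite, and the route function, the two-column table, and the sample rows are assumptions made for illustration.

```python
import sqlite3

# Property name -> SQL, mirroring the two properties added to QueryRecord.
QUERIES = {
    "female": "SELECT * FROM FLOWFILE WHERE gender = 'F'",
    "male": "SELECT * FROM FLOWFILE WHERE gender = 'M'",
}


def route(rows):
    """Run every query over the rows and group the results by property name."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    # Hypothetical two-column table; the real users schema has more fields.
    conn.execute("CREATE TABLE FLOWFILE (first_name TEXT, gender TEXT)")
    conn.executemany("INSERT INTO FLOWFILE VALUES (?, ?)", rows)
    routed = {
        name: [dict(r) for r in conn.execute(sql)]
        for name, sql in QUERIES.items()
    }
    conn.close()
    return routed


sample = [("Ana", "F"), ("Ben", "M"), ("Cara", "F")]
for relationship, records in route(sample).items():
    print(relationship, records)
```

The dictionary keys stand in for the relationship names, which is the same idea as naming each QueryRecord property after the relationship its results should be routed to.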
CSVReader Controller Service
Select the arrow icon next to "CSVReader", which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader looks up the expected schema by the name held in an attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry Controller Service, which defines the "users" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
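The name-based lookup can be pictured with the small Python sketch below. It is an illustration only, not how NiFi is implemented: the REGISTRY dictionary, the read_records function, and especially the three-field Avro schema are assumptions, and the actual "users" schema in the template defines its own fields.

```python
import csv
import io
import json

# Hypothetical registry contents: schema name -> Avro schema text.
# The field list is illustrative, not the tutorial's actual "users" schema.
REGISTRY = {
    "users": json.dumps({
        "type": "record",
        "name": "UserRecord",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "first_name", "type": "string"},
            {"name": "gender", "type": "string"},
        ],
    })
}

# Map the Avro primitive types used above to Python constructors.
CASTS = {"long": int, "string": str}


def read_records(attributes, content):
    """Resolve the schema named by schema.name, then parse typed records."""
    schema = json.loads(REGISTRY[attributes["schema.name"]])
    types = {f["name"]: CASTS[f["type"]] for f in schema["fields"]}
    reader = csv.DictReader(io.StringIO(content))
    return [{k: types[k](row[k]) for k in types} for row in reader]


records = read_records({"schema.name": "users"}, "id,first_name,gender\n1,Ana,F\n")
print(records)
```

The point of the indirection is that the flowfile only carries the short name "users", while the full schema definition lives in one place in the registry.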
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
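As a rough picture of what these writer settings mean, the Python sketch below serializes records as JSON and records the schema name as an attribute on the outgoing flowfile. The write_json function and the sample record are hypothetical and only illustrate the idea; they are not NiFi's implementation.

```python
import json


def write_json(records, schema_name):
    """Return (attributes, content) for an outgoing flowfile."""
    # Mirrors the "Set 'schema.name' Attribute" write strategy: the output
    # carries the schema's name rather than the full schema text.
    attributes = {"schema.name": schema_name}
    content = json.dumps(records, indent=2)
    return attributes, content


attrs, body = write_json([{"first_name": "Ana", "gender": "F"}], "users")
print(attrs)
print(body)
```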
Start the QueryRecord processor. The flowfile is now split into two flowfiles:
Looking at the contents of the "female" connection confirms the expected results of 6 female users in JSON format:
Looking at the contents of the "male" connection confirms the expected results of 4 male users in JSON format:
The SQL queries in the flow are very basic for the purposes of this tutorial. The beauty of the QueryRecord processor is that it supports SQL for more advanced queries and operations, such as filtering specific columns/rows/fields from your data, renaming those columns/rows/fields, and performing calculations and aggregations on the data. Enjoy exploring the capabilities of this new functionality!
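To give a feel for the kinds of queries mentioned above, here is a small sketch using Python's built-in sqlite3 module. It shows selecting and renaming a column, and a per-gender aggregation. The table columns (including age) and the sample rows are hypothetical, and SQLite's SQL dialect may differ in details from what QueryRecord accepts, so treat the statements as a starting point rather than copy-paste property values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical columns; substitute the fields from your own schema.
conn.execute("CREATE TABLE FLOWFILE (first_name TEXT, gender TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO FLOWFILE VALUES (?, ?, ?)",
    [("Ana", "F", 31), ("Ben", "M", 45), ("Cara", "F", 27)],
)

# Keep a single column and rename it.
names = conn.execute(
    "SELECT first_name AS name FROM FLOWFILE "
    "WHERE gender = 'F' ORDER BY first_name"
).fetchall()

# Aggregate: count and average age per gender.
stats = conn.execute(
    "SELECT gender, COUNT(*) AS total, AVG(age) AS avg_age "
    "FROM FLOWFILE GROUP BY gender ORDER BY gender"
).fetchall()

print(names)
print(stats)
conn.close()
```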
Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
Created on 02-23-2018 10:15 AM
Hi @Andrew Lim
Thanks for the article. I tried to execute this using NiFi 1.5 but I get an error "already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed" in the QueryRecord processor. Please find below an excerpt from the logs:
ErrorHandlingInputStream[FlowFile=StandardFlowFileRecord[uuid=9688750b-0f83-4c64-91c6-80499ef63c24,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1519378477705-1, container=default, section=1], offset=532, length=179],offset=0,name=input.csv,size=179]] for StandardFlowFileRecord[uuid=9688750b-0f83-4c64-91c6-80499ef63c24,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1519378477705-1, container=default, section=1], offset=532, length=179],offset=0,name=input.csv,size=179] because the session was rolled back without the input stream being closed.
2018-02-23 15:10:17,160 ERROR [Timer-Driven Process Thread-3] o.a.nifi.processors.standard.QueryRecord QueryRecord[id=aff8e1fe-ff3c-3d77-9fcf-dcd3c1cf6ed9] QueryRecord[id=aff8e1fe-ff3c-3d77-9fcf-dcd3c1cf6ed9] failed to process session due to java.lang.IllegalStateException: StandardFlowFileRecord[uuid=9688750b-0f83-4c64-91c6-80499ef63c24,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1519378477705-1, container=default, section=1], offset=532, length=179],offset=0,name=input.csv,size=179] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed: {}
java.lang.IllegalStateException: StandardFlowFileRecord[uuid=9688750b-0f83-4c64-91c6-80499ef63c24,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1519378477705-1, container=default, section=1], offset=532, length=179],offset=0,name=input.csv,size=179] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
Created on 02-23-2018 05:33 PM
Hi @spdvnz
It looks like you ran into a bug that should have been addressed in NiFi 1.5.0 (https://issues.apache.org/jira/browse/NIFI-4717):
If QueryRecord fails to parse data properly with the configured reader, it may roll back the session instead of routing to failure, leading to the FlowFile being stuck on the queue. This includes an error message indicating that the FlowFile has an active callback or input stream that hasn't been closed.
If you are able to, can you provide a template of your flow and the data file (input.csv)? Perhaps you found a scenario that was missed by NIFI-4717. Providing your template and data will also help to diagnose whether your schema is correct.
As a side note, since this article was written using NiFi 1.3.0, I tried it out in NiFi 1.5.0 just to make sure and the flow worked successfully.
Thanks!
Created on 03-09-2018 02:13 AM
Check further back in the logs for a SQL error.
Created on 02-04-2022 05:14 PM
Hello,
Right now the users.txt file has a header, so records can be picked out easily.
What if the file has no header and you need to pick out data by the location/position of the fields?
How will QueryRecord work in that case?
Created on 02-06-2022 08:51 PM
@Meghna_3 as this is an older article, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post.