Objective

This tutorial walks you through a NiFi flow that uses the QueryRecord processor and Record Reader/Writer controller services to convert a CSV file into JSON format and then query the data using SQL.

Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0, so this tutorial requires NiFi 1.2.0 or later.

Environment

This tutorial was tested using the following environment and components:

  • Mac OS X 10.11.6
  • Apache NiFi 1.3.0

QueryRecord Flow

Support Files

Here is a template of the flow discussed in this tutorial: queryrecord-csvtojson.xml

Here is the CSV file used in the flow: users.txt (Change the extension from .txt to .csv after downloading)

Note: The CSV data originated from the site, RandomUser. This useful site provides a free API to pull down randomly generated user data. For example: https://randomuser.me/api/0.6/?results=10&format=SQL.

Demo Configuration

Input Directory

Create a local input directory. Place the "users.csv" file in the input directory.

27402-1-input-directory.png

Import Template

Start NiFi. Import the provided template and add it to the canvas.

You should see the following flow on your NiFi canvas:

27403-2-template-import.png

Enable Controller Services

Select the gear icon from the Operate Palette:

27404-3-flow-configuration.png

This opens the NiFi Flow Configuration window. Select the Controller Services tab:

27405-4-controller-services-disabled.png

Enable AvroSchemaRegistry by selecting its lightning bolt icon. This then allows you to enable the CSVReader and JsonRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:

27406-5-controller-services-enabled.png

Update Directory Path in GetFile Processor

Change the Input Directory path in the GetFile processor to point to your local input directory:

27407-6-getfile-directory.png

The flow is now ready to run.

Flow Overview

Here is a quick overview of the flow:

1. GetFile ingests a CSV file of user data from a local directory

2. UpdateAttribute adds Schema Name "users" as an attribute to the flowfile

3. QueryRecord converts the flowfile contents from CSV to JSON by:

  • Using a CSVReader controller service that references a schema in an AvroSchemaRegistry controller service
  • The AvroSchemaRegistry contains a "users" schema which defines information about each record (field names, field ids, field types)
  • Using a JsonRecordSetWriter controller service that references the same AvroSchemaRegistry schema

4. QueryRecord queries the flowfile using SQL:

  • The "female" property uses SQL to select rows from the data where gender = 'F'
  • The "male" property uses SQL to select rows from the data where gender = 'M'

5. The female and male user data are sent to UpdateAttribute processors to provide a simple place to hold the data.
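To make the five steps above concrete outside of NiFi, here is a minimal Python sketch of the same idea: read CSV text, split the rows by gender, and emit each group as JSON. It is only an illustration and does not use any NiFi API. The sample rows, the column names other than gender, and the split_by_gender helper are assumptions made for this example; the real users.csv has its own columns and values.

```python
import csv
import io
import json

# Hypothetical sample rows; the real users.csv has different columns and values.
SAMPLE_CSV = """id,first_name,gender
1,Ana,F
2,Ben,M
3,Cara,F
"""

def split_by_gender(csv_text):
    """Parse CSV text and return {relationship_name: JSON array string}."""
    # Without a schema every value is read as a string; in the NiFi flow the
    # "users" schema supplies the field types instead.
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    routes = {"female": "F", "male": "M"}
    return {
        name: json.dumps([row for row in rows if row["gender"] == code])
        for name, code in routes.items()
    }

results = split_by_gender(SAMPLE_CSV)
for name, payload in results.items():
    print(name, payload)
```

Each key in results plays the role of one outgoing connection ("female" or "male") in the flow.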

Flow Details

Let's look at each of the processors in the flow in detail:

Get CSV File (GetFile Processor)

FlowFiles are generated from the users.csv file in the local directory. Start the processor:

27408-7-getfile-start.png

One flowfile is generated with the CSV data as its contents. Right-click the connection between the GetFile processor and the UpdateAttribute processor. In the context menu, select "List Queue" and click the View Details button ("i" icon):

27409-8-listqueue-details.png

From the FlowFile window that opens, select the "View" button from the Details tab:

27410-9-flowfile-view.png

to view the CSV contents of the flowfile:

27411-10-flowfile-contents.png

Note that there are 10 total users.

Add Schema Name Attribute (UpdateAttribute Processor)

The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "users" to the flowfile:

27412-11-updateattribute-users-schema.png

Start the processor, and view the attributes of the flowfile to confirm this:

27413-12-flowfile-schema-name-attribute.png

QueryRecord Processor

The next processor, QueryRecord, allows users to write SQL SELECT statements to run over their data as it streams through the system. Each FlowFile is treated as if it were a database table named FLOWFILE. Multiple SQL queries can be added to the processor. Looking at the configuration:

27414-13-queryrecord-properties.png

Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON. More details about these controller services can be found below.

In order to distinguish the results of each query and route the data appropriately, the name of the property is the name of the Relationship that data matching the query should be routed to. The first added property is named "female" and will include user data where the gender of the user is "F":

  SELECT *
  FROM FLOWFILE
  WHERE gender = 'F'

(NOTE: When entering a value for a property in NiFi, you can use Shift + Enter to insert a newline in your value.)

The second property is named "male" and will include user data where the gender of the user is "M":

  SELECT *
  FROM FLOWFILE
  WHERE gender = 'M'
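If you want to experiment with the two queries without starting NiFi, the routing behavior can be approximated with Python's built-in sqlite3 module. This is a sketch under stated assumptions: QueryRecord does not use SQLite, the table columns and sample rows are made up, and the queries dictionary simply mimics how each property name becomes a relationship name.

```python
import sqlite3

# Hypothetical records standing in for the rows parsed from the CSV file.
records = [
    ("Ana", "F"),
    ("Ben", "M"),
    ("Cara", "F"),
]

# Property name -> SQL, mirroring the two properties added to QueryRecord.
queries = {
    "female": "SELECT * FROM FLOWFILE WHERE gender = 'F'",
    "male": "SELECT * FROM FLOWFILE WHERE gender = 'M'",
}

# The flowfile content is exposed to SQL as a table named FLOWFILE.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE FLOWFILE (first_name TEXT, gender TEXT)")
conn.executemany("INSERT INTO FLOWFILE VALUES (?, ?)", records)

# Run every query against the same table; each result set is "routed"
# to the relationship named after its property.
routed = {name: conn.execute(sql).fetchall() for name, sql in queries.items()}
conn.close()

for name, rows in routed.items():
    print(name, rows)
```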

CSVReader Controller Service

Select the arrow icon next to the "CSV Reader" which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:

27415-14-csvreader-properties.png

With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader looks up its schema by the name stored in a flowfile attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service, which defines the "users" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:

27416-15-avroschemaregistry-properties.png
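Conceptually, the schema lookup works like a dictionary lookup keyed by the schema.name attribute. The Python sketch below illustrates that idea only. The resolve_schema function, the schema_registry dictionary, and the field list in the "users" schema are hypothetical; the actual schema is the one shown in the AvroSchemaRegistry properties.

```python
import json

# Hypothetical registry: schema name -> Avro schema text.
# The field list is illustrative, not the tutorial's real "users" schema.
schema_registry = {
    "users": json.dumps({
        "type": "record",
        "name": "users",
        "fields": [
            {"name": "id", "type": "int"},
            {"name": "first_name", "type": "string"},
            {"name": "gender", "type": "string"},
        ],
    })
}

def resolve_schema(attributes, registry, attribute_name="schema.name"):
    """Return the parsed schema named by the flowfile's schema.name attribute."""
    name = attributes.get(attribute_name)
    if name is None:
        raise KeyError(f"flowfile has no '{attribute_name}' attribute")
    if name not in registry:
        raise KeyError(f"no schema named '{name}' in the registry")
    return json.loads(registry[name])

# Attributes as they might look after the UpdateAttribute step.
flowfile_attributes = {"filename": "users.csv", "schema.name": "users"}
schema = resolve_schema(flowfile_attributes, schema_registry)
field_names = [field["name"] for field in schema["fields"]]
print(field_names)
```

This also shows why the UpdateAttribute step comes first: without the schema.name attribute, the lookup has nothing to resolve.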

JsonRecordSetWriter Controller Service

Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:

27417-16-jsonrecordsetwriter-properties.png

Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.

Start the QueryRecord processor. The flowfile is now split into two flowfiles:

27418-17-queryrecord-start.png

Looking at the contents of the "female" connection confirms the expected results of 6 female users in JSON format:

27419-18-flowfile-female-users-json.png

Looking at the contents of the "male" connection confirms the expected results of 4 male users in JSON format:

27420-19-flowfile-male-users-json.png

The SQL queries in this flow are deliberately basic. The beauty of the QueryRecord processor is that it supports SQL for more advanced queries and operations, such as selecting specific columns, rows, or fields from your data, renaming them, and performing calculations and aggregations on the data. Enjoy exploring the capabilities of this new functionality!
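As a rough illustration of what such queries look like, the sketch below runs a column rename and a simple aggregation against a table named FLOWFILE using Python's sqlite3 module. Treat it as an approximation: QueryRecord evaluates SQL with Apache Calcite rather than SQLite, so dialect details may differ, and the age column and sample rows are assumptions that are not part of the tutorial's data description.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE FLOWFILE (first_name TEXT, gender TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO FLOWFILE VALUES (?, ?, ?)",
    # Hypothetical rows; the age column is made up for this example.
    [("Ana", "F", 30), ("Ben", "M", 40), ("Cara", "F", 50)],
)

# Select a single column, rename it, and filter rows.
renamed = conn.execute(
    "SELECT first_name AS name FROM FLOWFILE WHERE age > 35 ORDER BY first_name"
).fetchall()

# Aggregate: count users and average age per gender.
summary = conn.execute(
    "SELECT gender, COUNT(*) AS total, AVG(age) AS avg_age "
    "FROM FLOWFILE GROUP BY gender ORDER BY gender"
).fetchall()
conn.close()

print(renamed)
print(summary)
```

In NiFi, each of these statements would be added as its own property on QueryRecord, and the property name would become the relationship that receives the results.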

Helpful Links

Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:

Comments

Hi @Andrew Lim

Thanks for the article. I tried to execute this using NiFi 1.5 but I get the error "already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed" in the QueryRecord processor. Please find below an excerpt from the logs:

ErrorHandlingInputStream[FlowFile=StandardFlowFileRecord[uuid=9688750b-0f83-4c64-91c6-80499ef63c24,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1519378477705-1, container=default, section=1], offset=532, length=179],offset=0,name=input.csv,size=179]] for StandardFlowFileRecord[uuid=9688750b-0f83-4c64-91c6-80499ef63c24,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1519378477705-1, container=default, section=1], offset=532, length=179],offset=0,name=input.csv,size=179] because the session was rolled back without the input stream being closed.
2018-02-23 15:10:17,160 ERROR [Timer-Driven Process Thread-3] o.a.nifi.processors.standard.QueryRecord QueryRecord[id=aff8e1fe-ff3c-3d77-9fcf-dcd3c1cf6ed9] QueryRecord[id=aff8e1fe-ff3c-3d77-9fcf-dcd3c1cf6ed9] failed to process session due to java.lang.IllegalStateException: StandardFlowFileRecord[uuid=9688750b-0f83-4c64-91c6-80499ef63c24,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1519378477705-1, container=default, section=1], offset=532, length=179],offset=0,name=input.csv,size=179] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed: {}
java.lang.IllegalStateException: StandardFlowFileRecord[uuid=9688750b-0f83-4c64-91c6-80499ef63c24,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1519378477705-1, container=default, section=1], offset=532, length=179],offset=0,name=input.csv,size=179] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed

Can you please tell me what's wrong?

Hi @spdvnz

It looks like you ran into a bug that should have been addressed in NiFi 1.5.0 (https://issues.apache.org/jira/browse/NIFI-4717):

If QueryRecord fails to parse data properly with the configured reader, it may roll back the session instead of routing to failure, leading to the FlowFile being stuck on the queue. This includes an error message indicating that the FlowFile has an active callback or input stream that hasn't been closed.

If you are able to, can you provide a template of your flow and the data file (input.csv)? Perhaps you found a scenario that was missed by NIFI-4717. Providing your template and data will also help to diagnose whether your schema is correct.

As a side note, since this article was written using NiFi 1.3.0, I tried it out in NiFi 1.5.0 just to make sure and the flow worked successfully.

Thanks!

Check further back in the logs for a SQL error.

Hello,

Right now the users.txt file has a header, so records can be picked easily.

If the file has no header and you need to pick data by position in the row, how will QueryRecord work?

@Meghna_3 as this is an older article, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post.