
This is part 1 of a miniseries. Part 2 can be found here.

How do you count the number of lines in a FlowFile in NiFi? The naïve answer would be to run wc -l inside an ExecuteStreamCommand processor. This is unsatisfying because it requires launching an external command in a separate process, which is both a scalability issue and a security risk. But there is a better way!

The record-oriented processors of NiFi 1.2+ make it possible to run real-time, in-stream SQL against FlowFiles. Why not use this capability? Here's the approach:

I am going to define a minimalistic schema that allows me to read each line of a CSV file as a record with one single field. Here is my example flow:

Let's go through the steps. First, I use a GetFile processor to pick up files in my home directory. The actual configuration does not matter - use whatever your data source is. These data are passed to a QueryRecord processor, which is where the magic happens.

My QueryRecord processor is configured like this:

For reading the data, I use a CSV Reader with an inline schema definition.

It is important to use "Use 'Schema Text' Property" as the Schema Access Strategy. This allows me to enter the schema definition inline. My schema definition is very basic and contains only one field. Enter this as the value of the Schema Text property:

{
  "type": "record",
  "namespace": "txtx",
  "name": "text",
  "fields": [
    { "name": "line", "type": "string" }
  ]
}
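
To make the effect of this one-field schema more concrete, here is a minimal Python sketch of the idea. It is not how NiFi's CSV Reader is implemented; it only illustrates, under my own assumptions, that each line becomes one record and that only the field named in the schema ("line") is kept. The function name read_records and the choice to skip blank lines are hypothetical and belong to this sketch only.

```python
import csv
import io
import json

# The schema text from the article, verbatim.
SCHEMA_TEXT = """
{
  "type": "record",
  "namespace": "txtx",
  "name": "text",
  "fields": [
    { "name": "line", "type": "string" }
  ]
}
"""


def read_records(content: str, schema_text: str = SCHEMA_TEXT) -> list:
    """Turn each CSV line into a record holding only the schema's fields."""
    # json.loads raises an error if the schema text is not valid JSON.
    schema = json.loads(schema_text)
    field_names = [field["name"] for field in schema["fields"]]

    records = []
    for row in csv.reader(io.StringIO(content)):
        if not row:
            # Assumption of this sketch only: blank lines are skipped.
            continue
        # zip() stops at the shorter sequence, so extra columns are dropped.
        records.append(dict(zip(field_names, row)))
    return records


if __name__ == "__main__":
    sample = "a,1\nb,2\nc,3\n"
    for record in read_records(sample):
        print(record)
```

The point of the sketch is that the content of the columns does not matter for counting: one input line yields one record, whatever it contains.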

I am leaving all other settings at their default values.

Going back to the QueryRecord processor, I use the "+" button to add a dynamic property called "Line Count". This automatically creates an outgoing relationship with the same name. As the value of this property, I enter my SQL query:

SELECT COUNT(*) as numLines
FROM FLOWFILE
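
If you want to try the query outside of NiFi, the following Python sketch runs the same statement against an in-memory SQLite table named FLOWFILE with a single column called line. SQLite is only a stand-in here: QueryRecord uses Apache Calcite internally, and the helper name count_lines is a hypothetical name chosen for this example.

```python
import sqlite3

# The query from the article; the alias becomes the result column name.
QUERY = "SELECT COUNT(*) AS numLines FROM FLOWFILE"


def count_lines(content: str) -> int:
    """Load each line as one row of FLOWFILE and count the rows with SQL."""
    conn = sqlite3.connect(":memory:")
    try:
        # One text column, mirroring the single "line" field of the schema.
        conn.execute("CREATE TABLE FLOWFILE (line TEXT)")
        conn.executemany(
            "INSERT INTO FLOWFILE (line) VALUES (?)",
            ((line,) for line in content.splitlines()),
        )
        (num_lines,) = conn.execute(QUERY).fetchone()
        return num_lines
    finally:
        conn.close()


if __name__ == "__main__":
    print(count_lines("first\nsecond\nthird\n"))
```

The result is a single row with a single column, numLines, which is the value the record writer picks up in the next step.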

As output, I want to have the line count as text. I use a FreeFormTextRecordSetWriter. The property of this service that matters here is called "Text", and it accepts NiFi Expression Language, so I can refer to the fields of my result records. Since my query defined a column alias "numLines" for the line count, I can refer to this alias in the record writer. Configure the service like this:
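
As a rough illustration of how such a text template resolves against a result record, here is a small Python sketch. It assumes the "Text" property is set to ${numLines}, which is my reading of the description above rather than a value quoted from the original configuration. Python's string.Template is used only because it shares the ${name} placeholder syntax; it is not NiFi Expression Language, and render_record is a hypothetical helper name.

```python
from string import Template

# Assumed value of the writer's "Text" property (see the note above).
WRITER_TEXT = "${numLines}"


def render_record(record: dict, text: str = WRITER_TEXT) -> str:
    """Substitute the record's fields into the template text."""
    # substitute() raises KeyError if the template names a missing field.
    return Template(text).substitute(record)


if __name__ == "__main__":
    # A result record as the COUNT(*) query would produce it.
    print(render_record({"numLines": 3}))
```

With this template, the outgoing FlowFile content would consist of just the number, which is usually what you want when the count is consumed by a downstream processor.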

Then connect the "Line Count" relationship to a downstream processor (in my example, I am just using a funnel). You can now watch the FlowFiles with the line counts arrive in the queue.

Last update: 10-06-2017 08:18 AM
Updated by: Former Member