Created on 07-13-2017 09:31 AM - edited 08-18-2019 01:22 AM
Learn how to use NiFi to change the format of numeric, date and timestamp fields within a record-based file.
NiFi 1.2.0+ offers a powerful RecordReader and RecordWriter API for manipulating record-based files.
To illustrate these capabilities, we can create a simple test NiFi flow consisting of a GenerateFlowFile processor and a ConvertRecord processor. The template XML for this flow has been attached below, and here is an image:
The GenerateFlowFile processor creates a test CSV flowfile at 15-second intervals, with the following contents: 1.00|20170110 12:34:56|Test|20170111
Logically, this data consists of the following fields, which we will be reformatting or converting:
FLOAT_TO_INT, TIMESTAMP, TEXT, DATE
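As a rough illustration of how the sample line maps onto these four fields, the Python sketch below splits it on the pipe delimiter. This is not NiFi code (the CSVReader does this work inside the flow), and the variable names are only for illustration.

```python
import csv
import io

# Sample line produced by the GenerateFlowFile processor in this article.
sample = "1.00|20170110 12:34:56|Test|20170111"

# Field names, in the order they appear in the data.
field_names = ["FLOAT_TO_INT", "TIMESTAMP", "TEXT", "DATE"]

# The data is pipe-delimited, so the delimiter is set explicitly.
reader = csv.reader(io.StringIO(sample), delimiter="|")
record = dict(zip(field_names, next(reader)))

# Every value is still a plain string at this point; the schema decides
# how each one is interpreted.
print(record)
```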
The ConvertRecord processor specifies a CSVReader and a JSONRecordSetWriter controller service for reading and writing records, respectively.
JSON output was chosen because it is easy for us to read. It lets us see how the number, date and timestamp field values are formatted as we change the Reader and Writer schemas.
The UpdateAttribute processor does nothing and is left disabled.
It is included only so that the output files from the ConvertRecord processor collect in the "success" queue, where we can analyse them.
The schema must be an Avro-compatible schema even though our data is CSV.
CSVReader configuration:
Note the values configured for the Date Format and Timestamp Format settings, which match the format of our input data fields.
Schema Text field:
{
  "type": "record",
  "namespace": "com.example",
  "name": "TestRecord",
  "fields": [
    { "name": "FLOAT_TO_INT", "type": "float" },
    { "name": "TIMESTAMP", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "TEXT", "type": "string" },
    { "name": "DATE", "type": { "type": "int", "logicalType": "date" } }
  ]
}
For number conversion, do not specify String as the field type on the Reader schema.
Instead, specify a type which matches the way the data is formatted in your input file.
For our FLOAT_TO_INT field with the value "1.00", the following works:
{ "name": "FLOAT_TO_INT" , "type": "float" }
These do not work on the Reader and instead throw a NumberFormatException:
{ "name": "FLOAT_TO_INT" , "type": "string" }
{ "name": "FLOAT_TO_INT" , "type": "int" }
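The behaviour above resembles what happens in most languages when text such as "1.00" is parsed straight into an integer. The Python sketch below is only an analogy, not NiFi's implementation: Python raises ValueError where Java raises NumberFormatException, and the two function names are made up for illustration.

```python
def to_int_directly(text):
    # Parsing "1.00" as an integer fails because of the decimal point.
    return int(text)


def to_int_via_float(text):
    # Read the value as a float first (the Reader's job in the flow),
    # then convert it to an integer (the Writer's job in the flow).
    return int(float(text))


raw = "1.00"

try:
    to_int_directly(raw)
except ValueError as error:
    print("direct int parse failed:", error)

print(to_int_via_float(raw))  # 1
```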
Date/Timestamp conversion
NiFi detects a field as being a Date or Timestamp by looking at the schema type and logicalType annotations, according to the Avro schema standard.
This means that the "type" and "logicalType" must be valid Avro, even if the data is another type.
For example, consider the TIMESTAMP field in our use case.
The data is a string with the value "20170110 12:34:56".
However, an Avro type of "long" annotated with the logicalType "timestamp-millis" is used to tell NiFi that this is a Timestamp field.
This works:
{ "name": "TIMESTAMP" , "type": { "type":"long", "logicalType":"timestamp-millis"} }
These do not work:
{ "name": "TIMESTAMP" , "type": { "type":"string", "logicalType":"timestamp-millis"} }
{ "name": "TIMESTAMP" , "type": { "type":"long", "logicalType":"timestamp"} }
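To make the "long" plus "timestamp-millis" idea concrete, here is a small Python sketch of turning the input string into milliseconds since the Unix epoch. It assumes the input is in UTC and uses Python's strptime directives; NiFi itself uses Java date patterns in the Timestamp Format setting (presumably something like yyyyMMdd HH:mm:ss for this data), so treat this as an analogy rather than NiFi's own code. The function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(text, fmt="%Y%m%d %H:%M:%S"):
    # Parse the text with the given format and treat it as UTC (assumption).
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    # Integer division of two timedeltas yields a whole number of milliseconds.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


millis = to_epoch_millis("20170110 12:34:56")
print(millis)  # 1484051696000
```

The result fits in a long, which is why the Avro type for a timestamp-millis field is "long" even though the input data is text.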
Other notes
When using an Avro logicalType to annotate a type, the type attribute becomes a nested object.
This is valid:
"type": { "type":"long", "logicalType":"timestamp-millis"}
This is invalid, but does not throw a schema validation error. It will just not work as intended:
"type":"long", "logicalType":"timestamp-millis"
We use a different schema for the output JSONRecordSetWriter, because we need to convert the FLOAT_TO_INT field.
If we were only doing date or timestamp conversions then the CSVReader input schema could still be used.
JSONRecordSetWriter configuration:
Schema Text field:
{
  "type": "record",
  "namespace": "com.example",
  "name": "TestRecord",
  "fields": [
    { "name": "FLOAT_TO_INT", "type": "int" },
    { "name": "TIMESTAMP", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "TEXT", "type": "string" },
    { "name": "DATE", "type": { "type": "int", "logicalType": "date" } }
  ]
}
Date Format: MM/dd/yyyy
Timestamp Format: MM/dd/yyyy
Number conversion
Note that the "type" for the FLOAT_TO_INT field has been changed to "int".
Date/timestamp conversion
If a Timestamp Format is specified on the output, the value is written out as a string in that format (regardless of the "type" attribute value).
Example output:
[ {
  "FLOAT" : 1,
  "TIMESTAMP" : "01/10/2017",
  "TEXT" : "Test",
  "DATE" : "01/11/2017"
} ]
If no format is specified, then the output is written out as the number of milliseconds since the Unix epoch (midnight UTC on 1 January 1970).
Example output:
[ {
  "FLOAT" : 1,
  "TIMESTAMP" : 1484051696000,
  "TEXT" : "Test",
  "DATE" : 1484092800000
} ]
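The two output modes described above can be sketched in Python as follows. This is a loose analogy under stated assumptions: the values are treated as UTC, the strftime pattern %m/%d/%Y stands in for the Java pattern MM/dd/yyyy, and render_timestamp is a made-up helper rather than anything in NiFi.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def render_timestamp(millis, fmt=None):
    # With no format, the raw epoch-millisecond number is written as-is.
    if fmt is None:
        return millis
    # With a format, the value is rendered as a string in that format.
    moment = EPOCH + timedelta(milliseconds=millis)
    return moment.strftime(fmt)


print(render_timestamp(1484051696000, "%m/%d/%Y"))  # 01/10/2017
print(render_timestamp(1484092800000, "%m/%d/%Y"))  # 01/11/2017
print(render_timestamp(1484051696000))              # 1484051696000
```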
Other notes
Note that the default Date conversion is currently not compliant with the Avro standard, as the value is converted into a long instead of an int.
The Avro standard requires an "int" annotated with the logicalType "date" to hold the number of days since the Unix epoch, not the number of milliseconds since the Unix epoch.
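The difference between the two representations is easy to check with a little arithmetic. The Python sketch below converts the DATE value from the example output into days since the Unix epoch, which is the form the Avro "date" logical type expects; the helper name is only for illustration.

```python
from datetime import date, timedelta

MILLIS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000


def millis_to_epoch_days(millis):
    # Whole days elapsed since 1970-01-01.
    return millis // MILLIS_PER_DAY


days = millis_to_epoch_days(1484092800000)
print(days)                                     # 17177
print(date(1970, 1, 1) + timedelta(days=days))  # 2017-01-11
```

The day count fits comfortably in an int, while the millisecond value needs a long.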
Created 07-13-2017 10:49 AM
NIFI-4182 has been logged regarding the apparent Date conversion issue mentioned at the end of this guide.
Created 12-04-2017 10:42 AM
@bjorn: which version of NiFi did you use for the demo?
I'm using NiFi 1.2.0 and the timestamp with "long" does not work.
Created 12-04-2017 10:57 AM
@mayki wogno, I was using NiFi 1.2.0 with HDF 3.0.0.0-453
You can check the component versions of the processors in the screenshot and compare to yours.
I found that there were many combinations of "type" and "logicalType" which did not work as expected for timestamp values. Perhaps check the exact type of data you have and then try some of the different options above; one of them might work for you.
In my case the input timestamp was in this format: "YYYYMMDD HH:MM:SS" and I wanted a valid Avro timestamp (long milliseconds) as the output.
Created 12-04-2017 01:27 PM
@Bjorn Olsen: thanks for the help. In my controller service, I had forgotten the "Date Format".
It works as expected.
Thanks
Created 10-24-2018 07:13 PM
@Bjorn Olsen I tried CSVRecordSetWriter, but the decimal to int conversion is not working. I am using NiFi 1.5.
Created 12-05-2017 07:54 AM
@Bjorn Olsen you can convert your question to an article to get more visibility.
Created 12-05-2017 08:08 AM
By convert I meant "create an article". Sorry for the confusion