Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Use NiFi to change the format of numeric, date and timestamp fields within a record-based file

avatar
Contributor

Short description

Learn how to use NiFi to change the format of numeric, date and timestamp fields within a record-based file.

Guide

NiFi 1.2.0+ offers a powerful RecordReader and RecordWriter API for manipulating record-based files.

To illustrate these capabilities we can create a simple test NiFi flow, consisting of a GenerateFlowFile and a ConvertRecordProcessor. The template XML for this flow has been attached below, and here is an image:

19434-capture.png

The GenerateFlowFile processor creates a test CSV flowfile at 15second intervals, with the following contents: 1.00|20170110 12:34:56|Test|20170111

Logically, this data consists of the following fields which we will be reformatting/ converting:

FLOAT_TO_INT, TIMESTAMP, TEXT, DATE

The ConvertRecordProcessor specifies a CSVReader and a JSONRecordSetWriter controller service for Reading and Writing records respectively.

JSON output was chosen as it is easy for us to read. We can see what the number, date and timestamp field values are formatted as when we change the Reader/Writer schemas.

19435-capture2.png

The UpdateAttributeProcessor does nothing and will be left disabled.

It is included so that we can analyse the output files in the "success" queue from the ConvertRecordProcessor.

The schema must be an Avro-compatible schema even though our data is CSV.

CSVReader configuration:

19436-csvreader.png

Note the values configured for the Date Format and Timestamp Format settings, which match the format of our input data fields.

Schema Text field:

{
  "type": "record",
  "namespace": "com.example",
  "name": "TestRecord",
  "fields": [
    { "name": "FLOAT_TO_INT"     , "type": "float" },
    { "name": "TIMESTAMP" , "type": { "type":"long", "logicalType":"timestamp-millis"} },
    { "name": "TEXT"      , "type": "string" },
    { "name": "DATE"      , "type": { "type":"int", "logicalType":"date"} }
  ]
} 

Number conversion

For number conversion, do not specify String as the field type on the Reader schema.

Instead, specify a type which matches the way the data is formatted in your input file.

For our FLOAT_TO_INT field with value "1.00", the below works

{ "name": "FLOAT_TO_INT" , "type": "float" }

These do not work on the Reader and instead throw a NumberFormatException:

{ "name": "FLOAT_TO_INT" , "type": "string" }
{ "name": "FLOAT_TO_INT" , "type": "int" }

Date/Timestamp conversion

NiFi detects a field as being a Date or Timestamp by looking at the schema type and logicalType annotations, according to the Avro schema standard.

This means that the "type" and "logicalType" must be valid Avro, even if the data is another type.

For example, consider the TIMESTAMP field in our use case.

The data is a string with the value "20170110 12:34:56".

However an Avro type of "long" and annotated logicalType is "timestamp-millis" is used, to tell NiFi this is a Timestamp field.

This works:

{ "name": "TIMESTAMP" , "type": { "type":"long", "logicalType":"timestamp-millis"} } 

These do not work:

{ "name": "TIMESTAMP" , "type": { "type":"string", "logicalType":"timestamp-millis"} }
{ "name": "TIMESTAMP" , "type": { "type":"long", "logicalType":"timestamp"} }

Other notes

When using an Avro logicalType to annotate a type, the type attribute becomes a nested field.

This is valid:

"type": { "type":"long", "logicalType":"timestamp-millis"} 

This is invalid, but does not throw a schema validation error. It will just not work as intended:

"type":"long", "logicalType":"timestamp-millis"

Writing values

We use a different schema for the output JSONRecordSetWriter, because we need to convert the FLOAT_TO_INT field.

If we were only doing date or timestamp conversions then the CSVReader input schema could still be used. JSONRecordSetWriter configuration:

19437-jsonrecordsetwriter.png

Schema Text field:

{
  "type": "record",
  "namespace": "com.example",
  "name": "TestRecord",
  "fields": [
    { "name": "FLOAT_TO_INT"     , "type": "int" },
    { "name": "TIMESTAMP" , "type": { "type":"long", "logicalType":"timestamp-millis"} },
    { "name": "TEXT"      , "type": "string" },
    { "name": "DATE"      , "type": { "type":"int", "logicalType":"date"} }
  ]
} 

Date Format: MM/dd/yyyy

Timestamp Format: MM/dd/yyyy

Number conversion

Note that the "type" for FLOAT_TO_INT field has been changed to "int"

Date/timestamp conversion

If Timestamp format is specified on the output: The output is written out as a string in that format (regardless of the "type" attribute value).

Example output:

[ {
  "FLOAT" : 1,
  "TIMESTAMP" : "01/10/2017",
  "TEXT" : "Test",
  "DATE" : "01/11/2017"
} ]

If no format is specified, then the output is written out as the number of milliseconds since midnight.

Example output:

[ {
  "FLOAT" : 1,
  "TIMESTAMP" : 1484051696000,
  "TEXT" : "Test",
  "DATE" : 1484092800000
} ]

Other notes

Note that the Date default conversion is currently not Avro standard compliant, as it is converted into a Long instead of into an Int.

The Avro standard requires "int" annotated by the logicalType "date" to be the number of days since the Unix epoch, and not number of milliseconds since the Unix Epoch.

References

CSVReader Type Coercion

Avro Logical Types

1 ACCEPTED SOLUTION

avatar
Contributor

@mayki wogno, I was using NiFi 1.2.0 with HDF 3.0.0.0-453

You can check the component versions of the processors in the screenshot and compare to yours.


I found that there were many combinations of "type" and "logical type" which did not work as expected for timestamp values. Perhaps check the exact type of data you have and then try some of the different options above, one might work for you.

In my case the input timestamp was in this format: "YYYYMMDD HH:MM:SS" and I wanted a valid Avro timestamp (long milliseconds) as the output.

View solution in original post

8 REPLIES 8

avatar
Contributor

NIFI-4182 logged regarding the apparent Date conversion issue mentioned at the end of this guide.

avatar
Rising Star

@bjorn : what is the version of nifi, have you used for demo ?

I'm using nifi 1.2.0 and the timestamp with "long" not works

avatar
Contributor

@mayki wogno, I was using NiFi 1.2.0 with HDF 3.0.0.0-453

You can check the component versions of the processors in the screenshot and compare to yours.


I found that there were many combinations of "type" and "logical type" which did not work as expected for timestamp values. Perhaps check the exact type of data you have and then try some of the different options above, one might work for you.

In my case the input timestamp was in this format: "YYYYMMDD HH:MM:SS" and I wanted a valid Avro timestamp (long milliseconds) as the output.

avatar
Rising Star

@Bjorn Olsen: thanks for help, in my controller, i've forgotten "date format".

It works as except.

thanks

avatar
New Contributor

@Bjorn Olsen I tried CSVRecordSetWriter but decimal to int conversion is not working. I am using NiFi 1.5

avatar

@Bjorn Olsen you can convert your question to an article to have more visbility

avatar
Contributor
@Abdelkrim Hadjidj
I can't seem to find a way to do that

avatar

By convert I meant "create an article". Sorry for the confusion