Created on 05-07-2018 03:08 AM - edited 08-17-2019 07:35 AM
Short Description:
This Tutorial describes how to add fields,removing not required fields and change values of fields in flowfile.
Article
Introduction
Using UpdateRecord processor we are can update the contents of flowfile.
Overview of article:
Below sections describes the changes that are going to happen to the input flowfile content vs output flowfile contents.
Input:
input Json record as follows and having 6 fields/elements in it.
[ { "id" : 1, "name" : "foo", "age" : 20, "state" : "FLORIDA", "ts_milli" : 1525526792098, "ts" : "2018-05-03 10:10:10.123" }]
Expected Output:
I'm writing output in Avro Format but for viewing purpose i have converted the output flowfile to json.
[ { "id" : "1", "rename_id" : "1", //newly added field based on id "state" : "florida", //changed the value of field "rename_state" : "FLORIDA", //newly added field based on state "unique_id" : "1-416425265990923-FLORIDA-aab46988-6a27-4008-9a4d-65655abe9c3c",//id-filename-state-UUID "ts_milli" : 1525526792098, "date" : "2018-05-05", //newly added field formatted as date based on ts_milli "ts" : "2018-05-03 10:10:10.123", "ts_tz" : "2018-05-03T10:10:10Z",//newly added field formatted as date based on ts "current_ts" : "2018-05-05 14:23:15", //current timestamp value "updated_by" : "NiFi", //newly added field based on user attribute value "gmt_time" : "2018-05-05 18:23:18.611" //newly added field gmt time. }]
We are missing name,age fields because those fields are not required in output contents.
UpdateRecord Processor Params:
Record Reader Specifies the Controller Service to use for reading incoming data
Record Writer Specifies the Controller Service to use for writing out the records.
Replacement Value Strategy Specifies how to interpret the configured replacement values
1.Literal Value
The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with. Expression Language may reference variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated.
2.Record Path Value
The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. if this option is selected, and the Record Path results in multiple values for a given Record, the input FlowFile will be routed to the 'failure' Relationship.
**Note**
This Processor requires that at least one user-defined Property be added.
Please refer to this link for more details regarding UpdateRecord Processor.
Flow:
GenerateFlowFile Configs:
Configure the processor as shown below
Add new properties as
schema.name
sch
user
NiFi
UpdateRecord Strategy as Record Path Configs:-
As Replacement Value Strategy
Record Path Value
Record Reader: JsonTreeReader
Configure the controller service as shown below
AvroSchemaRegistry Configs:
Add new property
{ "namespace": "nifi", "name": "person", "type": "record", "fields": [ { "name": "id", "type": ["null","int"] }, { "name": "name", "type": ["null","string"] }, { "name": "age", "type": ["null","int"] }, { "name": "state", "type": ["null","string"] }, { "name": "ts_milli", "type" : ["null","long"] }, { "name": "ts", "type": ["null","string"] } ] }
By using above schema we are reading the incoming json message (or) you can even specify only the required fields in this schema then processor will read only those fields and all the names are case sensitive.
Record Writer:
I'm using AvroRecordSetWriter i.e we are reading input in json format and writing as avro format.
Configure the Controller service as shown below.
Schema Text:
{ "namespace": "nifi", "name": "person", "type": "record", "fields": [ { "name": "id", "type": ["null","string"] }, { "name": "rename_id", "type": ["null","string"] }, { "name": "state", "type": ["null","string"] }, { "name": "rename_state", "type": ["null","string"] }, { "name": "unique_id", "type": ["null","string"] }, { "name": "ts_milli", "type" : { "type" : "long", "logicalType" : "timestamp-millis" } }, { "name": "date", "type": ["null","string"] }, { "name": "ts", "type": ["null","string"] }, { "name": "ts_tz", "type": ["null","string"] }, { "name": "current_ts", "type": ["null","string"] }, { "name": "gmt_time", "type": ["null","string"] }, { "name": "updated_by", "type": ["null","string"] } ] }
As you can see there are bunch of new fields are added in the above avro schema.
Now we are going to add user-defined properties in update record processor
1./current_ts
replaceRegex( /id, '(^.*$)', '${now():format("yyyy-MM-dd HH:mm:ss")}')
To get current_ts field value we are using replaceRegex function of RecordPathDomainSpecific language and replacing the id value with current timestamp with the specific format
2./date
format(/ts_milli,"yyyy-MM-dd")
Based on ts_milli record path value and changing the format to get only the date from epoch time in milliseconds
3./rename_id
/id
Assigning the id value to new rename_id field we are going to have same id value for both fields
4./rename_state
/state
Assigning the state value to new rename_state field we are going to have same state value for both fields because in our output avro schema includes both fields.
5./ts_tz
format(toDate( /ts, "yyyy-MM-dd HH:mm:ss.SSS"),"yyyy-MM-dd'T'HH:mm:ss'Z'")
Changing the format of the ts field value and adding T and Z and assigning the new value to ts_tz field.
6./unique_id
concat(/id,'-','${filename}','-',/state,'-','${UUID()}')
In this field value we are concatenating id,filename,state,UUID() values with - to make the value unique and assigning the value to unique_id field.
Our output flowfile will be in avro format to view the flowfile content we are using ConvertAvroToJSON processor to view each processor output.
Output:
[ { "id" : "1", "rename_id" : "1", "state" : "FLORIDA", "rename_state" : "FLORIDA", "unique_id" : "1-423955681497409-FLORIDA-3d71d47d-da7a-44bb-b506-9f7a027933e2", "ts_milli" : 1525526792098, "date" : "2018-05-05", "ts" : "2018-05-03 10:10:10.123", "ts_tz" : "2018-05-03T10:10:10Z", "current_ts" : "2018-05-05 16:28:39", "gmt_time" : null, //this field is going to populated in next UpdateRecord processor "updated_by" : null //this field is going to populated in next UpdateRecord processor }]
Update Record Strategy as Literal value configs:-
Record Reader as Avro reader as we are having feeding avro file with embedded schema so we are keeping Schema Access Strategy as Embedded Avro Schema
AvroSet writer configs:
Schema Text:
{ "namespace": "nifi", "name": "person", "type": "record", "fields": [ { "name": "id", "type": ["null","string"] }, { "name": "rename_id", "type": ["null","string"] }, { "name": "state", "type": ["null","string"] }, { "name": "rename_state", "type": ["null","string"] }, { "name": "unique_id", "type": ["null","string"] }, { "name": "ts_milli", "type" : { "type" : "long", "logicalType" : "timestamp-millis" } }, { "name": "date", "type": ["null","string"] }, { "name": "ts", "type": ["null","string"] }, { "name": "ts_tz", "type": ["null","string"] }, { "name": "current_ts", "type": ["null","string"] }, { "name": "gmt_time", "type": ["null","string"] }, { "name": "updated_by", "type": ["null","string"] } ] }
Add new user-defined properties as
1./gmt_time
${now():format("yyyy-MM-dd HH:mm:ss.SSS","GMT")}
We are adding gmt time by using NiFi expression language
2./state
${field.value:isEmpty():not():ifElse('${field.value:toLower()}','${literal("empty")}')}
in this value we are checking state field value and if it's not empty then changing the value to lower case,if the value is null or empty string or blank then keeping the value as literal "empty"
3./updated_by
${user}
We have added user attribute in GenerateFlowfile processor and now we assigning the user attribute value to update_by field value.
Final Output:
[ { "id" : "1", "rename_id" : "1", "state" : "florida", "rename_state" : "FLORIDA", "unique_id" : "1-425389131155321-FLORIDA-c81faa7a-4f2e-414e-bff5-d8722485921c", "ts_milli" : 1525526792098, "date" : "2018-05-05", "ts" : "2018-05-03 10:10:10.123", "ts_tz" : "2018-05-03T10:10:10Z", "current_ts" : "2018-05-05 16:52:32", "gmt_time" : "2018-05-05 20:52:36.088", "updated_by" : "NiFi" }]
As i have used output format as Avro but we can use same avro schema for other formats(json,csv..).
Now we have prepared each record by adding new fields to it and removing unnecessary fields from the record, then Use success relation for further processing.
Please refer to this link for more details regarding RecordPathDomainSpecific Language.
Please refer this link for NiFi Expression Language.
Reference flow.xml
update-contents-using-updaterecord.xml
Created on 01-05-2024 05:46 AM
Thanks