Short Description:

This tutorial describes how to add fields, remove fields that are not required, and change field values in flowfile content.

Article

Introduction

Using the UpdateRecord processor, we can update the contents of a flowfile.

Overview of article:
The sections below describe how the output flowfile content differs from the input flowfile content.

Input:
The input JSON record is shown below; it has 6 fields/elements.

[ {
  "id" : 1,
  "name" : "foo",
  "age" : 20,
  "state" : "FLORIDA",
  "ts_milli" : 1525526792098,
  "ts" : "2018-05-03 10:10:10.123"
}]

Expected Output:
I'm writing the output in Avro format, but for viewing purposes I have converted the output flowfile to JSON.

[ {
  "id" : "1",
  "rename_id" : "1", //newly added field based on id 
  "state" : "florida", //changed the value of field
  "rename_state" : "FLORIDA", //newly added field based on state
  "unique_id" : "1-416425265990923-FLORIDA-aab46988-6a27-4008-9a4d-65655abe9c3c",//id-filename-state-UUID
  "ts_milli" : 1525526792098,
  "date" : "2018-05-05", //newly added field formatted as date based on ts_milli
  "ts" : "2018-05-03 10:10:10.123",
  "ts_tz" : "2018-05-03T10:10:10Z",//newly added field formatted as date based on ts
  "current_ts" : "2018-05-05 14:23:15", //current timestamp value
  "updated_by" : "NiFi", //newly added field based on user attribute value
  "gmt_time" : "2018-05-05 18:23:18.611" //newly added field gmt time.
}]

The name and age fields are missing because they are not required in the output content.

UpdateRecord Processor Params:

Record Reader: Specifies the Controller Service to use for reading incoming data.

Record Writer: Specifies the Controller Service to use for writing out the records.

Replacement Value Strategy: Specifies how to interpret the configured replacement values.

1. Literal Value

The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with. Expression Language may reference variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated.

2. Record Path Value

The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. If this option is selected, and the Record Path results in multiple values for a given Record, the input FlowFile will be routed to the 'failure' Relationship.
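
The difference between the two strategies can be illustrated with a toy model. This is only a sketch in plain Python, not NiFi code: the function names literal_value and record_path_value are hypothetical, and only simple top-level paths such as /state are handled.

```python
# Toy model of the two Replacement Value Strategy options (not NiFi code).

def literal_value(record, field, value):
    """Literal Value: the evaluated property value itself is written to the field."""
    updated = dict(record)
    updated[field] = value
    return updated


def record_path_value(record, field, path):
    """Record Path Value: the property value is a path that is looked up in the record."""
    updated = dict(record)
    # Only simple top-level paths such as "/state" are handled in this sketch.
    updated[field] = record.get(path.lstrip("/"))
    return updated


record = {"id": 1, "state": "FLORIDA"}
print(literal_value(record, "rename_state", "/state"))      # stores the text "/state"
print(record_path_value(record, "rename_state", "/state"))  # stores "FLORIDA"
```

With the same property value, the Literal Value strategy stores the text as-is, while the Record Path Value strategy resolves it against the record first.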

**Note**

This Processor requires that at least one user-defined Property be added.

Please refer to this link for more details regarding UpdateRecord Processor.
Flow:

72619-flow.png

GenerateFlowFile Configs:

Configure the processor as shown below

72618-generateflowfile.png

Add new properties:

schema.name: sch
user: NiFi

UpdateRecord Configs with the Record Path Value Strategy:
Set Replacement Value Strategy to

Record Path Value

Record Reader: JsonTreeReader
Configure the controller service as shown below

72620-jsontreereader.png

AvroSchemaRegistry Configs:

72621-avroschemaregistry.png

Add a new property named sch with this schema as its value:

{
 "namespace": "nifi",
 "name": "person",
 "type": "record",
 "fields": [
  { "name": "id",  "type": ["null","int"] },
  { "name": "name",  "type": ["null","string"] },
  { "name": "age",  "type": ["null","int"] },
  { "name": "state", "type": ["null","string"] },
  { "name": "ts_milli", "type" : ["null","long"] },
  { "name": "ts", "type": ["null","string"] }
 ]
}

We use the schema above to read the incoming JSON message. Alternatively, you can specify only the required fields in this schema, and the processor will then read only those fields. All field names are case sensitive.

Record Writer:

I'm using AvroRecordSetWriter, i.e. we are reading the input in JSON format and writing it out in Avro format.

Configure the Controller service as shown below.

72625-avrosetwriter-record-path.png

Schema Text:

{
 "namespace": "nifi",
 "name": "person",
 "type": "record",
 "fields": [
  { "name": "id",  "type": ["null","string"] },
  { "name": "rename_id",  "type": ["null","string"] },
  { "name": "state", "type": ["null","string"] },
  { "name": "rename_state", "type": ["null","string"] },
  { "name": "unique_id", "type": ["null","string"] },
  { "name": "ts_milli", "type": { "type": "long", "logicalType": "timestamp-millis" } },
  { "name": "date", "type": ["null","string"] },
  { "name": "ts", "type": ["null","string"] },
  { "name": "ts_tz", "type": ["null","string"] },
  { "name": "current_ts", "type": ["null","string"] },
  { "name": "gmt_time", "type": ["null","string"] },
  { "name": "updated_by", "type": ["null","string"] }
   ]
}

As you can see, a number of new fields have been added in the Avro schema above.
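
The effect of the writer schema on the record can be sketched roughly as follows. This is a simplified Python illustration, not what NiFi runs internally: project_to_writer_schema is a hypothetical helper, and type coercion (for example id being written as a string) is not modeled.

```python
# Field names taken from the writer schema above.
WRITER_FIELDS = [
    "id", "rename_id", "state", "rename_state", "unique_id", "ts_milli",
    "date", "ts", "ts_tz", "current_ts", "gmt_time", "updated_by",
]


def project_to_writer_schema(record, writer_fields=WRITER_FIELDS):
    """Keep only the fields named in the writer schema; missing ones become None."""
    return {name: record.get(name) for name in writer_fields}


input_record = {
    "id": 1, "name": "foo", "age": 20, "state": "FLORIDA",
    "ts_milli": 1525526792098, "ts": "2018-05-03 10:10:10.123",
}
output_record = project_to_writer_schema(input_record)
print(sorted(set(input_record) - set(output_record)))  # fields dropped from the output
```

Fields such as name and age disappear because the writer schema does not list them, and new fields stay null until a user-defined property populates them.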

Now we are going to add user-defined properties to the UpdateRecord processor:

72623-updaterecord-user-defined.png


1. /current_ts

replaceRegex( /id, '(^.*$)', '${now():format("yyyy-MM-dd HH:mm:ss")}')

To get the current_ts field value, we use the replaceRegex function of the RecordPath domain-specific language to replace the entire id value with the current timestamp in the specified format.
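
The same idea can be sketched in Python to show why the regex works: the pattern matches the whole id value, so the result is just the timestamp. The helper name current_ts and its optional now parameter are assumptions made for this illustration.

```python
import re
from datetime import datetime


def current_ts(id_value, now=None):
    """Replace the whole id value with a formatted timestamp, as the property does."""
    now = now or datetime.now()
    stamp = now.strftime("%Y-%m-%d %H:%M:%S")
    # '(^.*$)' matches the complete value, so nothing of the id survives.
    return re.sub(r"(^.*$)", lambda _match: stamp, str(id_value))


print(current_ts(1, datetime(2018, 5, 5, 14, 23, 15)))  # 2018-05-05 14:23:15
```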


2. /date

format(/ts_milli,"yyyy-MM-dd")

We take the ts_milli record path value (epoch time in milliseconds) and format it so that only the date is kept.
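
As a rough cross-check of the conversion, here is a Python sketch that turns the sample epoch value into a date. It assumes UTC; NiFi may apply a different default time zone, which can shift the date for values close to midnight. The name date_from_millis is hypothetical.

```python
from datetime import datetime, timezone


def date_from_millis(ts_milli, tz=timezone.utc):
    """Convert epoch milliseconds to a yyyy-MM-dd style date string."""
    return datetime.fromtimestamp(ts_milli / 1000, tz=tz).strftime("%Y-%m-%d")


print(date_from_millis(1525526792098))  # 2018-05-05
```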

3. /rename_id

/id

We assign the id value to the new rename_id field, so both fields have the same value.

4. /rename_state

/state

We assign the state value to the new rename_state field. Both fields have the same value at this point, because our output Avro schema includes both fields.

5. /ts_tz

format(toDate( /ts, "yyyy-MM-dd HH:mm:ss.SSS"),"yyyy-MM-dd'T'HH:mm:ss'Z'")

We parse the ts field value, reformat it with a literal T and Z added, and assign the new value to the ts_tz field.
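
The parse-then-format step looks roughly like this in Python. It is only a sketch with a hypothetical helper name, ts_to_tz. Note that T and Z are quoted literals in the pattern, so no time zone conversion takes place; the clock time stays the same.

```python
from datetime import datetime


def ts_to_tz(ts):
    """Parse 'yyyy-MM-dd HH:mm:ss.SSS' and re-emit it with literal 'T' and 'Z'."""
    parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")
    # The milliseconds are dropped because the output pattern has no fraction part.
    return parsed.strftime("%Y-%m-%dT%H:%M:%SZ")


print(ts_to_tz("2018-05-03 10:10:10.123"))  # 2018-05-03T10:10:10Z
```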

6. /unique_id

concat(/id,'-','${filename}','-',/state,'-','${UUID()}')

Here we concatenate the id, filename, state, and UUID() values, separated by -, to make the value unique, and assign it to the unique_id field.
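
A minimal Python sketch of the same concatenation is shown below. The function name unique_id is hypothetical, and the filename is passed in as a plain argument standing in for the flowfile's filename attribute.

```python
import uuid


def unique_id(record, filename):
    """Join id, filename, state and a random UUID with '-' separators."""
    parts = [str(record["id"]), filename, str(record["state"]), str(uuid.uuid4())]
    return "-".join(parts)


print(unique_id({"id": 1, "state": "FLORIDA"}, "416425265990923"))
```

The random UUID is what makes the value unique even when id, filename, and state repeat.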

Our output flowfile is in Avro format, so we use the ConvertAvroToJSON processor after each processor to view its output.

Output:

[ {
  "id" : "1",
  "rename_id" : "1",
  "state" : "FLORIDA",
  "rename_state" : "FLORIDA",
  "unique_id" : "1-423955681497409-FLORIDA-3d71d47d-da7a-44bb-b506-9f7a027933e2",
  "ts_milli" : 1525526792098,
  "date" : "2018-05-05",
  "ts" : "2018-05-03 10:10:10.123",
  "ts_tz" : "2018-05-03T10:10:10Z",
  "current_ts" : "2018-05-05 16:28:39",
  "gmt_time" : null, //this field will be populated by the next UpdateRecord processor
  "updated_by" : null //this field will be populated by the next UpdateRecord processor
}]

UpdateRecord Configs with the Literal Value Strategy:

The Record Reader is an Avro reader. Because we are feeding it an Avro file with an embedded schema, we keep the Schema Access Strategy set to Embedded Avro Schema.

72624-avro-reader.png

AvroRecordSetWriter Configs:

72622-avrosetwriter.png

Schema Text:

{
 "namespace": "nifi",
 "name": "person",
 "type": "record",
 "fields": [
  { "name": "id",  "type": ["null","string"] },
  { "name": "rename_id",  "type": ["null","string"] },
  { "name": "state", "type": ["null","string"] },
  { "name": "rename_state", "type": ["null","string"] },
  { "name": "unique_id", "type": ["null","string"] },
  { "name": "ts_milli", "type": { "type": "long", "logicalType": "timestamp-millis" } },
  { "name": "date", "type": ["null","string"] },
  { "name": "ts", "type": ["null","string"] },
  { "name": "ts_tz", "type": ["null","string"] },
  { "name": "current_ts", "type": ["null","string"] },
  { "name": "gmt_time", "type": ["null","string"] },
  { "name": "updated_by", "type": ["null","string"] }
   ]
}

Add new user-defined properties as shown below:

72626-literal-value-ur.png

1. /gmt_time

${now():format("yyyy-MM-dd HH:mm:ss.SSS","GMT")}

We add the current GMT time by using the NiFi Expression Language.
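
For comparison, a Python sketch that produces the same timestamp layout in GMT/UTC follows. The helper name gmt_time and its optional now parameter are assumptions made for this example.

```python
from datetime import datetime, timedelta, timezone


def gmt_time(now=None):
    """Format a moment in time as 'yyyy-MM-dd HH:mm:ss.SSS' in GMT/UTC."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    millis = now.microsecond // 1000
    return now.strftime("%Y-%m-%d %H:%M:%S") + f".{millis:03d}"


# A local time four hours behind GMT is shifted forward by four hours.
eastern = timezone(timedelta(hours=-4))
print(gmt_time(datetime(2018, 5, 5, 16, 52, 36, 88000, tzinfo=eastern)))
# 2018-05-05 20:52:36.088
```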

2. /state

${field.value:isEmpty():not():ifElse('${field.value:toLower()}','${literal("empty")}')}

Here we check the state field value. If it is not empty, we change the value to lower case; if the value is null, an empty string, or blank, we set the value to the literal "empty".
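
The branching in this expression can be written out as an ordinary function. This is a Python sketch with a hypothetical name, normalize_state, mirroring the rule described above.

```python
def normalize_state(value):
    """Lower-case a non-empty value; map null, empty, or blank input to 'empty'."""
    if value is None or value.strip() == "":
        return "empty"
    return value.lower()


print(normalize_state("FLORIDA"))  # florida
print(normalize_state("   "))      # empty
```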

3. /updated_by

${user}

We added the user attribute in the GenerateFlowFile processor, and now we assign its value to the updated_by field.

Final Output:

[ {
  "id" : "1",
  "rename_id" : "1",
  "state" : "florida",
  "rename_state" : "FLORIDA",
  "unique_id" : "1-425389131155321-FLORIDA-c81faa7a-4f2e-414e-bff5-d8722485921c",
  "ts_milli" : 1525526792098,
  "date" : "2018-05-05",
  "ts" : "2018-05-03 10:10:10.123",
  "ts_tz" : "2018-05-03T10:10:10Z",
  "current_ts" : "2018-05-05 16:52:32",
  "gmt_time" : "2018-05-05 20:52:36.088",
  "updated_by" : "NiFi"
}]

Although I have used Avro as the output format, the same Avro schema can be used with other formats (JSON, CSV, ...).
We have now prepared each record by adding new fields and removing unnecessary fields. Use the success relationship for further processing.
Please refer to this link for more details regarding the RecordPath domain-specific language.

Please refer to this link for the NiFi Expression Language.

Reference flow.xml
update-contents-using-updaterecord.xml
