Nifi attribute containing large text value

Contributor

Hi, dear Experts!

Could you please help with the following issue:

I have an ExtractText processor that processes a JSON flow file and creates two attributes containing large JSON text.

mainJSON = (^.*(?=\x1b)|^((?!\x1b).)*$)

enrichmentJSON = ((?<=\x1b).*)

As output, three attributes were created for each of mainJSON and enrichmentJSON, for example:

mainJSON: mainJSON, mainJSON.0, mainJSON.1

Each of them contains the same portion of the expected result, not the full text.
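The two patterns can be checked outside NiFi before wiring them into ExtractText. The following is only a minimal sketch in Python, assuming one record per line and a single ESC (\x1b) between the main object and its enrichment; the helper name `split_line` is hypothetical, and the patterns are written without the outer capture group. It says nothing about how ExtractText sizes or truncates attribute values.

```python
import re

# Patterns from the post, without the outer capture group.
# MAIN: everything before the ESC, or the whole line when there is no ESC.
MAIN = re.compile(r"^.*(?=\x1b)|^((?!\x1b).)*$")
# ENRICHMENT: everything after the ESC.
ENRICHMENT = re.compile(r"(?<=\x1b).*")


def split_line(line):
    """Return (main, enrichment) for one line; enrichment is None if absent."""
    main = MAIN.search(line)
    enrichment = ENRICHMENT.search(line)
    return (
        main.group(0) if main else None,
        enrichment.group(0) if enrichment else None,
    )


if __name__ == "__main__":
    print(split_line('{"a": 1}\x1b{"b": 2}'))
    print(split_line('{"a": 1}'))
```

A line without ESC yields only the main part, which matches the `{mainJSON}` case described later in the thread.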

Is it not possible to store a large text value in an attribute? Is there another way to store and pass a large text value as an attribute?

As a next step, I want to combine these two attributes with other attributes in a ReplaceText processor and write them to a Hive table as separate columns of one row.

Thanks in advance!


Attachment: attrib.jpg
Re: Nifi attribute containing large text value

Storing large values in attributes is usually not recommended: attributes are kept in memory, which can cause issues for the entire flow. Can you share an example JSON and what you're trying to get as a result? You might be able to use UpdateRecord to create the new fields in place (i.e. in the flow file contents) rather than having to extract fields into attributes.

Re: Nifi attribute containing large text value

Contributor

Hi, @Matt Burgess!

Here is my sample JSON file content, formatted for readability (in the original flow file, the JSON objects are separated by newlines):

{
  "schemaNameSpace": "CPMCDM.com.bis.bss.cpm.event.schema",
  "schemaName": "CpmCustomerChangeEvent",
  "schemaVersion": "5.1.1",
  "eventHeader": {
    "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.EventHeader": {
      "eventCreationTime": {
        "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.Time": {
          "timestamp": {
            "string": "2018-04-03T23:08:38.652+03:00"
          },
          "timeZoneType": {
            "string": "SYSTEM_TIME_ZONE"
          },
          "zoneName": {
            "string": "Europe/Kiev"
          }
        }
      },
      "cpmInstanceHost": {
        "string": "env6-cpm1.dbss.bis.ua"
      },
      "recordUniqueId": {
        "string": "77ECA557BFA74C98B2792222C9C72CED"
      }
    }
  },
  "customerInformation": {
    "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.CustomerInformation": {
      "customerId": {
        "string": "5A834B1C27DE4FAF9F038B370AA3DDA4"
      },
      "partyId": null
    }
  },
  "genericInterfaceParameters": null,
  "requestInfo": null,
  "partyChangeResult": null
}{
  "schemaNameSpace": "com.bis.bss.edm.eventDataEnrichment.schema",
  "schemaName": "EventDataEnrichment",
  "schemaVersion": "1.0.0",
  "enrichedData": [
    
  ]
}
{
  "schemaNameSpace": "CPMCDM.com.bis.bss.cpm.event.schema",
  "schemaName": "CpmCustomerChangeEvent",
  "schemaVersion": "5.1.1",
  "eventHeader": {
    "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.EventHeader": {
      "eventCreationTime": {
        "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.Time": {
          "timestamp": {
            "string": "2018-04-03T23:08:39.652+03:10"
          },
          "timeZoneType": {
            "string": "SYSTEM_TIME_ZONE"
          },
          "zoneName": {
            "string": "Europe/Kiev"
          }
        }
      },
      "cpmInstanceHost": {
        "string": "env6-cpm1.dbss.bis.ua"
      },
      "recordUniqueId": {
        "string": "72DEA157BFA74C98B2792222C0C11CBE"
      }
    }
  },
  "customerInformation": {
    "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.CustomerInformation": {
      "customerId": {
        "string": "1E2234B1C27DE4FAF9F038B370AA3DBE4"
      },
      "partyId": null
    }
  },
  "genericInterfaceParameters": null,
  "requestInfo": null,
  "partyChangeResult": null
}
{
  "schemaNameSpace": "CPMCDM.com.bis.bss.cpm.event.schema",
  "schemaName": "CpmCustomerChangeEvent",
  "schemaVersion": "5.1.1",
  "eventHeader": {
    "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.EventHeader": {
      "eventCreationTime": {
        "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.Time": {
          "timestamp": {
            "string": "2018-04-03T23:08:40.652+02:20"
          },
          "timeZoneType": {
            "string": "SYSTEM_TIME_ZONE"
          },
          "zoneName": {
            "string": "Europe/Kiev"
          }
        }
      },
      "cpmInstanceHost": {
        "string": "env6-cpm1.dbss.bis.ua"
      },
      "recordUniqueId": {
        "string": "55CBA557BFA74C98B2792222C9A11CDE"
      }
    }
  },
  "customerInformation": {
    "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.CustomerInformation": {
      "customerId": {
        "string": "1E244B1C27DE4FAF9F038B370AA3DDD5"
      },
      "partyId": null
    }
  },
  "genericInterfaceParameters": null,
  "requestInfo": null,
  "partyChangeResult": null
}{
  "schemaNameSpace": "com.bis.bss.edm.eventDataEnrichment.schema",
  "schemaName": "EventDataEnrichment",
  "schemaVersion": "1.0.0",
  "enrichedData": [
    
  ]
}

The important thing to note is that some of the JSON objects in the flow file are followed by an extension, separated from the main object by ESC (\x1b):

{
  "schemaNameSpace": "com.bis.bss.edm.eventDataEnrichment.schema",
  "schemaName": "EventDataEnrichment",
  "schemaVersion": "1.0.0",
  "enrichedData": [
    
  ]
}

The layout of the file looks like this:

{mainJSON}{extentionJSON}
{mainJSON}
{mainJSON}{extentionJSON} .....

In the output I would like to have the following format:

mainJSON | extentionJSON
mainJSON |
mainJSON | extentionJSON

etc...

Thank you!

Re: Nifi attribute containing large text value

Contributor

@Matt Burgess

So what I need is to replace ESC (\x1b) with the delimiter '|' using the UpdateRecord processor.

Could you please help me configure this processor to implement this replacement in the flow file records?

Re: Nifi attribute containing large text value

In that case, if you just need to replace the ESC with |, use ReplaceText with the Line-by-Line evaluation mode. Either the Regex Replace or the Literal Replace strategy should work to replace \x1b with |.
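To make the intended transformation concrete, here is a minimal Python sketch of what a line-by-line literal replacement does to the content. It is an illustration of the effect only, not of how ReplaceText is implemented; the function name `replace_esc` and the sample lines are made up for the example.

```python
ESC = "\x1b"


def replace_esc(lines, delimiter="|"):
    """Replace every ESC character with the delimiter, one line at a time."""
    for line in lines:
        yield line.replace(ESC, delimiter)


if __name__ == "__main__":
    sample = [
        '{"schemaName": "CpmCustomerChangeEvent"}\x1b{"schemaName": "EventDataEnrichment"}',
        '{"schemaName": "CpmCustomerChangeEvent"}',
    ]
    for out in replace_esc(sample):
        print(out)
```

One caveat with this design: if a `|` can appear inside a JSON string value, the output is no longer unambiguous to split, so a delimiter that cannot occur in the data may be a safer choice.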

Re: Nifi attribute containing large text value

Note that since your format is neither JSON nor JSON-per-line, you will have to do further processing before using any processors (record-based or not) that handle JSON. As of NiFi 1.7.0 (via NIFI-4456), the JsonTreeReader (and writer) allow for JSON-per-line, but your format is not exactly that either. If the existing processors or controller services (i.e. readers/writers) don't work, you might have to resort to a ScriptedRecordReader/Writer or a scripting processor to do custom handling.
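As a rough idea of what such custom handling could look like, the sketch below parses each line into a main object and an optional extension object. It is plain Python rather than NiFi scripting API code, and it assumes one record per line with at most one ESC (\x1b) between the main JSON and its extension; `parse_records` and the `main`/`extension` keys are hypothetical names chosen for the example.

```python
import json

ESC = "\x1b"


def parse_records(lines):
    """Yield {"main": ..., "extension": ...} for each non-empty line.

    "extension" is None when the line has no ESC-separated second object.
    """
    for line in lines:
        line = line.rstrip("\r\n")
        if not line.strip():
            continue  # skip blank lines between records
        main_text, sep, ext_text = line.partition(ESC)
        extension = json.loads(ext_text) if sep and ext_text.strip() else None
        yield {"main": json.loads(main_text), "extension": extension}


if __name__ == "__main__":
    sample = [
        '{"schemaName": "CpmCustomerChangeEvent"}\x1b{"enrichedData": []}\n',
        '{"schemaName": "CpmCustomerChangeEvent"}\n',
    ]
    for record in parse_records(sample):
        print(record)
```

Parsing each half with `json.loads` also validates it, so a malformed line raises an error instead of silently producing a broken row.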

Re: Nifi attribute containing large text value

New Contributor

If you use large attributes, you will have serious issues with the "snapshot" file in the FlowFile repository. I killed my PROD this way just last week: the snapshot was too big to fit in memory at startup, and my data was lost.
