Created 07-11-2018 12:12 PM
Hi, dear Experts!
Could you please help with the following issue:
I have an ExtractText processor that processes a JSON flow file and creates two attributes containing large JSON text.
mainJSON = (^.*(?=\x1b)|^((?!\x1b).)*$)
enrichmentJSON = ((?<=\x1b).*)
As output, three attributes were created for each of mainJSON and enrichmentJSON, for example:
mainJSON: mainJSON, mainJSON.0, mainJSON.1
--- each containing the same portion of the expected result.
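A note on the three attributes: ExtractText writes the first capture group to the property name and also adds indexed attributes, one per capture group, with index 0 holding the whole match. With a pattern wrapped in one outer group, all three end up with the same text. If the values look cut off, the Maximum Capture Group Length property (1024 characters by default, as far as I know) may be the cause. Below is a minimal Python sketch of that naming. `extract_attributes` is a hypothetical helper, not NiFi API, and Python's `re` stands in for Java regex here.

```python
import re

ESC = "\x1b"

def extract_attributes(name, pattern, content, max_len=1024):
    """Hypothetical stand-in for ExtractText's attribute naming (not NiFi API)."""
    attrs = {}
    match = re.search(pattern, content)
    if match is None:
        return attrs
    # One indexed attribute per capture group; index 0 is the whole match.
    for index in range(match.re.groups + 1):
        value = match.group(index)
        if value:  # skip groups that did not participate or matched nothing
            attrs[f"{name}.{index}"] = value[:max_len]
    # The bare property name carries the first capture group.
    if match.re.groups >= 1 and match.group(1):
        attrs[name] = match.group(1)[:max_len]
    return attrs

# Shortened stand-in for one record: main JSON, ESC, enrichment JSON.
record = '{"a": 1}' + ESC + '{"b": 2}'
attrs = extract_attributes("enrichmentJSON", r"((?<=\x1b).*)", record)
```

Here `attrs` holds `enrichmentJSON`, `enrichmentJSON.0` and `enrichmentJSON.1`, all with the same value.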
Is it not possible to store a large text value in an attribute? Is there another way to store and pass a large text value as an attribute?
As a next step, I wanted to combine these two attributes with other attributes in a ReplaceText processor and put them into a Hive table as separate columns of one row.
Thanks in advance!
Created 07-11-2018 01:21 PM
It is usually not recommended to store large values in attributes as they are kept in memory which can cause issues for the entire flow. Can you share an example JSON and what you're trying to get as a result? You might be able to use UpdateRecord to create the new fields in-place (i.e. in the flow file contents) rather than having to extract fields into attributes.
Created 07-12-2018 04:25 AM
Hi, @Matt Burgess!
Here is the content of my sample JSON file (in the original flow file, the JSON objects are separated by newlines):
{ "schemaNameSpace": "CPMCDM.com.bis.bss.cpm.event.schema", "schemaName": "CpmCustomerChangeEvent", "schemaVersion": "5.1.1", "eventHeader": { "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.EventHeader": { "eventCreationTime": { "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.Time": { "timestamp": { "string": "2018-04-03T23:08:38.652+03:00" }, "timeZoneType": { "string": "SYSTEM_TIME_ZONE" }, "zoneName": { "string": "Europe/Kiev" } } }, "cpmInstanceHost": { "string": "env6-cpm1.dbss.bis.ua" }, "recordUniqueId": { "string": "77ECA557BFA74C98B2792222C9C72CED" } } }, "customerInformation": { "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.CustomerInformation": { "customerId": { "string": "5A834B1C27DE4FAF9F038B370AA3DDA4" }, "partyId": null } }, "genericInterfaceParameters": null, "requestInfo": null, "partyChangeResult": null }{ "schemaNameSpace": "com.bis.bss.edm.eventDataEnrichment.schema", "schemaName": "EventDataEnrichment", "schemaVersion": "1.0.0", "enrichedData": [ ] }
{ "schemaNameSpace": "CPMCDM.com.bis.bss.cpm.event.schema", "schemaName": "CpmCustomerChangeEvent", "schemaVersion": "5.1.1", "eventHeader": { "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.EventHeader": { "eventCreationTime": { "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.Time": { "timestamp": { "string": "2018-04-03T23:08:39.652+03:10" }, "timeZoneType": { "string": "SYSTEM_TIME_ZONE" }, "zoneName": { "string": "Europe/Kiev" } } }, "cpmInstanceHost": { "string": "env6-cpm1.dbss.bis.ua" }, "recordUniqueId": { "string": "72DEA157BFA74C98B2792222C0C11CBE" } } }, "customerInformation": { "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.CustomerInformation": { "customerId": { "string": "1E2234B1C27DE4FAF9F038B370AA3DBE4" }, "partyId": null } }, "genericInterfaceParameters": null, "requestInfo": null, "partyChangeResult": null }
{ "schemaNameSpace": "CPMCDM.com.bis.bss.cpm.event.schema", "schemaName": "CpmCustomerChangeEvent", "schemaVersion": "5.1.1", "eventHeader": { "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.EventHeader": { "eventCreationTime": { "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.Time": { "timestamp": { "string": "2018-04-03T23:08:40.652+02:20" }, "timeZoneType": { "string": "SYSTEM_TIME_ZONE" }, "zoneName": { "string": "Europe/Kiev" } } }, "cpmInstanceHost": { "string": "env6-cpm1.dbss.bis.ua" }, "recordUniqueId": { "string": "55CBA557BFA74C98B2792222C9A11CDE" } } }, "customerInformation": { "CPMCDM.com.bis.bss.cpm.event.schema.cpmCustomerChangeEvent.CustomerInformation": { "customerId": { "string": "1E244B1C27DE4FAF9F038B370AA3DDD5" }, "partyId": null } }, "genericInterfaceParameters": null, "requestInfo": null, "partyChangeResult": null }{ "schemaNameSpace": "com.bis.bss.edm.eventDataEnrichment.schema", "schemaName": "EventDataEnrichment", "schemaVersion": "1.0.0", "enrichedData": [ ] }
The important thing to notice here is that some of the JSON objects in the flow file carry an extension, separated from the main object by ESC (\x1b):
{ "schemaNameSpace": "com.bis.bss.edm.eventDataEnrichment.schema", "schemaName": "EventDataEnrichment", "schemaVersion": "1.0.0", "enrichedData": [ ] }
The layout of the file looks like this:
{mainJSON}{extentionJSON}
{mainJSON}
{mainJSON}{extentionJSON} .....
In the output I would like to have the following format:
mainJSON | extentionJSON
mainJSON |
mainJSON | extentionJSON
etc...
Thank you!
Created 07-12-2018 05:49 AM
So what is needed is to replace ESC (\x1b) with the delimiter '|' using the UpdateRecord processor.
Could you please help me configure this processor to implement this replacement in the flow file records?
Created 07-12-2018 11:23 PM
In that case, if you just need to replace the ESC with |, use ReplaceText with the Line-by-Line evaluation mode (with either the Regex Replace or the Literal Replace strategy; one or the other, or both, should work) to replace \x1b with |.
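To illustrate what that line-by-line replacement does to the content, here is a minimal Python sketch. It is not NiFi code: `replace_esc` and its `pad_missing` option are hypothetical names. The `pad_missing` option is an assumption added to mimic the requested "mainJSON |" output for lines without an extension, which a plain replacement alone would not produce.

```python
ESC = "\x1b"

def replace_esc(content, delimiter="|", pad_missing=False):
    """Replace ESC with a delimiter, one line at a time (illustration only)."""
    out = []
    for line in content.split("\n"):
        if ESC in line:
            # Literal replacement of every ESC on this line.
            line = line.replace(ESC, delimiter)
        elif pad_missing and line:
            # Assumption: add a trailing delimiter when there is no extension.
            line = line + delimiter
        out.append(line)
    return "\n".join(out)

# Shortened stand-ins for the real records.
sample = "main1" + ESC + "ext1\nmain2\nmain3" + ESC + "ext3"
plain = replace_esc(sample)
padded = replace_esc(sample, pad_missing=True)
```

One thing to keep in mind is that the JSON text itself could contain the delimiter character, so the choice of '|' assumes it never appears in the data.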
Created 07-12-2018 11:25 PM
Note that since your format is neither JSON nor JSON-per-line, you will have to do further processing before using any processors (record-based or not) that handle JSON. As of NiFi 1.7.0 (via NIFI-4456), the JsonTreeReader (and writer) allow for JSON-per-line, but your format is not exactly that either. If the existing processors or controller services (i.e. readers/writers) don't work, you might have to resort to a ScriptedRecordReader/Writer or a scripting processor to do custom handling.
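As a rough idea of what such custom handling could look like, here is a plain Python sketch of the parsing logic only. It does not use the NiFi scripting or record API, and `parse_records` is a hypothetical name. It assumes one record per line, with at most one ESC separating the main object from the extension.

```python
import json

ESC = "\x1b"

def parse_records(content):
    """Return a list of (main, extension) pairs; extension is None if absent."""
    records = []
    for line in content.split("\n"):
        if not line.strip():
            continue  # ignore blank lines
        # Split at the first ESC: text before is the main object, after is the extension.
        main_text, _, ext_text = line.partition(ESC)
        main = json.loads(main_text)
        ext = json.loads(ext_text) if ext_text.strip() else None
        records.append((main, ext))
    return records

# Shortened stand-ins for the real records.
sample = (
    '{"schemaName": "CpmCustomerChangeEvent"}' + ESC
    + '{"schemaName": "EventDataEnrichment", "enrichedData": []}\n'
    + '{"schemaName": "CpmCustomerChangeEvent"}'
)
records = parse_records(sample)
```

Each pair could then be written out as a single record with two fields, which keeps the large JSON text in the flow file content rather than in attributes.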
Created 07-15-2018 04:38 PM
If you use large attributes, you will have serious issues with the "snapshot" file in the FlowFile repository. I killed my PROD this way just last week: the snapshot was too big to fit in memory at startup, and my data was lost.