
Modifying JSON file

Super Collaborator

Hi,

I have a JSON file coming in from Google BigQuery in the format below (it looks like Google BQ only supports exporting JSON as newline-delimited).

{"person":"person1 value"}

{"person":"person2 value"}

so I am not able to use that file as a regular JSON file or with any of the JSON processors (Evaluate, Split, etc.). How can I change it into a valid, readable file (see below) so I can use it with other processors?

[{"person":"person1 value"},

{"person":"person2 value"}]

I do not have a schema for the incoming JSON file.

Thanks,

Sai

6 Replies

Master Guru

@Saikrishna Tarapareddy

Step 1: SplitText processor:
Configure the SplitText processor with

Line Split Count: 1

(or)

Use the SplitContent processor with

Byte Sequence Format: Text
Byte Sequence: shift+enter (i.e. a newline character)

Feed the splits relationship from either of the above processors into a MergeContent processor.
The splits relationship will have each line of your JSON file as the content of a new flowfile; you can then use the MergeContent processor to merge the JSON messages back into a JSON array (see the sketch after this step).
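To make the split step concrete, here is a minimal plain-Groovy sketch (outside NiFi, using the sample values from the question) of what splitting on newlines produces; each element would become the content of its own flowfile:

// Plain Groovy illustration of the split step (not a NiFi processor)
def ndjson = '{"person":"person1 value"}\n{"person":"person2 value"}'
def splits = ndjson.split('\n')                    // SplitText with Line Split Count = 1
assert splits[0] == '{"person":"person1 value"}'   // first flowfile content
assert splits[1] == '{"person":"person2 value"}'   // second flowfile content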

Step 2: MergeContent processor configs:

Delimiter Strategy: Text
Header: [
Footer: ]
Demarcator: ,

(screenshot: MergeContent processor configuration)

Configure the processor with your desired Minimum Number of Entries, and use Max Bin Age as a fallback so a bin is flushed even if the minimum is never reached. A rough sketch of the merged result is below.
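As a plain-Groovy illustration (outside NiFi, with the sample values from the question), the Header, Footer, and Demarcator above produce the following merged content:

// Illustration of what MergeContent emits with Header "[", Footer "]", Demarcator ","
def splits = ['{"person":"person1 value"}', '{"person":"person2 value"}']
def merged = '[' + splits.join(',') + ']'
assert merged == '[{"person":"person1 value"},{"person":"person2 value"}]'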

Please refer to the links below for MergeContent processor configs.

https://community.hortonworks.com/questions/64337/apache-nifi-merge-content.html
https://community.hortonworks.com/questions/161827/mergeprocessor-nifi-using-the-correlation-attribu...
https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.ht...

Master Guru
@Saikrishna Tarapareddy

We could use a series of ReplaceText processors to make it valid JSON, but we also need to consider the size of the incoming file: if it is large, you would have to increase the Maximum Buffer Size property on ReplaceText. This approach is not optimal, because ReplaceText has to hold the whole file in its buffer each time before applying the configured replacements.
Instead, you can simply use a SplitText processor and then a MergeContent processor with Defragment as the Merge Strategy.

(The 'Defragment' strategy combines fragments that are associated by attributes — the fragment id attribute is added by the SplitText processor — back into a single, cohesive FlowFile.)
Using this strategy, the same JSON messages end up in a valid JSON array, so you can use all of the record-based processors to parse the file. Following this approach is more optimal; a rough sketch of the fragment attributes involved is below.
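For reference, here is a rough sketch (identifier value made up) of the fragment attributes that SplitText writes and that the Defragment strategy keys on:

// Sketch of the fragment attributes added by SplitText (identifier value is made up).
// MergeContent's Defragment strategy groups flowfiles sharing the same
// fragment.identifier and merges them in fragment.index order once
// fragment.count of them have arrived.
def split1 = ['fragment.identifier': 'abc-123', 'fragment.index': '1', 'fragment.count': '2']
def split2 = ['fragment.identifier': 'abc-123', 'fragment.index': '2', 'fragment.count': '2']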

avatar

Since the data coming in is JSON objects concatenated together with newlines, the first thing you can do is use SplitText to split on newlines. The resulting objects are then JSON documents, at which point you can use all kinds of fun NiFi processors. The suggestion above to use MergeContent would, *I think*, head down the opposite path from what I believe the question was asking, but perhaps I understood it incorrectly.

Master Guru

Thanks @jwitt, I missed the first step of splitting; I thought there was a new flowfile for each message.

Super Collaborator

@jwitt, @Shu,

Thanks for your input. I am actually following a similar approach, but I was trying to see if there is anything I can do at the file level instead of splitting it.

Regards,

Sai

Master Guru

I am working on NIFI-4456, which will allow the JSON reader/writer to support the "one JSON per line" format as well as the "JSON array" format for both input and output, so you will be able to read in one JSON object per line and output a JSON array using ConvertRecord (or any other record-aware processor).

In the meantime, you can use the following crude script in an ExecuteGroovyScript processor to process your entire file (avoiding the Split/Merge pattern); it should get you what you want:

def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inStream, outStream ->
  // Open the JSON array
  outStream.write('['.bytes)
  inStream.eachLine { line, i ->
    // Separate objects with a comma (eachLine's line counter i starts at 1)
    if(i > 1) outStream.write(','.bytes)
    outStream.write(line.bytes)
  }
  // Close the JSON array
  outStream.write(']'.bytes)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)

The script just adds array brackets around the whole document and separates the lines with commas. I did the crude version because it doesn't need to load the entire input content into memory. If you need more control over the JSON objects, you could iterate over the lines (still with eachLine), use JsonSlurper to deserialize each string into a JSON object, add each object to an array, and then use JsonOutput to serialize the whole thing back to a string. However, that involves holding the entire content in memory and could get unwieldy for large input flowfiles; a rough sketch of that approach follows.
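For completeness, here is a minimal sketch of that in-memory JsonSlurper/JsonOutput variant, assuming the same ExecuteGroovyScript setup as the script above:

import groovy.json.JsonSlurper
import groovy.json.JsonOutput

def flowFile = session.get()
if(!flowFile) return
def slurper = new JsonSlurper()
flowFile = session.write(flowFile, {inStream, outStream ->
  def objects = []
  // Deserialize each newline-delimited JSON object into a Groovy map
  inStream.eachLine { line ->
    if(line.trim()) objects << slurper.parseText(line)
  }
  // Serialize the collected objects back out as a single JSON array
  outStream.write(JsonOutput.toJson(objects).bytes)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)

Note this version holds all parsed objects in memory, so it trades the streaming behaviour of the crude version for easier per-object manipulation.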