Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Processor for replacing JSON values dynamically and generically

avatar
Contributor

Hello there,

Is there a simpler and dynamic way to replace JSON values dynamically than ReplaceText. Currently, below is my JSON and respective ReplaceText properties (PFA).

{

"orgId": "" "sourceId": "123" "eventDate", "31-12-2016", "eventDetail": "Transaction complete." "ingestionDate": "", }

I have to add today's date dynamically to ingestionDate below, which I am somehow being able to achieve with ReplaceText, however I believe its at best fragile since it appends it at the end of the JSON.

Search Value - (?s:(^.*)}$) Replacement Value - $1,"ingestionDate":"${ingestionDate}"}

However, what if I needed to dynamically assign more values for various other attributes for a complex JSON (I do have another use case for that), and so I need a way to be able to refer to the incoming flow-file JSON attribute via regex (like ingestionDate) and update its value to current date more generically rather than manipulating it as a text. Is there a better way out? Please advice.

Thank you and Happy New Year to all!

replacetext.pngreplacetext.png

1 ACCEPTED SOLUTION

avatar
Master Guru

If you know the full set of fields in the JSON objects (and there's not a prohibitively large number of them), you could use EvaluateJsonPath to extract each field and its value into attributes, then UpdateAttribute to update ingestionDate with the current time (in NiFi Expression Language, you can use ${now()} ), then AttributesToJSON or ReplaceText to output an updated JSON object.

If the JSON object is sufficiently large or complex, you might consider using ExecuteScript with a language that handles JSON objects easily, such as Groovy, Jython, or Javascript. Here is an example of a Groovy script to update the ingestionDate field with today's date (and update the "filename" attribute by appending _translated.json to it):

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.*

def flowFile = session.get()
if (!flowFile) return

try {
  flowFile = session.write(flowFile,
      { inputStream, outputStream ->
          def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
          def obj = new JsonSlurper().parseText(text)

          // Update ingestionDate field with today's date
          obj.ingestionDate = new Date().format( 'dd-MM-yyyy' )

          // Output updated JSON
          def json = JsonOutput.toJson(obj)
          outputStream.write(JsonOutput.prettyPrint(json).getBytes(StandardCharsets.UTF_8))
      } as StreamCallback)
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)

} catch(Exception e) {
  log.error('Error during JSON operations', e)
  session.transfer(flowFile, REL_FAILURE)
}

View solution in original post

9 REPLIES 9

avatar
Master Guru

If you know the full set of fields in the JSON objects (and there's not a prohibitively large number of them), you could use EvaluateJsonPath to extract each field and its value into attributes, then UpdateAttribute to update ingestionDate with the current time (in NiFi Expression Language, you can use ${now()} ), then AttributesToJSON or ReplaceText to output an updated JSON object.

If the JSON object is sufficiently large or complex, you might consider using ExecuteScript with a language that handles JSON objects easily, such as Groovy, Jython, or Javascript. Here is an example of a Groovy script to update the ingestionDate field with today's date (and update the "filename" attribute by appending _translated.json to it):

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.*

def flowFile = session.get()
if (!flowFile) return

try {
  flowFile = session.write(flowFile,
      { inputStream, outputStream ->
          def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
          def obj = new JsonSlurper().parseText(text)

          // Update ingestionDate field with today's date
          obj.ingestionDate = new Date().format( 'dd-MM-yyyy' )

          // Output updated JSON
          def json = JsonOutput.toJson(obj)
          outputStream.write(JsonOutput.prettyPrint(json).getBytes(StandardCharsets.UTF_8))
      } as StreamCallback)
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)

} catch(Exception e) {
  log.error('Error during JSON operations', e)
  session.transfer(flowFile, REL_FAILURE)
}

avatar
Contributor

Thanks Matt for the quick response. I am already using ReplaceText with now{}, however I tried AttributesToJSON but somehow the attribute is not getting updated in the output JSON. Below is what I am trying along with the NiFi screenshot, kindly advice what I might be missing here -

Input JSON - {"eventDetail":{...},"sourceId":14627000,"eventDate":"2016-11-01T00:00:00","ingestionDate":""}

Output JSON - {"eventDetail":{...},"sourceId":14627000,"eventDate":"2016-11-01T00:00:00","ingestionDate":""}

PFA screenshots.

10980-evaluatejsonpath.png

10981-updateattribute.png

10982-attributestojson.png

avatar
Master Guru

Your AttributesToJSON processor is configured to put the output JSON into an attribute called JSONAttributes (because "Destination" is set to "flowfile-attribute". If instead you want the JSON in the content, set Destination to flowfile-content instead.

avatar
Contributor

Thank you Matt, I was finally able to generate the JSON as well as the dynamically injected 'ingestionDate'. However the output JSON is all escaped with \" for the strings keys/values, is there a way to strip of those or not have it in the first place?

Below is the snippet of JSON and screenshots of the modified processors.

Output JSON -

{"items":"[{\"date\":\"2016-11-01T00:00:00\",\"pointsMax\":100,\"respPeriod\":\"2016-11\"},\"details\":[{\"detail\":[{\"meta\":{\"answers\":[{\"text\":\"11/1/2016 \"}]},\"id\":56184,... ... }}]}]","ingestionDate":"Tue Jan 03 18:17:15 IST 2017"}

EvaluateJSONPath 11036-evaluatejsonpath.png

UpdateAttribute11038-updateattribute.png

AttributesToJSON11039-attributestojson.png


updateattribute.png

avatar
Contributor

Something doesn't look right to me or some of the parameters may have been left out. I see you are manipulating one attribute (ingestionDate), yet all the attributes are being returned in your input/ output examples.

The way we do this to consider the attributes and data payload as distinct. The EvaluateJsonPath moves the data to attributes and the AttributesToJson moves the attributes to data.

In this way, when EvaluatingJsonPath, you can leave ingestionDate out since you do not need it. What you need are the other Json values so they move into the attributes. Then UpdateAttributes by adding the ingestionDate with the Now(). Then AttributesToJson to move all the atrtibutes including the new ingestionDate into the data.

It wouldn't surprise me if internally you don't either have two ingestionDate attributes or your code is ignored to prevent that situation.

Either way, the way I described we think is more maintainable you can see exactly what is going in and out of attributes to data it each step and not have to look at a bunch of transformations.

avatar
Contributor

Thank you Don for the simplified explaination, I figured looking at the source code for these eventually.

avatar
Contributor

Hi Matt

Do you have any code that is using avro instead of json to manipulate the data?

/Max

avatar
Explorer

Hi @Matt Burgess

I am beginner to Nifi and facing one issue with PutSQL.

Scenario:

I have two tables say TableA and TableB.In both table I have column 'date' but column sequence is not same.I am trying to apply format for 'date' column for TableA using ${sql.args.4.value:format("yyyy-MM-dd HH:mm:ss.SSS")} but as TableB has 4th column some text value not 'date' so it is failing for TableB.

If I will try to use both ${sql.args.4.value:format("yyyy-MM-dd HH:mm:ss.SSS")} for TableA and ${sql.args.5.value:format("yyyy-MM-dd HH:mm:ss.SSS")} for TableB then it is not working as both tables has different column sequence for 'date'.

Is there any way to handle this format dynamically by using column name?

Thanks

avatar
Master Guru

Since this isn't related to the original question, please ask this as its own standalone question and I'd be happy to answer it. (The short answer is you might be able to use UpdateAttribute to change the 4 to the right column number for Table B if you can figure out whether a flow file is for Table A or B)