Created 12-31-2016 06:12 AM
Hello there,
Is there a simpler and dynamic way to replace JSON values dynamically than ReplaceText. Currently, below is my JSON and respective ReplaceText properties (PFA).
{
"orgId": "" "sourceId": "123" "eventDate", "31-12-2016", "eventDetail": "Transaction complete." "ingestionDate": "", }
I have to add today's date dynamically to ingestionDate below, which I am somehow being able to achieve with ReplaceText, however I believe its at best fragile since it appends it at the end of the JSON.
Search Value - (?s:(^.*)}$) Replacement Value - $1,"ingestionDate":"${ingestionDate}"}
However, what if I needed to dynamically assign more values for various other attributes for a complex JSON (I do have another use case for that), and so I need a way to be able to refer to the incoming flow-file JSON attribute via regex (like ingestionDate) and update its value to current date more generically rather than manipulating it as a text. Is there a better way out? Please advice.
Thank you and Happy New Year to all!
Created 12-31-2016 03:57 PM
If you know the full set of fields in the JSON objects (and there's not a prohibitively large number of them), you could use EvaluateJsonPath to extract each field and its value into attributes, then UpdateAttribute to update ingestionDate with the current time (in NiFi Expression Language, you can use ${now()} ), then AttributesToJSON or ReplaceText to output an updated JSON object.
If the JSON object is sufficiently large or complex, you might consider using ExecuteScript with a language that handles JSON objects easily, such as Groovy, Jython, or Javascript. Here is an example of a Groovy script to update the ingestionDate field with today's date (and update the "filename" attribute by appending _translated.json to it):
import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets import groovy.json.* def flowFile = session.get() if (!flowFile) return try { flowFile = session.write(flowFile, { inputStream, outputStream -> def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) def obj = new JsonSlurper().parseText(text) // Update ingestionDate field with today's date obj.ingestionDate = new Date().format( 'dd-MM-yyyy' ) // Output updated JSON def json = JsonOutput.toJson(obj) outputStream.write(JsonOutput.prettyPrint(json).getBytes(StandardCharsets.UTF_8)) } as StreamCallback) flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_translated.json') session.transfer(flowFile, REL_SUCCESS) } catch(Exception e) { log.error('Error during JSON operations', e) session.transfer(flowFile, REL_FAILURE) }
Created 12-31-2016 03:57 PM
If you know the full set of fields in the JSON objects (and there's not a prohibitively large number of them), you could use EvaluateJsonPath to extract each field and its value into attributes, then UpdateAttribute to update ingestionDate with the current time (in NiFi Expression Language, you can use ${now()} ), then AttributesToJSON or ReplaceText to output an updated JSON object.
If the JSON object is sufficiently large or complex, you might consider using ExecuteScript with a language that handles JSON objects easily, such as Groovy, Jython, or Javascript. Here is an example of a Groovy script to update the ingestionDate field with today's date (and update the "filename" attribute by appending _translated.json to it):
import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets import groovy.json.* def flowFile = session.get() if (!flowFile) return try { flowFile = session.write(flowFile, { inputStream, outputStream -> def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) def obj = new JsonSlurper().parseText(text) // Update ingestionDate field with today's date obj.ingestionDate = new Date().format( 'dd-MM-yyyy' ) // Output updated JSON def json = JsonOutput.toJson(obj) outputStream.write(JsonOutput.prettyPrint(json).getBytes(StandardCharsets.UTF_8)) } as StreamCallback) flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_translated.json') session.transfer(flowFile, REL_SUCCESS) } catch(Exception e) { log.error('Error during JSON operations', e) session.transfer(flowFile, REL_FAILURE) }
Created on 01-01-2017 12:07 PM - edited 08-19-2019 04:53 AM
Thanks Matt for the quick response. I am already using ReplaceText with now{}, however I tried AttributesToJSON but somehow the attribute is not getting updated in the output JSON. Below is what I am trying along with the NiFi screenshot, kindly advice what I might be missing here -
Input JSON - {"eventDetail":{...},"sourceId":14627000,"eventDate":"2016-11-01T00:00:00","ingestionDate":""}
Output JSON - {"eventDetail":{...},"sourceId":14627000,"eventDate":"2016-11-01T00:00:00","ingestionDate":""}
PFA screenshots.
Created 01-02-2017 06:45 PM
Your AttributesToJSON processor is configured to put the output JSON into an attribute called JSONAttributes (because "Destination" is set to "flowfile-attribute". If instead you want the JSON in the content, set Destination to flowfile-content instead.
Created on 01-03-2017 01:21 PM - edited 08-19-2019 04:52 AM
Thank you Matt, I was finally able to generate the JSON as well as the dynamically injected 'ingestionDate'. However the output JSON is all escaped with \" for the strings keys/values, is there a way to strip of those or not have it in the first place?
Below is the snippet of JSON and screenshots of the modified processors.
Output JSON -
{"items":"[{\"date\":\"2016-11-01T00:00:00\",\"pointsMax\":100,\"respPeriod\":\"2016-11\"},\"details\":[{\"detail\":[{\"meta\":{\"answers\":[{\"text\":\"11/1/2016 \"}]},\"id\":56184,... ... }}]}]","ingestionDate":"Tue Jan 03 18:17:15 IST 2017"}
EvaluateJSONPath
UpdateAttribute
AttributesToJSON
Created 01-02-2017 02:34 AM
Something doesn't look right to me or some of the parameters may have been left out. I see you are manipulating one attribute (ingestionDate), yet all the attributes are being returned in your input/ output examples.
The way we do this to consider the attributes and data payload as distinct. The EvaluateJsonPath moves the data to attributes and the AttributesToJson moves the attributes to data.
In this way, when EvaluatingJsonPath, you can leave ingestionDate out since you do not need it. What you need are the other Json values so they move into the attributes. Then UpdateAttributes by adding the ingestionDate with the Now(). Then AttributesToJson to move all the atrtibutes including the new ingestionDate into the data.
It wouldn't surprise me if internally you don't either have two ingestionDate attributes or your code is ignored to prevent that situation.
Either way, the way I described we think is more maintainable you can see exactly what is going in and out of attributes to data it each step and not have to look at a bunch of transformations.
Created 01-03-2017 12:58 PM
Thank you Don for the simplified explaination, I figured looking at the source code for these eventually.
Created 10-03-2017 06:40 AM
Hi Matt
Do you have any code that is using avro instead of json to manipulate the data?
/Max
Created 04-28-2018 07:02 PM
I am beginner to Nifi and facing one issue with PutSQL.
Scenario:
I have two tables say TableA and TableB.In both table I have column 'date' but column sequence is not same.I am trying to apply format for 'date' column for TableA using ${sql.args.4.value:format("yyyy-MM-dd HH:mm:ss.SSS")} but as TableB has 4th column some text value not 'date' so it is failing for TableB.
If I will try to use both ${sql.args.4.value:format("yyyy-MM-dd HH:mm:ss.SSS")} for TableA and ${sql.args.5.value:format("yyyy-MM-dd HH:mm:ss.SSS")} for TableB then it is not working as both tables has different column sequence for 'date'.
Is there any way to handle this format dynamically by using column name?
Thanks
Created 05-15-2018 04:37 PM
Since this isn't related to the original question, please ask this as its own standalone question and I'd be happy to answer it. (The short answer is you might be able to use UpdateAttribute to change the 4 to the right column number for Table B if you can figure out whether a flow file is for Table A or B)