Created 11-08-2018 09:04 AM
i have one json data model like:-
{
"Details": {
"EntityType": {
"Type": "string",
"Value": ""
},
"EntityName": {
"Type": "string",
"Value": ""
},
"EntityId": {
"Type": "string",
"Value": ""
}}
and i have one json that is key value pair like
{
"EntityType":"person",
"EntityName":"Ankit",
"EntityId":"11"
}
i want to do my key value json mapping into datamodel.
Desired output:-
{ "Details": { "EntityType": { "Type": "string", "Value": "person" }, "EntityName": { "Type": "string", "Value": "Ankit" }, "EntityId": { "Type": "string", "Value": "11" }}
if anybody knows about it please share your knowledge
Created 11-08-2018 12:46 PM
You need to use LookUpRecord processor in NiFi for this case.
For more details regards to usage/configurations of LookUpRecord processors refer to this link.
Created on 11-10-2018 08:35 PM - edited 08-17-2019 04:47 PM
I have done by using executescript processor
first I have taken data model
{ "EntityType": { "Type": "string", "Value": "" }, "EntityName": { "Type": "string", "Value": "" }, "EntityId": { "Type": "string", "Value":"" }
inside execute script processor i have written logic for binding our data {"EntityType":"person","EntityName":"Ankit","EntityId":"11"} into this datamodel
i am using ECMAScript for this task
script :-
flowFile = session.get();
if (flowFile != null)
{
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
flowFile = session.write(flowFile, new StreamCallback(function(inputStream, outputStream)
{ var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
var Productiondata = JSON.parse(text) //this is data like key value pair
String Datamodel = flowfile.getAttribute("Datamodel"); //taking datamodel from variable
for (var defaultData in defaultJsonStrObj) //after that iterating and binding it
{
defaultJsonStrObj[defaultData]["Value"]=Productiondata[defaultData]
}
outputStream.write(JSON.stringify(defaultJsonStrObj, null, '\t').getBytes(StandardCharsets.UTF_8))
} }))
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json') session.transfer(flowFile, REL_SUCCESS)
}