Created 09-25-2017 04:36 AM
Hi,
I am trying to insert the following json format to SQL. Since they are not in the same level so it can be challenged. Any thoughts please?
Thanks,
SJ
Created on 09-26-2017 01:55 AM - edited 08-17-2019 10:45 PM
Hi @Sanaz Janbakhsh
Prepare
INSERT/UPDATE statements:-
As you are having array inside your json message,if you have array in your json file then you need to flatten out json message before ConvertJSONToSQL processor as it expects flat json message to prepare INSERT/UPDATE statements.I tried with your data There are 2 ways to prepare INSERT/UPDATE statements for PutSQL processor.
Method1:- Using JSONToSQL to prepare INSERT/UPDATE:-
Input:-
{ "applicationID": "9", "applicationName": "Sensors", "nodeName": "XXXX", "devEUI": "XX", "rxInfo": [ { "mac": "647fdafffe00416b", "time": "2017-07-19T21:39:35Z", "rssi": -1, "loRaSNR": XXX, "name": "SaroshGateway", "latitude": 51.0216123, "longitude": -114.039802, "altitude": 0 } ], "txInfo": { "frequency": 902300000, "dataRate": { "modulation": "LORA", "bandwidth": 125, "spreadFactor": 10 }, "adr": true, "codeRate": "4/5" }, "fCnt": 7, "fPort": 1, "data": "IA==" }
Flattening Base Level Json data:-
First we need to flatten out the base fields which are not in Array.
To flatten Json message in nifi we are having JoltTransformJSON processor which will do flattening the json file.
You can test out flattening json on http://jolt-demo.appspot.com
Use the Jolt Specification below to flatten the base fields:-
[ { "operation": "shift", "spec": { "*": "&", "txInfo": { "*": "tI-&", "dataRate": { "*": "dR&" } } } } ]
Output:-
{ "applicationID": "9", "applicationName": "Sensors", "nodeName": "XXXX", "devEUI": "XX", "rxInfo": [{ "mac": "647fdafffe00416b", "time": "2017-07-19T21:39:35Z", "rssi": -1, "loRaSNR": "XXX", "name": "SaroshGateway", "latitude": 51.0216123, "longitude": -114.039802, "altitude": 0 }], "tI-frequency": 902300000,--there is tI before frequency because in jolt i have given "*": "tI-&" "dRmodulation": "LORA",--same with dR also i have given "*": "dR&" "dRbandwidth": 125, "dRspreadFactor": 10, "tI-adr": true, "tI-codeRate": "4/5", "fCnt": 7, "fPort": 1, "data": "IA==" }
In this step we have flatten all the base fields in json message except array.
jolt configs:-
Extract Attributes for Flatten Json Fileds using EvaluateJsonPath:-
In this step use the Destination property as flowfile-attribute so that we can extract the content of the json message to attributes.
Give all the attribute names which are not part of array
"rxInfo": [{ "mac": "647fdafffe00416b", "time": "2017-07-19T21:39:35Z", "rssi": -1, "loRaSNR": "XXX", "name": "SaroshGateway", "latitude": 51.0216123, "longitude": -114.039802, "altitude": 0 }]
For all the other data in json message add properties to the by clicking + symbol on top right
Add properties like below
applicationID as $. applicationID
Do the same thing for all the other attributes except for array elements.
EvaluateJson configs:-
Split Array using SplitJson Processor:-
We are having array which we haven’t transformed yet so use splitjson processor and property for JsonPath Expression as $.rxInfo. It will splits the json array and flatten json message we are having as a flowfile now.
Once we got flatten json message extract all the values of message to attributes by using evaluate json path processor as we did before.
AttributesToJSON Processor:
For every flowfile we are having all your json message data as attributes now so use Attributestojson processor to get all the attributes into json message as this processor will result flatten json message.
In AttributesList property give all the list of attributes that you are thinking to make them as json message.
nodeName,fPort,data,fCnt,devEUI,ri_altitude,tI-frequency,applicationID,applicationName,ri_loRaSNR,ri_time,ri_name,dRbandwidth,ri_latitude,dRmodulation,dRspreadFactor,tI-codeRate,tI-adr,ri_mac,ri_rssi,ri_longitude
Output:-
{ "nodeName": "XXXX", "fPort": "1", "data": "IA==", "fCnt": "7", "ri_loRaSNR": "XXX", "devEUI": "XX", "ri_time": "2017-07-19T21:39:35Z", "ri_name": "SaroshGateway", "dRbandwidth": "125", "ri_latitude": "51.0216123", "dRmodulation": "LORA", "dRspreadFactor": "10", "ri_altitude": "0", "tI-frequency": "902300000", "tI-codeRate": "4/5", "tI-adr": "true", "ri_mac": "647fdafffe00416b", "applicationID": "9", "ri_rssi": "-1", "applicationName": "Sensors", "ri_longitude": "-114.039802" }
ConvertJSONToSQL processor:-
Right now we are having flatten json message we can make use JSONToSQL processor to prepare INSERT/UPDATE statements, as keep in mind I have renamed the array and some other message data you can change the attribute value in evaluate json path (or) in jolt transform processors.
Method2:- Replace text to prepare INSERT/UPDATE statements
As my flow screenshot shows Use
Replace text processor after you got all the message contents as flowfile attributes(i.e after Extract Attributes for Json Array data) processor
Replace text Processor Configs:-
We are have all the contents of json message to attributes for each flow file so use those attributes inside replace text processor and change the following properties
Replacement Value property to
INSERT INTO table_name (nodeName,fPort,....) VALUES (${nodeName},${fPort},....)
Here we are preparing insert sql statement and replacing with attribute values as ${nodeName} returns the value associated with the flowfile.
Output:-
INSERT INTO table_name (nodeName,fPort,....) VALUES (XXXX, 1,....)
ReplaceText Config:-
Right now we have prepared insert and update statements and replaced all the attributes with values associated with the ff.
PutSQL:-
From both methods will prepare INSERT/UPDATE statements then use PutSQL processor and Configure the connection pool and auto terminate success message.
Flow screenshot:-
Take reference as my screenshot and make sure you have given Correct Relations(like success,split,matched...) to the next processors.
Hope this helps..!
Created 09-25-2017 08:26 AM
Here is a community answer on how to flatten your JSON data: https://community.hortonworks.com/questions/54314/can-we-flatten-complex-json-file-using-nifi.html.
Created 09-25-2017 06:21 PM
Check out HW Data Flow(Nifi). I believe it has some processors which can convert JSON object to flow-file and flow-file to SQL(I guess you want to insert into Hive).
Processor: ConvertJSONToSQL
Description: https://nifi.apache.org/docs.html
Created on 09-26-2017 01:55 AM - edited 08-17-2019 10:45 PM
Hi @Sanaz Janbakhsh
Prepare
INSERT/UPDATE statements:-
As you are having array inside your json message,if you have array in your json file then you need to flatten out json message before ConvertJSONToSQL processor as it expects flat json message to prepare INSERT/UPDATE statements.I tried with your data There are 2 ways to prepare INSERT/UPDATE statements for PutSQL processor.
Method1:- Using JSONToSQL to prepare INSERT/UPDATE:-
Input:-
{ "applicationID": "9", "applicationName": "Sensors", "nodeName": "XXXX", "devEUI": "XX", "rxInfo": [ { "mac": "647fdafffe00416b", "time": "2017-07-19T21:39:35Z", "rssi": -1, "loRaSNR": XXX, "name": "SaroshGateway", "latitude": 51.0216123, "longitude": -114.039802, "altitude": 0 } ], "txInfo": { "frequency": 902300000, "dataRate": { "modulation": "LORA", "bandwidth": 125, "spreadFactor": 10 }, "adr": true, "codeRate": "4/5" }, "fCnt": 7, "fPort": 1, "data": "IA==" }
Flattening Base Level Json data:-
First we need to flatten out the base fields which are not in Array.
To flatten Json message in nifi we are having JoltTransformJSON processor which will do flattening the json file.
You can test out flattening json on http://jolt-demo.appspot.com
Use the Jolt Specification below to flatten the base fields:-
[ { "operation": "shift", "spec": { "*": "&", "txInfo": { "*": "tI-&", "dataRate": { "*": "dR&" } } } } ]
Output:-
{ "applicationID": "9", "applicationName": "Sensors", "nodeName": "XXXX", "devEUI": "XX", "rxInfo": [{ "mac": "647fdafffe00416b", "time": "2017-07-19T21:39:35Z", "rssi": -1, "loRaSNR": "XXX", "name": "SaroshGateway", "latitude": 51.0216123, "longitude": -114.039802, "altitude": 0 }], "tI-frequency": 902300000,--there is tI before frequency because in jolt i have given "*": "tI-&" "dRmodulation": "LORA",--same with dR also i have given "*": "dR&" "dRbandwidth": 125, "dRspreadFactor": 10, "tI-adr": true, "tI-codeRate": "4/5", "fCnt": 7, "fPort": 1, "data": "IA==" }
In this step we have flatten all the base fields in json message except array.
jolt configs:-
Extract Attributes for Flatten Json Fileds using EvaluateJsonPath:-
In this step use the Destination property as flowfile-attribute so that we can extract the content of the json message to attributes.
Give all the attribute names which are not part of array
"rxInfo": [{ "mac": "647fdafffe00416b", "time": "2017-07-19T21:39:35Z", "rssi": -1, "loRaSNR": "XXX", "name": "SaroshGateway", "latitude": 51.0216123, "longitude": -114.039802, "altitude": 0 }]
For all the other data in json message add properties to the by clicking + symbol on top right
Add properties like below
applicationID as $. applicationID
Do the same thing for all the other attributes except for array elements.
EvaluateJson configs:-
Split Array using SplitJson Processor:-
We are having array which we haven’t transformed yet so use splitjson processor and property for JsonPath Expression as $.rxInfo. It will splits the json array and flatten json message we are having as a flowfile now.
Once we got flatten json message extract all the values of message to attributes by using evaluate json path processor as we did before.
AttributesToJSON Processor:
For every flowfile we are having all your json message data as attributes now so use Attributestojson processor to get all the attributes into json message as this processor will result flatten json message.
In AttributesList property give all the list of attributes that you are thinking to make them as json message.
nodeName,fPort,data,fCnt,devEUI,ri_altitude,tI-frequency,applicationID,applicationName,ri_loRaSNR,ri_time,ri_name,dRbandwidth,ri_latitude,dRmodulation,dRspreadFactor,tI-codeRate,tI-adr,ri_mac,ri_rssi,ri_longitude
Output:-
{ "nodeName": "XXXX", "fPort": "1", "data": "IA==", "fCnt": "7", "ri_loRaSNR": "XXX", "devEUI": "XX", "ri_time": "2017-07-19T21:39:35Z", "ri_name": "SaroshGateway", "dRbandwidth": "125", "ri_latitude": "51.0216123", "dRmodulation": "LORA", "dRspreadFactor": "10", "ri_altitude": "0", "tI-frequency": "902300000", "tI-codeRate": "4/5", "tI-adr": "true", "ri_mac": "647fdafffe00416b", "applicationID": "9", "ri_rssi": "-1", "applicationName": "Sensors", "ri_longitude": "-114.039802" }
ConvertJSONToSQL processor:-
Right now we are having flatten json message we can make use JSONToSQL processor to prepare INSERT/UPDATE statements, as keep in mind I have renamed the array and some other message data you can change the attribute value in evaluate json path (or) in jolt transform processors.
Method2:- Replace text to prepare INSERT/UPDATE statements
As my flow screenshot shows Use
Replace text processor after you got all the message contents as flowfile attributes(i.e after Extract Attributes for Json Array data) processor
Replace text Processor Configs:-
We are have all the contents of json message to attributes for each flow file so use those attributes inside replace text processor and change the following properties
Replacement Value property to
INSERT INTO table_name (nodeName,fPort,....) VALUES (${nodeName},${fPort},....)
Here we are preparing insert sql statement and replacing with attribute values as ${nodeName} returns the value associated with the flowfile.
Output:-
INSERT INTO table_name (nodeName,fPort,....) VALUES (XXXX, 1,....)
ReplaceText Config:-
Right now we have prepared insert and update statements and replaced all the attributes with values associated with the ff.
PutSQL:-
From both methods will prepare INSERT/UPDATE statements then use PutSQL processor and Configure the connection pool and auto terminate success message.
Flow screenshot:-
Take reference as my screenshot and make sure you have given Correct Relations(like success,split,matched...) to the next processors.
Hope this helps..!
Created 09-26-2017 10:27 PM
That's really great help. I followed your instructions and could flattened the Json format. Just have a quick question, since we are evaluate the message in two steps (arrays and not array one) how can I merge them together to make sure the full falttened Json message is created.
Created on 09-27-2017 12:48 AM - edited 08-17-2019 10:44 PM
Hi @Sanaz Janbakhsh,
Sure, if you are following Method1 then use AttributesToJson processor as we are having all the json message data as attributes of the flowfile.
In AttributesList property give all the list of attributes that you are thinking to make them as json message.
AttributesToJson processor configs:-
once you have given all the required attributes in Attributes List property then the success relation of the processor to ConvertJSONToSQL processor.
The output of AttributesToJson should be a flatten json message as this processor converts all the attributes that are listed to a flatten json message.
Once we got flatten json message then ConvertJSONtoSQL processor will prepare INSERT/UPDATE statements to it.
Output:-
{
"nodeName":"XXXX",
"fPort":"1",
"data":"IA==",
"fCnt":"7",
"ri_loRaSNR":"XXX",
"devEUI":"XX",
"ri_time":"2017-07-19T21:39:35Z",
"ri_name":"SaroshGateway",
"dRbandwidth":"125",
"ri_latitude":"51.0216123",
"dRmodulation":"LORA",
"dRspreadFactor":"10",
"ri_altitude":"0",
"tI-frequency":"902300000",
"tI-codeRate":"4/5",
"tI-adr":"true",
"ri_mac":"647fdafffe00416b",
"applicationID":"9",
"ri_rssi":"-1",
"applicationName":"Sensors",
"ri_longitude":"-114.039802"
}
*validate your attributes to json processor output with above output message.
If you are following Method2:-
let's have a look on replace text processor configs, as i attached screenshots in the above answer to prepare INSERT/UPDATE statements and replacing all the attribute values in sql statements.
Created 09-27-2017 04:04 AM
Hi Shu,
I followed your advice. Please find the screenshot. On the green arrow I can get the whole message which is not flattened.. On the red one I get only the array one that I split and not all.
I add the following in the "attrbuteToJson" processor.
applicationID,applicationName,nodeName,devEUI,mac,time,rssi,loRaSNR,name,latitude,longitude,altitude,tI-frequency,dRmodulation,dRbandwidth,dRspreadFactor,tI-adr,tI-codeRate,fCnt,fPort,data
Any thoughts please?Created on 09-28-2017 11:06 PM - edited 08-17-2019 10:44 PM
Hi @Sanaz Janbakhsh,
for sure this behaviour is because of your Destination property in AttributestoJson processor default will be flowfile-attribute
but we need to change that property to
flowfile-content
Attributestojson configs Screnshot:-
change the below highlighted property you are using flowfile-attribute that means this processor keeps all the list of attributes to ff attributes
but we need all the attributes needs to be in flowfile-content.
Hope this Helps....!!
Created 11-15-2018 11:29 AM
Could you please share this templates ?