- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
Transform JSON array coming from ConsumeMQTT to key value pair for ingestion into influxDB
- Labels:
-
Apache NiFi
Created ‎06-20-2022 02:04 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Sensor data coming from ConsumeMQTT processor looks like this
{1,1,1,3465689,B4:E6:2D:34:E1:D9,BE:FB:E4:11:91:5A,-42,192.168.0.21,v0.16,15.88,16.35,46.59,95931.16,0,0,0,3.27,0,0,0,0}
Expected Output:
{
"val 1" : 1,
"val 2" : 1,
.......}
and so on
I'm preparing this incoming sensor data to be ingested in influxdb kindly help me out which processor is best for this and required configuration
Thanks a lot
Created ‎06-24-2022 10:44 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
@HTalha ,
Another way to do this is to use the ExecuteScript processor with the following python script:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re
class SplitAndConvertToJson(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
input = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
input = re.sub(r'(^\s*\{\s*|\s*\}\s*)', '', input)
fields = input.split(',')
obj = dict([('val_%s' % (i,), v.rstrip()) for i, v in enumerate(fields)])
outputStream.write(bytearray(json.dumps(obj).encode('utf-8')))
flowfile = session.get()
if(flowfile != None):
flowfile = session.write(flowfile, SplitAndConvertToJson())
session.transfer(flowfile, REL_SUCCESS)
Cheers,
André
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Created on ‎06-20-2022 06:06 AM - edited ‎06-20-2022 06:07 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi,
The string you are getting is not a valid json, so you need to use ExecuteScript Processor to parse the string and convert into array and then convert the array into json array. You can convert this string into array by removing curly brackets and then do split(',') then you can convert the array into json using something like:
https://www.javaguides.net/2019/07/convert-set-to-json-array-using-jackson.html
Pass the converted Json Array as new flowfile from the ExecuteScript Processor. Lets assume that your json array looks like this:
["1","1","1","3465689","B4:E6:2D:34:E1:D9"...]
Then you use Json Jolt Processor and pass the following Jolt Spec:
[
{
"operation": "shift",
"spec": {
"*": {
"@0": "Value-&0"
}
}
}
]
which will give the new flowfile as follows:
{
"Value-0" : "1",
"Value-1" : "1",
"Value-2" : "1",
"Value-3" : "3465689",
"Value-4" : "B4:E6:2D:34:E1:D9"
...
}
Hope that helps. If it does please accept solution.
Thanks
Samer
Created ‎06-24-2022 10:44 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
@HTalha ,
Another way to do this is to use the ExecuteScript processor with the following python script:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re
class SplitAndConvertToJson(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
input = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
input = re.sub(r'(^\s*\{\s*|\s*\}\s*)', '', input)
fields = input.split(',')
obj = dict([('val_%s' % (i,), v.rstrip()) for i, v in enumerate(fields)])
outputStream.write(bytearray(json.dumps(obj).encode('utf-8')))
flowfile = session.get()
if(flowfile != None):
flowfile = session.write(flowfile, SplitAndConvertToJson())
session.transfer(flowfile, REL_SUCCESS)
Cheers,
André
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.
