Created 06-20-2022 02:04 AM
Sensor data coming from ConsumeMQTT processor looks like this
{1,1,1,3465689,B4:E6:2D:34:E1:D9,BE:FB:E4:11:91:5A,-42,192.168.0.21,v0.16,15.88,16.35,46.59,95931.16,0,0,0,3.27,0,0,0,0}
Expected Output:
{
"val 1" : 1,
"val 2" : 1,
.......}
and so on
I'm preparing this incoming sensor data to be ingested in influxdb kindly help me out which processor is best for this and required configuration
Thanks a lot
Created 06-24-2022 10:44 PM
@HTalha ,
Another way to do this is to use the ExecuteScript processor with the following python script:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re
class SplitAndConvertToJson(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
input = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
input = re.sub(r'(^\s*\{\s*|\s*\}\s*)', '', input)
fields = input.split(',')
obj = dict([('val_%s' % (i,), v.rstrip()) for i, v in enumerate(fields)])
outputStream.write(bytearray(json.dumps(obj).encode('utf-8')))
flowfile = session.get()
if(flowfile != None):
flowfile = session.write(flowfile, SplitAndConvertToJson())
session.transfer(flowfile, REL_SUCCESS)
Cheers,
André
Created on 06-20-2022 06:06 AM - edited 06-20-2022 06:07 AM
Hi,
The string you are getting is not a valid json, so you need to use ExecuteScript Processor to parse the string and convert into array and then convert the array into json array. You can convert this string into array by removing curly brackets and then do split(',') then you can convert the array into json using something like:
https://www.javaguides.net/2019/07/convert-set-to-json-array-using-jackson.html
Pass the converted Json Array as new flowfile from the ExecuteScript Processor. Lets assume that your json array looks like this:
["1","1","1","3465689","B4:E6:2D:34:E1:D9"...]
Then you use Json Jolt Processor and pass the following Jolt Spec:
[
{
"operation": "shift",
"spec": {
"*": {
"@0": "Value-&0"
}
}
}
]
which will give the new flowfile as follows:
{
"Value-0" : "1",
"Value-1" : "1",
"Value-2" : "1",
"Value-3" : "3465689",
"Value-4" : "B4:E6:2D:34:E1:D9"
...
}
Hope that helps. If it does please accept solution.
Thanks
Samer
Created 06-24-2022 10:44 PM
@HTalha ,
Another way to do this is to use the ExecuteScript processor with the following python script:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re
class SplitAndConvertToJson(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
input = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
input = re.sub(r'(^\s*\{\s*|\s*\}\s*)', '', input)
fields = input.split(',')
obj = dict([('val_%s' % (i,), v.rstrip()) for i, v in enumerate(fields)])
outputStream.write(bytearray(json.dumps(obj).encode('utf-8')))
flowfile = session.get()
if(flowfile != None):
flowfile = session.write(flowfile, SplitAndConvertToJson())
session.transfer(flowfile, REL_SUCCESS)
Cheers,
André