Transform JSON array coming from ConsumeMQTT to key-value pairs for ingestion into InfluxDB


Sensor data coming from the ConsumeMQTT processor looks like this:

{1,1,1,3465689,B4:E6:2D:34:E1:D9,BE:FB:E4:11:91:5A,-42,192.168.0.21,v0.16,15.88,16.35,46.59,95931.16,0,0,0,3.27,0,0,0,0}

Expected Output:

{
    "val 1" : 1,
    "val 2" : 1,
    ...
}

and so on.

I'm preparing this incoming sensor data for ingestion into InfluxDB. Could you please advise which processor is best suited for this and what configuration it requires?

Thanks a lot

1 ACCEPTED SOLUTION


@HTalha ,

 

Another way to do this is to use the ExecuteScript processor with the following Python script:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re

class SplitAndConvertToJson(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Strip only the leading '{' and the trailing '}' (plus surrounding whitespace)
        text = re.sub(r'^\s*\{\s*|\s*\}\s*$', '', text)
        fields = text.split(',')
        # Keys are numbered from 0 (val_0, val_1, ...); every value is kept as a string
        obj = dict(('val_%s' % i, v.strip()) for i, v in enumerate(fields))
        outputStream.write(bytearray(json.dumps(obj).encode('utf-8')))

# session and REL_SUCCESS are provided to the script by ExecuteScript
flowfile = session.get()
if flowfile is not None:
    flowfile = session.write(flowfile, SplitAndConvertToJson())
    session.transfer(flowfile, REL_SUCCESS)
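
The script above writes every field as a string, while the expected output in the question shows numeric values. A minimal sketch of the same split-and-number logic with type coercion, runnable outside NiFi for quick testing, might look like this. The names coerce and payload_to_json are hypothetical helpers, not part of NiFi, and the rule "anything that parses as an int or float becomes a number" is an assumption about the sensor payload.

```python
import json
import re


def coerce(value):
    """Return an int or float when the text parses as one, else the stripped string."""
    value = value.strip()
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


def payload_to_json(text):
    """Turn '{a,b,c}' into a JSON object string with keys val_0, val_1, ..."""
    # Remove the leading '{' and trailing '}' only
    body = re.sub(r'^\s*\{\s*|\s*\}\s*$', '', text)
    fields = body.split(',')
    obj = dict(('val_%d' % i, coerce(v)) for i, v in enumerate(fields))
    return json.dumps(obj, sort_keys=True)


if __name__ == '__main__':
    # Shortened sample in the same shape as the payload from the question
    print(payload_to_json('{1,B4:E6:2D:34:E1:D9,-42,v0.16,15.88}'))
```

Fields such as the MAC address, the IP address and the version string do not parse as numbers, so they stay strings. The body of payload_to_json could replace the string handling inside process() if numeric output is wanted.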

 

Cheers,

André

 

2 REPLIES


Hi,

The string you are getting is not valid JSON, so you need to use the ExecuteScript processor to parse it: remove the curly brackets, call split(',') to get an array of values, and then serialize that array as a JSON array. For the last step you can use something like:

https://www.javaguides.net/2019/07/convert-set-to-json-array-using-jackson.html
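
As a rough illustration of that first step (strip the brackets, split, serialize as a JSON array), here is a minimal Python sketch. The linked article uses Java with Jackson; this uses Python's json module instead, and the function name payload_to_json_array is hypothetical. Only the string handling is shown; reading and writing the flowfile inside ExecuteScript is left out.

```python
import json


def payload_to_json_array(text):
    """Turn '{a,b,c}' into the JSON array string '["a", "b", "c"]'."""
    body = text.strip()
    # Drop the surrounding curly brackets if both are present
    if body.startswith('{') and body.endswith('}'):
        body = body[1:-1]
    # Every element stays a string, matching the array assumed in this reply
    return json.dumps([field.strip() for field in body.split(',')])


if __name__ == '__main__':
    print(payload_to_json_array('{1,1,1,3465689,B4:E6:2D:34:E1:D9}'))
```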

Pass the converted JSON array out of the ExecuteScript processor as a new flowfile. Let's assume that your JSON array looks like this:

 

["1","1","1","3465689","B4:E6:2D:34:E1:D9"...]

 

Then use the JoltTransformJSON processor with the following Jolt spec:

 

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "@0": "Value-&0"
      }
    }
  }
]

 

which produces the following flowfile content:

 

{
  "Value-0" : "1",
  "Value-1" : "1",
  "Value-2" : "1",
  "Value-3" : "3465689",
  "Value-4" : "B4:E6:2D:34:E1:D9"
  ...
}
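
To make the mapping explicit, this small Python sketch reproduces the effect of the shift on a plain list: each element is stored under a key built from its array index. It only illustrates the result and says nothing about how Jolt works internally; index_keys is a hypothetical helper.

```python
def index_keys(values, prefix='Value-'):
    """Map each list element to a key made of the prefix plus its index."""
    return dict((prefix + str(i), v) for i, v in enumerate(values))


if __name__ == '__main__':
    sample = ['1', '1', '1', '3465689', 'B4:E6:2D:34:E1:D9']
    for key, value in sorted(index_keys(sample).items()):
        print(key, value)
```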

 

Hope that helps. If it does, please accept the solution.

Thanks

Samer
