NiFi JoltTransformJSON spec help needed

Explorer

Hi All,

I need help with a JoltTransformJSON spec that converts the input below into the output below.

I have tried map-to-list and other syntax, but have not been successful so far. Any help is appreciated.

Input:

{"params":"sn=GH6747246T4JLR6AZ&c=QUERY_RECORD&p=test_station_name&p=station_id&p=result&p=mac_addresss"}

Output:

{ "queryType": "scan", "dataSource": "xyz", "resultFormat": "list", "columns":["test_station_name","station_id","result","mac_address"], "intervals": [ "2018-01-01/2018-02-09" ],"filter": { "type": "selector", "dimension": "sn", "value":"GH6747246T4JLR6AZ" } }

Except for the contents of the columns, dimension, and value attributes, the rest of the fields are hardcoded.

Please suggest.

Thanks,

Vish

1 ACCEPTED SOLUTION

Master Guru

AFAIK, Jolt doesn't have functions for transforming a single value (such as split, extract, etc.). If the "params" field always contains the same parameters in the same order (at least sn first, c second, and four p's), then you can extract "params" into an attribute using EvaluateJsonPath (with a JSONPath of $.params), then use UpdateAttribute with something like this:

[Screenshot: 83446-updateattribute-parse.png]

If the parameters are arbitrary (for example, the number of p's varies or they can appear in any order), you might be better off with ExecuteScript for your custom logic. Here's a Groovy script you can use in ExecuteScript to do what you describe above:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.*

def flowFile = session.get()
if (!flowFile) return

// Rewrite the flowfile content: parse the incoming JSON, then emit the Druid scan query
flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = new JsonSlurper().parseText(text)
        // Split "sn=...&c=...&p=..." into individual key=value pairs
        def params = obj.params.tokenize('&')
        def builder = new groovy.json.JsonBuilder()
        builder.call {
            'queryType' 'scan'
            'dataSource' 'xyz'
            // Keep this exactly 'list': any leading space is written into the JSON verbatim
            'resultFormat' 'list'
            // Every p=... pair becomes a column name
            'columns' params.findAll {p -> p.tokenize('=')[0] == 'p'}.collect {p -> p.tokenize('=')[1]}
            'intervals' ([] <<  '2018-01-01/2018-02-09')
            'filter' {
                'type' 'selector'
                'dimension' 'sn'
                // The sn=... pair supplies the filter value
                'value' params.find {p -> p.tokenize('=')[0] == 'sn'}.tokenize('=')[1]
            }
        }
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
// Rename the output, e.g. name.ext becomes name_translated.json
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_translated.json')
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
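
To try the parsing logic outside NiFi, here is a minimal standalone sketch. It is not the script above: it swaps JsonBuilder for a plain map plus JsonOutput, and buildScanQuery is a hypothetical helper name. It assumes every parameter has the form key=value, and it writes resultFormat as 'list' with no surrounding whitespace.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper (name is an assumption): builds the scan query JSON
// from the incoming JSON text, mirroring the ExecuteScript logic.
String buildScanQuery(String inputJson) {
    def params = new JsonSlurper().parseText(inputJson).params.tokenize('&')
    // Split each "key=value" pair once, keeping repeated keys such as "p"
    def pairs = params.collect { it.split('=', 2) as List }
    def query = [
        queryType   : 'scan',
        dataSource  : 'xyz',
        resultFormat: 'list',
        columns     : pairs.findAll { it[0] == 'p' }.collect { it[1] },
        intervals   : ['2018-01-01/2018-02-09'],
        filter      : [
            type     : 'selector',
            dimension: 'sn',
            // Null if no sn=... pair is present
            value    : pairs.find { it[0] == 'sn' }?.getAt(1)
        ]
    ]
    return JsonOutput.toJson(query)
}

def input = '{"params":"sn=GH6747246T4JLR6AZ&c=QUERY_RECORD&p=test_station_name&p=station_id&p=result&p=mac_addresss"}'
println buildScanQuery(input)
```

Because the parameters are filtered by key rather than by position, the number and order of the p's can vary.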

Explorer

Thanks @Matt Burgess for the help. I was able to use the script you shared; the number of parameters is dynamic. The output content now looks like this:

{ "queryType": "scan", "dataSource": "nifi_kafka_druid_test", "resultFormat": " list", "columns": [ "test_station_name", "station_id", "result", "mac_addresss" ], "intervals": [ "2018-01-01/2018-02-09" ], "filter": { "type": "selector", "dimension": "sn", "value": "GH6747246T4JLR6AZ" } }

Now I am trying to pass this to InvokeHTTP, pointed at my Druid broker endpoint URL, but I am not getting a response back from InvokeHTTP.

If I use this curl command:

curl -X POST 'brokerip:port/druid/v2/?pretty' -H 'Content-Type:application/json' -d'{ "queryType": "scan", "dataSource": "nifi_kafka_druid_test", "resultFormat": "list", "columns":["station_id","mac_address","result","test_station_name"], "intervals": [ "2018-01-01/2018-02-09" ],"filter": { "type": "selector", "dimension": "sn", "value":"GGJ807405ZBJLR6AB" } }'

I get the response:

[ { "segmentId" : "nifi_kafka_druid_test_2018-02-07T00:00:00.000Z_2018-02-07T00:05:00.000Z_2018-07-17T13:27:30.419Z", "columns" : [ "station_id", "mac_address", "result", "test_station_name" ], "events" : [ { "station_id" : "LMMP_IQE-1FT-01_42_AE-1", "mac_address" : "ABC", "result" : "PASS", "test_station_name" : "EPI_DIE" } ] } ]

I want to do the same using the flowfile content produced by ExecuteScript.

Am I missing something? How should I pass the content to the InvokeHTTP processor?
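
For testing a payload outside NiFi, the curl call above could be reproduced roughly like this in Groovy. This is only a sketch: prepareDruidPost and postQuery are hypothetical helper names, and the URL in the usage comment (localhost:8082) is an assumed placeholder, not the real broker address. Reading the error stream on a failed request is meant to surface whatever message the broker returns.

```groovy
// Hypothetical helper: prepares (but does not send) a POST like the curl call.
HttpURLConnection prepareDruidPost(String brokerUrl) {
    def conn = (HttpURLConnection) new URL(brokerUrl).openConnection()
    conn.requestMethod = 'POST'
    conn.doOutput = true
    // Same header as curl's -H 'Content-Type:application/json'
    conn.setRequestProperty('Content-Type', 'application/json')
    return conn
}

// Hypothetical helper: sends the query JSON as the request body and
// returns the response text (or the error body on a 4xx/5xx status).
String postQuery(String brokerUrl, String queryJson) {
    def conn = prepareDruidPost(brokerUrl)
    conn.outputStream.withCloseable { it.write(queryJson.getBytes('UTF-8')) }
    def stream = conn.responseCode < 400 ? conn.inputStream : conn.errorStream
    return stream?.getText('UTF-8')
}

// Usage (placeholder URL, needs a reachable broker):
// println postQuery('http://localhost:8082/druid/v2/?pretty', queryJson)
```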

Any help is appreciated.

Thanks,

Vish

Explorer

In addition to my earlier query: I can call the Druid endpoint and fetch data when the flowfile content holding the JSON record comes from ReplaceText:

flowfile content --> ReplaceText --> UpdateAttribute --> InvokeHTTP --> PutFile

In ReplaceText I set the Replacement Value as below:

{"queryType":"scan","dataSource":"nifi_kafka_druid_test","resultFormat":"list","columns":["station_id","mac_address","result","test_station_name"],"intervals":["2018-01-01/2018-02-09"],"filter":{"type":"selector","dimension":"sn","value":"GGJ807405ZBJLR6AB"}}

and added the attribute mime.type=application/json in UpdateAttribute.

With this, InvokeHTTP calls the Druid endpoint URL and fetches the data:

[{"segmentId":"nifi_kafka_druid_test_2018-02-07T00:00:00.000Z_2018-02-07T00:05:00.000Z_2018-07-17T13:27:30.419Z","columns":["station_id","mac_address","result","test_station_name"],"events":[{"station_id":"LMMP_IQE-1FT-01_42_AE-1","mac_address":"ABC","result":"PASS","test_station_name":"EPI_DIE"}]}]

But when I pass the flowfile content from the Groovy script to InvokeHTTP, I do not get any response.

I also changed the Groovy script to write the JSON on a single line instead of the multiline pretty format.

Comparing the data provenance of the sample pipeline above with the actual pipeline that uses ExecuteScript, I do not see any difference in the flowfile content passed to InvokeHTTP. I am not sure what I am doing wrong.

Please suggest.

Thanks,

Vish

Explorer

Sorry for my ignorance. I resolved this issue by removing the extra space in " list" when writing the flowfile content generated by the Groovy script. Now I am able to fetch the data from InvokeHTTP by passing the flowfile content from ExecuteScript.
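
A stray space like this is easy to miss when comparing content by eye. As a rough sketch (diffJson is a hypothetical helper, not part of NiFi), two payloads can be parsed and compared leaf by leaf, with values wrapped in brackets so whitespace is visible:

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper: walks two parsed JSON values in parallel and returns
// one message per differing leaf. Values are bracketed to expose whitespace.
List<String> diffJson(Object a, Object b, String path = '$') {
    if (a instanceof Map && b instanceof Map) {
        // Union of keys, so fields missing on either side are reported too
        return (a.keySet() + b.keySet()).collectMany { k ->
            diffJson(a[k], b[k], "${path}.${k}")
        }
    }
    if (a instanceof List && b instanceof List && a.size() == b.size()) {
        return (0..<a.size()).collectMany { i -> diffJson(a[i], b[i], "${path}[${i}]") }
    }
    return a == b ? [] : ["${path}: [${a}] != [${b}]".toString()]
}

def slurper = new JsonSlurper()
def fromScript = slurper.parseText('{"queryType":"scan","resultFormat":" list"}')
def fromReplaceText = slurper.parseText('{"queryType":"scan","resultFormat":"list"}')
diffJson(fromScript, fromReplaceText).each { println it }
// prints: $.resultFormat: [ list] != [list]
```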

Thanks for the support.

Regards,

Vish