Preparing nested JSON using SQL in NiFi

New Contributor

Hey there,

I'm trying to create a nested JSON using SQL in NiFi, but the output I'm getting has the nested part as a string. Here's the query I'm using:

SELECT
    order_id,
    JSON_ARRAYAGG(
        JSON_OBJECT(
            'order_Item_Seq_Id', order_Item_Seq_Id,
            'product_Id', product_Id
        )
    ) AS order_item
FROM order_item
GROUP BY order_id;

The output looks like this:

[{
  "order_id": "10000",
  "order_item": "[{\"product_Id\": \"10007\", \"order_Item_Seq_Id\": \"00101\"}]"
}]

But I want it to be a valid nested JSON like this:

[
  {
    "order_id": "10000",
    "order_item": [
      {
        "product_Id": "10007",
        "order_Item_Seq_Id": "00101"
      }
    ]
  }
]

I'm hoping for a solution using Apache NiFi or Jolt.
Any help would be appreciated.

Thank you!

1 ACCEPTED SOLUTION

Super Guru

Hi @Sadhana21 ,

I can think of two ways to do this:

1- Using EvaluateJsonPath and JoltTransformJSON. This takes a few processors. Starting from the output you already have:

[{
"order_id": "10000",
"order_item": "[{\"product_Id\": \"10007\", \"order_Item_Seq_Id\": \"00101\"}]"
}]

      1.1  Use a first EvaluateJsonPath to extract order_id, with the Destination set to flowfile-attribute:

           [Screenshot: SAMSAL_0-1703791815125.png]

      1.2  Use a second EvaluateJsonPath to extract order_item, with the Destination set to flowfile-content and the Return Type set to json:

           [Screenshot: SAMSAL_1-1703791867077.png]

           This will produce the following output:

[{"product_Id": "10007", "order_Item_Seq_Id": "00101"}]

      1.3  Finally, use JoltTransformJSON with the following spec, where ${order_id} is resolved from the attribute set in step 1.1:

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "#${order_id}": "[&1].order_id",
        "@": "[&1].order_item[]"
      }
    }
  }
]

    This should give you the result you need.
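
To make the data flow of option 1 concrete, here is a rough Python sketch of what the three steps amount to, run outside NiFi. It is not NiFi or Jolt code: the names `make_sample` and `rebuild_order` are made up for illustration, it only handles a single order like the example above, and the last step builds the target shape directly rather than emulating the Jolt shift.

```python
import json


def make_sample() -> str:
    """Build the flowfile content from the question: order_item is a JSON string."""
    items = [{"product_Id": "10007", "order_Item_Seq_Id": "00101"}]
    return json.dumps([{"order_id": "10000", "order_item": json.dumps(items)}])


def rebuild_order(flowfile_content: str) -> list:
    """Mimic steps 1.1 to 1.3 for a single-order flowfile (illustrative only)."""
    records = json.loads(flowfile_content)
    # Step 1.1: pull order_id out (NiFi would keep it as a flowfile attribute).
    order_id = records[0]["order_id"]
    # Step 1.2: replace the content with the parsed order_item array.
    items = json.loads(records[0]["order_item"])
    # Step 1.3: put the id and the items back together in the target shape.
    return [{"order_id": order_id, "order_item": items}]


if __name__ == "__main__":
    print(json.dumps(rebuild_order(make_sample()), indent=2))
```

The point of the sketch is that order_item has to be parsed a second time, because it arrives as a string that itself contains JSON.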

2- Using just the UpdateRecord processor:

I like this option because it is the only processor you need. The only caveat is that you need to define a JSON record reader (for example JsonTreeReader) and a JsonRecordSetWriter, and in the latter you have to define an Avro schema for the expected output. In UpdateRecord you can use the built-in RecordPath function "unescapeJson" to read a JSON string and return it as JSON:

https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#unescapejson

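UpdateRecord (the Replacement Value Strategy needs to be Record Path Value for the function to be evaluated; the property would look something like /order_item = unescapeJson(/order_item)):

[Screenshot: SAMSAL_2-1703792282831.png]

JsonRecordSetWriter:

[Screenshot: SAMSAL_3-1703792337746.png]

Avro schema in the Schema Text property: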

{
	"name": "nifi",
	"type": "record",
	"fields": [
		{
			"name": "order_id",
			"type": "string"
		},
		{
			"name": "order_item",
			"type": {
				"name": "order_item_array",
				"type": "array",
				"items": {
					"name": "order_item",
					"type": "record",
					"fields": [
						{
							"name": "product_Id",
							"type": "string"
						},
						{
							"name": "order_Item_Seq_Id",
							"type": "string"
						}
					]
				}
			}
		}
	]
}
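
As a rough picture of what option 2 does per record, the following Python sketch (not NiFi code) parses the string field in every record and then checks that the result has the shape the Avro schema above describes. The helper names `unescape_field` and `matches_schema` are invented for this illustration and are not part of NiFi.

```python
import json


def unescape_field(records: list, field: str) -> list:
    """Return copies of the records with `field` parsed from a JSON string."""
    out = []
    for record in records:
        updated = dict(record)
        if isinstance(updated.get(field), str):
            updated[field] = json.loads(updated[field])
        out.append(updated)
    return out


def matches_schema(record: dict) -> bool:
    """Check the shape from the schema: a string order_id plus an array of
    items whose product_Id and order_Item_Seq_Id are strings."""
    items = record.get("order_item")
    return (
        isinstance(record.get("order_id"), str)
        and isinstance(items, list)
        and all(
            isinstance(item, dict)
            and isinstance(item.get("product_Id"), str)
            and isinstance(item.get("order_Item_Seq_Id"), str)
            for item in items
        )
    )


records = [
    {
        "order_id": "10000",
        "order_item": json.dumps(
            [{"product_Id": "10007", "order_Item_Seq_Id": "00101"}]
        ),
    },
]
fixed = unescape_field(records, "order_item")
print(all(matches_schema(r) for r in fixed))
```

Unlike the sketch of a single-order flow, this works record by record, which is roughly why the record-based approach needs no splitting or attribute handling.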

If that helps, please accept the solution.

Thanks

 


4 REPLIES

Community Manager

@Sadhana21 Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @joseomjr and @cotopaul, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator



Super Collaborator

...a 3rd option, because I like scripted processors 😂: using ExecuteGroovyScript

 

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

JsonSlurper jsonSlurper = new JsonSlurper()

FlowFile flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    // Parse the incoming array of records
    List<Map> data = jsonSlurper.parse(inputStream)
    data.each {
        // order_item arrives as a JSON string; parse it into a real array
        it.order_item = jsonSlurper.parseText(it.order_item)
    }
    // JsonOutput.toJson is static, so no JsonOutput instance is needed
    outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)

 

It looks like a lot, but this is the line that takes the JSON string and converts it to actual JSON:

it.order_item = jsonSlurper.parseText(it.order_item)

New Contributor

Your input means a lot, and I appreciate your assistance, @DianaTorres, @SAMSAL, and @joseomjr.

Thank you all for sharing your knowledge and insights!