How to capture both key and value of json data with dynamic key on Nifi?


I have json data listen from JettyWebsocketServer like:

	"project_1": {
		"host": "V001",
		"data": [{
			"sensor_code": "temp",
			"value": {
				"max": 26.2,
				"avg": 26.1,
				"min": 26.1
			"sensor_code": "humid",
			"value": {
				"max": 48.8,
				"avg": 48.7,
				"min": 48.6

Because my data from multiple project, so I have dynamic key as: project_1, project_2. And I have to route those data into corresponding destination.

I found EvaluateJsonPath to parse Json data but it need static key.

There is any way to capture both key and value of it to route data by key?


@Kiem Nguyen

you don't need to do all this. I think it's a bad idea to change your JSON data just to be able to parse it. You can have surprise when your application evolve.

So as a summary:

  • Use my solution above to extract your key
  • Use the below solution to extract the content
  • With Key and Value in hand, use put MongoDB to store your data in the right collection

To extract your JSON content, use evaluatejsonpath with the following configuration. Add a dynamic key my_content and use this JSON path $.*


You get all the data that you are looking for:


Can you try this an let me know if it works ?


@Abdelkrim Hadjidj

Today I have a solution for just my case:
Because the key of our data is unique string on json data. So here my step to capture both key and value of json data:
Step 1: Using ExtractText to obtain key (project_1, project_2,...)
Step 2: Using ReplaceText to replace key on json data by a static key (project_1, project_2,... string will be replaced by content string.

Step 3: Using EvaluateJsonPath to obtain data of content above and transform it to flowfile-content for next processor.

Step 4: Using PutMongoDB to insert data into corresponding database using Expression Regex {projectname} with data above.

As alternatively way, we also can use ExecuteScript to convert key and value of json data to attribute then insert into MongoDB. But as I running, the throughput when use the first way is higher then using ExecuteScript. Here my code when using ExecuteScript:

import json
from import IOUtils
from java.nio.charset import StandardCharsets
from import StreamCallback

class StreamCallback(StreamCallback):
    def __init__(self):
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        data = json.loads(text)
        for key in data:
            first = key
        content = data[first]
        contentString = json.dumps(content)
        viewFlowFile = session.create()
        viewFlowFile = session.putAllAttributes(viewFlowFile,
                {'project': str(first), 'content': contentString})
        session.transfer(viewFlowFile, REL_SUCCESS)
flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile, StreamCallback())
    session.transfer(flowFile, REL_FAILURE)

Thanks @Abdelkrim Hadjidj,

It works well. It is interesting when just use $.* to capture content.

I understood more about JsonPath Expression.

Thank you again, :d