Filtering Json record with QueryRecord processor returns zero kb flowfile

Rising Star
Hello all,
I am trying to use the QueryRecord processor to filter a JSON file, but I am getting a 0 KB FlowFile on my filter relationship.
My input FlowFile is:
{
  "data": [
    {
      "room": "A",
      "path": "/old/can"
    },
    {
      "room": "B",
      "path": "/old/can"
    },
    {
      "room": "C",
      "path": "/old/can"
    }
  ]
}

My filter query:
SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room[*]')='${'ip'}'
NOTE: ip is an attribute used to select which room to filter on, so if ip = A, my output after filtering should be:
{
  "data": [
    {
      "room": "A",
      "path": "/old/can"
    }
  ]
}
Thank you all.

Super Collaborator

@rafy I would try this:

 

SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room')='${ip}'

 

Assuming ip is an attribute (${ip}), of course.

Rising Star

16:25:08 WAT ERROR 01881003-cbda-1ae7-fdab-6af8640a064e
QueryRecord[id=01881003-cbda-1ae7-fdab-6af8640a064e] Unable to query FlowFile[filename=de224186-58a6-4d88-8d34-de5170bd74ed] due to org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException: Invalid Expression: SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room')='${ip}' due to Unexpected token '}' at line 1, column 4. Query: ${ip}: {}

 

Thank you. I received the error message above.

Rising Star

But after enclosing "ip" in single quotes like this: '${'ip'}', I got a different error message:

16:29:55 WAT
ERROR
 
QueryRecord[id=01881003-cbda-1ae7-fdab-6af8640a064e] Unable to query FlowFile[filename=4ac8ca57-9b9b-481a-8371-1d9f1ede2ab0] due to java.lang.RuntimeException: RecordPath /room resulted in more than one return value. The RecordPath must be further constrained.: {}

Thank you.

Super Collaborator

Can you please post the source JSON in a code box?

 

 

Also, try a manual test without the attribute, such as:

 

 

SELECT *
FROM FLOWFILE
WHERE RPATH(data, '/room') = 'A'

 

This will confirm that the query is correct for the JSON payload. Once that works, start testing with the attribute added.

Rising Star

This is the source json:

{
	"data": [
		{
			"room": "A",
			"path": "/old/can"
		},
		{
			"room": "B",
			"path": "/old/can"
		},
		{
			"room": "C",
			"path": "/old/can"
		}
	]
}

Rising Star

Manual testing using:

 

SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room')='A'

 

I received an error message:

16:47:59 WAT ERROR 01881003-cbda-1ae7-fdab-6af8640a064e
QueryRecord[id=01881003-cbda-1ae7-fdab-6af8640a064e] Unable to query FlowFile[filename=596ac8a5-8431-4b42-b7e5-84fe972fe09b] due to java.lang.RuntimeException: RecordPath /room resulted in more than one return value. The RecordPath must be further constrained.: {}
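
For illustration only, here is a small sketch of what the error message describes. It is plain Python, not NiFi code: child_values and as_single_string are hypothetical helpers that mimic applying a '/room' path to the data field. Because data holds an array, the path matches once per element, and a function that needs exactly one value has nothing single to compare against 'A'.

```python
import json

source = json.loads("""
{"data": [
  {"room": "A", "path": "/old/can"},
  {"room": "B", "path": "/old/can"},
  {"room": "C", "path": "/old/can"}
]}
""")


def child_values(field, name):
    """Hypothetical stand-in for evaluating '/<name>' against a field.

    If the field is an array, the path matches once per element.
    """
    elements = field if isinstance(field, list) else [field]
    return [element[name] for element in elements if name in element]


def as_single_string(values):
    """Mimic a function that requires exactly one value (assumed behaviour)."""
    if len(values) > 1:
        raise RuntimeError("path resulted in more than one return value")
    return values[0] if values else None


rooms = child_values(source["data"], "room")
print(rooms)  # one value per array element, so there is no single string to compare
```

This is only a model of the message text; it makes no claim about how NiFi implements RecordPath internally.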

Super Collaborator

@rafy I ran into the same issues when I tried to create a flow using RPATH. However, here is a solution I found to drill into the data array and match on room = A:

 

SELECT *
FROM FLOWFILE WHERE room = 'A'

I used QueryRecord with JsonTreeReader (see below) and JsonRecordSetWriter (default).

NiFi flow definition here:  @gitHub

 

Screenshots:

Screen Shot 2023-05-12 at 12.25.36 PM.png
Screen Shot 2023-05-12 at 12.26.27 PM.png
Screen Shot 2023-05-12 at 12.26.09 PM.png

Rising Star

Thank you so much. It works. The only issue I have is reconstructing the output back to this format using the UpdateRecord processor:

{
	"data": [
		{
			"room": "A",
			"path": "/old/can"
		}
	]
}

Super Collaborator

Hi @rafy,

I don't think QueryRecord is supposed to work this way, but I could be wrong. QueryRecord filters the records at the root level, not the elements of a nested array. Since the root of your input is a JSON object rather than an array, this is not going to work. And even if the filter "RPATH_STRING(data, '/room')='A'" is supposed to work (I am not sure why it does not), it would return the entire root record and not just the subset. I think this question has been asked before, but there was no answer:

https://community.cloudera.com/t5/Support-Questions/Select-a-subset-of-data-using-NiFi-QueryRecord/t...
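
To make the point about root-level filtering concrete, here is a minimal Python sketch (not NiFi code). query_records and has_room are hypothetical names; the sketch assumes the reader sees the whole input object as a single record. A record-level filter can then only keep that record whole or drop it whole, so it can never return just the matching element of data.

```python
import json

# Assumption: the input object is read as exactly one root-level record.
root_records = [json.loads("""
{"data": [
  {"room": "A", "path": "/old/can"},
  {"room": "B", "path": "/old/can"},
  {"room": "C", "path": "/old/can"}
]}
""")]


def query_records(records, predicate):
    """Keep or drop each root record as a unit, like a WHERE clause on rows."""
    return [record for record in records if predicate(record)]


def has_room(record, room):
    """True if any element of the nested data array has the given room."""
    return any(item["room"] == room for item in record["data"])


kept = query_records(root_records, lambda r: has_room(r, "A"))
dropped = query_records(root_records, lambda r: has_room(r, "Z"))

print(len(kept[0]["data"]))  # the kept record still carries all three rooms
print(dropped)               # no match means nothing is returned at all
```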

To resolve your problem, you have two options:

Option 1: EvaluateJsonPath -> QueryRecord -> JoltTransformJSON, with the processors configured as follows:

EvaluateJsonPath: to move the data array to the root

SAMSAL_0-1683908023764.png

 

QueryRecord: to query the required record based on the ${ip} attribute:

SAMSAL_1-1683908143241.png

 

JoltTransformJSON: to convert back to the required schema with the data array

SAMSAL_2-1683908662386.png

spec:

[
  {
    "operation": "shift",
    "spec": {
      "*":"data[#].&"
    }
  }
]
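
As a rough picture of what the three processors in Option 1 do to the data, here is a Python sketch. It is not NiFi or Jolt code, and the function names are hypothetical. Since the configuration screenshots are not reproduced here, it assumes the QueryRecord step is equivalent to SELECT * FROM FLOWFILE WHERE room = '<attribute value>'.

```python
import json

flowfile_text = """
{"data": [
  {"room": "A", "path": "/old/can"},
  {"room": "B", "path": "/old/can"},
  {"room": "C", "path": "/old/can"}
]}
"""


def extract_data_array(text):
    """Step 1 (EvaluateJsonPath role): lift the nested data array to the root."""
    return json.loads(text)["data"]


def select_room(records, room):
    """Step 2 (QueryRecord role, assumed query): keep records whose room matches."""
    return [record for record in records if record.get("room") == room]


def wrap_in_data(records):
    """Step 3 (Jolt shift role): put the surviving records back under data."""
    return {"data": records}


result = wrap_in_data(select_room(extract_data_array(flowfile_text), "A"))
print(json.dumps(result, indent=2))
```

Once the array is at the root, each element is its own record, which is why a plain comparison on room is enough in the middle step.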

 

Option 2: Just one JoltTransformJSON with the following spec:

 

[
  {
    "operation": "shift",
    "spec": {
      "data": {
        "*": {
          "room": {
            "${ipAttr}": {
              "@2": "data[0]"
            }
          }
        }
      }
    }
  }
]

Note: I had to rename the ip attribute to ipAttr, since ip is a reserved Expression Language function.

 

Rising Star

Thank you so much as well. This also works, though I am still learning Jolt.
