Filtering Json record with QueryRecord processor returns zero kb flowfile

Rising Star
Hello all,
I am trying to use the QueryRecord processor to filter a JSON file, but I am getting a 0 KB flowfile on my filter relationship.
My input flowfile is:
{
"data": [
{
"room": "A",
"path": "/old/can"
},
{
"room": "B",
"path": "/old/can"
},
{
"room": "C",
"path": "/old/can"
}
]
}
 
My filter query:
SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room[*]')='${'ip'}'
NOTE: ip is an attribute used to select which room to keep, so if ip = A, my output after filtering should be:
{
"data": [
{
"room": "A",
"path": "/old/can"
}
]
}
Thank you all.
2 ACCEPTED SOLUTIONS

Super Collaborator

@rafy I ran into the same issues when I tried to build a flow using RPATH. However, here is a solution I found to drill into the data array and match on room = A:

 

SELECT *
FROM FLOWFILE WHERE room = 'A'

I used QueryRecord with JsonTreeReader (see below) and JsonRecordSetWriter (default).

NiFi flow definition here: @gitHub

 

Screenshots (images not preserved):

Screen Shot 2023-05-12 at 12.25.36 PM.png
Screen Shot 2023-05-12 at 12.26.27 PM.png
Screen Shot 2023-05-12 at 12.26.09 PM.png

 

 

Master Collaborator

Hi @rafy ,

I don't think QueryRecord is supposed to work this way, but I could be wrong. QueryRecord filters records at the root level, not the elements of a nested array. Since your input is not a JSON array at the root, this is not going to work. And even if the filter "RPATH_STRING(data, '/room')='A'" were supposed to work (I'm not sure why it doesn't), it would return the entire record from the root and not just the subset. I think this question has been asked before, but there was no answer:

https://community.cloudera.com/t5/Support-Questions/Select-a-subset-of-data-using-NiFi-QueryRecord/t...
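
To illustrate the point about root-level records, here is a small Python sketch. It is not NiFi code: the query_records helper and the variable names are hypothetical, and it only mimics how a WHERE clause keeps or drops whole records. It assumes the reader hands QueryRecord the root object as a single record.

```python
import json

# The flowfile content from the question.
flowfile = json.loads("""
{"data": [
  {"room": "A", "path": "/old/can"},
  {"room": "B", "path": "/old/can"},
  {"room": "C", "path": "/old/can"}
]}
""")


def query_records(records, predicate):
    """Hypothetical stand-in for a WHERE clause: keep or drop each record whole."""
    return [record for record in records if predicate(record)]


def has_room_a(record):
    """True if any element of the nested data array has room 'A'."""
    return any(item["room"] == "A" for item in record["data"])


def is_room_a(record):
    """True if the record itself has room 'A'."""
    return record["room"] == "A"


# Case 1: the root object is the only record, so the filter is all or nothing.
whole = query_records([flowfile], has_room_a)

# Case 2: the elements of data are the records, so the filter can pick a subset.
subset = query_records(flowfile["data"], is_room_a)

print(len(whole[0]["data"]))  # 3: the nested array is returned untouched
print(subset)                 # [{'room': 'A', 'path': '/old/can'}]
```

In the first case the root record either passes with all three rooms or is dropped entirely, which is why both options below first get the data array to the root.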

Now, to resolve your problem, you have two options:

Option 1: EvaluateJsonPath -> QueryRecord -> JoltTransformJSON, where the processors are configured as follows:

EvaluateJsonPath: to move the data array to the root

[Screenshot not preserved: SAMSAL_0-1683908023764.png]

QueryRecord: to query the required record based on the ${ip} attribute

[Screenshot not preserved: SAMSAL_1-1683908143241.png]

JoltTransformJSON: to convert back to the required schema with the data array

[Screenshot not preserved: SAMSAL_2-1683908662386.png]

Spec:

[
  {
    "operation": "shift",
    "spec": {
      "*":"data[#].&"
    }
  }
]
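
As a rough picture of what the three steps in Option 1 are meant to achieve end to end, here is a plain Python sketch. It does not use NiFi or Jolt; the function name filter_rooms and its parameters are hypothetical, and the expected result is taken from the output requested in the question.

```python
import json


def filter_rooms(flowfile_text: str, ip: str) -> str:
    """Hypothetical helper mirroring the intent of the three processors."""
    document = json.loads(flowfile_text)

    # Step 1 (EvaluateJsonPath): pull the data array out so it sits at the root.
    records = document["data"]

    # Step 2 (QueryRecord): keep only the records whose room equals the attribute.
    matches = [record for record in records if record.get("room") == ip]

    # Step 3 (Jolt): wrap the surviving records back under a data array.
    return json.dumps({"data": matches}, indent=2)


source = """
{"data": [
  {"room": "A", "path": "/old/can"},
  {"room": "B", "path": "/old/can"},
  {"room": "C", "path": "/old/can"}
]}
"""

result = filter_rooms(source, "A")
print(result)
```

With ip set to A, the printed document contains a data array holding only the room A entry, which matches the format asked for in the original post.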

 

Option 2: Just one JoltTransformJSON with the following spec:

 

[
  {
    "operation": "shift",
    "spec": {
      "data": {
        "*": {
          "room": {
            "${ipAttr}": {
              "@2": "data[0]"
            }
          }
        }
      }
    }
  }
]

Note: I had to change the ip attribute name to ipAttr, since ip is a reserved Expression Language function.

 

10 REPLIES

Super Collaborator

@rafy I would try this:

 

SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room')='${ip}'

 

Assuming ip is an attribute (${ip}), of course.

Rising Star

16:25:08 WAT ERROR 01881003-cbda-1ae7-fdab-6af8640a064e QueryRecord[id=01881003-cbda-1ae7-fdab-6af8640a064e] Unable to query FlowFile[filename=de224186-58a6-4d88-8d34-de5170bd74ed] due to org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException: Invalid Expression: SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room')='${ip}' due to Unexpected token '}' at line 1, column 4. Query: ${ip}: {}

 

Thank you. I received the above error message.

Rising Star

But after enclosing "ip" in single quotes, like this: '${'ip'}', I got another error message:

16:29:55 WAT ERROR QueryRecord[id=01881003-cbda-1ae7-fdab-6af8640a064e] Unable to query FlowFile[filename=4ac8ca57-9b9b-481a-8371-1d9f1ede2ab0] due to java.lang.RuntimeException: RecordPath /room resulted in more than one return value. The RecordPath must be further constrained.: {}

Thank you.

Super Collaborator

Can you please show the source JSON in a code box?

 

 

Also, try a manual test, without attribute, such as:

 

 

SELECT *
FROM FLOWFILE
WHERE RPATH(data, '/room') = 'A'

 

This will confirm that the query is correct for the JSON payload. Once that works, start testing with the attribute added.

Rising Star

This is the source JSON:

{
	"data": [
		{
			"room": "A",
			"path": "/old/can"
		},
		{
			"room": "B",
			"path": "/old/can"
		},
		{
			"room": "C",
			"path": "/old/can"
		}
	]
}

Rising Star

Manual testing using:

 

SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room')='A'

 

I received an error message:

16:47:59 WAT ERROR 01881003-cbda-1ae7-fdab-6af8640a064e QueryRecord[id=01881003-cbda-1ae7-fdab-6af8640a064e] Unable to query FlowFile[filename=596ac8a5-8431-4b42-b7e5-84fe972fe09b] due to java.lang.RuntimeException: RecordPath /room resulted in more than one return value. The RecordPath must be further constrained.: {}

Rising Star

Thank you so much. It works. The only issue I have is reconstructing the output back into this format using the UpdateRecord processor:

{
	"data": [
		{
			"room": "A",
			"path": "/old/can"
		}
	]
}

Rising Star

Thank you so much too. This also works. But I am still a beginner with Jolt.