Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Filtering Json record with QueryRecord processor returns zero kb flowfile

avatar
Contributor
Hello all,
I am trying to use QueryRecord processor to filter a json file but i am getting 0kb flowfile on my filter relationship.
My input flowfile is:
{
"data": [
{
"room": "A",
"path": "/old/can"
},
{
"room": "B",
"path": "/old/can"
},
{
"room": "C",
"path": "/old/can"
}
]
}
 
My filter query:
SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room[*]')='${'ip'}'
NOTE: ip is an attribute using to compare which class to filter, so if ip = A my output after filtering should be
{
"data": [
{
"room": "A",
"path": "/old/can"
}
            ]
}
Thank you all.
2 ACCEPTED SOLUTIONS

avatar

@rafy   I got the same issues when I tried to create a flow using RPATH.   However, here is a solution i found to dial into the data array and match on the room = A:

 

SELECT *
FROM FLOWFILE WHERE room = 'A'

I used QueryRecord With JSONTreeReader (see below) and JSONRecordSetWriter (default).

NiFI Flow Definition here:  @gitHub

 

Screenshots:

 

 

Screen Shot 2023-05-12 at 12.25.36 PM.png

 

Screen Shot 2023-05-12 at 12.26.27 PM.png

 

Screen Shot 2023-05-12 at 12.26.09 PM.png

 

 

View solution in original post

avatar

Hi @rafy ,

I dont think the QueryRecord is suppose to work this way but I could be wrong. The query record basically filters from the root array and not the nested array. Since your input is not an array json object on the root this is not going to work. and if the filter " RPATH_STRING(data, '/room')='A'" is suppose to work (not sure why its not) it will return the entire record from the root and not just the subset. I think the question has been asked before but there was no answer:

https://community.cloudera.com/t5/Support-Questions/Select-a-subset-of-data-using-NiFi-QueryRecord/t...

Now to resolve your problem, you have two options of processors :

Option 1:  EvaluateJsonPath->QueryRecord->JsonJoltTransformation where processors are configured as follows:

EvaluateJsonPath : to get the data array into root array

SAMSAL_0-1683908023764.png

 

QueryRecord : To Query the required record based on the ${ip} attribute:

SAMSAL_1-1683908143241.png

 

JsonJoltTransformation: To convert back to the required schema with data array

SAMSAL_2-1683908662386.png

spec:

[
  {
    "operation": "shift",
    "spec": {
      "*":"data[#].&"
    }
  }
]

 

Option 2: Just one JoltTransformationJson with the following spec:

 

[
  {
    "operation": "shift",
    "spec": {
      "data": {
        "*": {
          "room": {
            "${ipAttr}": {
              "@2": "data[0]"
            }
          }
        }
      }
    }
  }

]

Note: I had to change the ip attribute name to ipAttr since ip is reserved Expression Language function.

 

View solution in original post

10 REPLIES 10

avatar

@rafy I would try this:

 

SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room')='${ip}'

 

Assuming ip is an attribute (${ip}) ofcourse.

avatar
Contributor

16:25:08 WATERROR01881003-cbda-1ae7-fdab-6af8640a064e QueryRecord[id=01881003-cbda-1ae7-fdab-6af8640a064e] Unable to query FlowFile[filename=de224186-58a6-4d88-8d34-de5170bd74ed] due to org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException: Invalid Expression: SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room')='${ip}' due to Unexpected token '}' at line 1, column 4. Query: ${ip}: {}

 

Thank you, i received the above error message.

avatar
Contributor

But after enclosing "ip" in a single quote like this '${'ip'}', i got another error message below:

16:29:55 WAT
ERROR
 
QueryRecord[id=01881003-cbda-1ae7-fdab-6af8640a064e] Unable to query FlowFile[filename=4ac8ca57-9b9b-481a-8371-1d9f1ede2ab0] due to java.lang.RuntimeException: RecordPath /room resulted in more than one return value. The RecordPath must be further constrained.: {}

Thank you.

avatar

Can you please show the source json in a :

 

code box

 

 

Also, try a manual test, without attribute, such as:

 

 

SELECT *
FROM FLOWFILE
WHERE RPATH(data, '/room') = 'A'

 

This will ensure the query is correct to the json payload.   Once that works, start testing adding the attribute.

avatar
Contributor

This is the source json:

{
	"data": [
		{
			"room": "A",
			"path": "/old/can"
		},
		{
			"room": "B",
			"path": "/old/can"
		},
		{
			"room": "C",
			"path": "/old/can"
		}
	]
}

avatar
Contributor

manual testing using:

 

SELECT * FROM FLOWFILE WHERE RPATH_STRING(data, '/room')='A'

 

I received an error message:

16:47:59 WATERROR01881003-cbda-1ae7-fdab-6af8640a064e
QueryRecord[id=01881003-cbda-1ae7-fdab-6af8640a064e] Unable to query FlowFile[filename=596ac8a5-8431-4b42-b7e5-84fe972fe09b] due to java.lang.RuntimeException: RecordPath /room resulted in more than one return value. The RecordPath must be further constrained.: {}

avatar

@rafy   I got the same issues when I tried to create a flow using RPATH.   However, here is a solution i found to dial into the data array and match on the room = A:

 

SELECT *
FROM FLOWFILE WHERE room = 'A'

I used QueryRecord With JSONTreeReader (see below) and JSONRecordSetWriter (default).

NiFI Flow Definition here:  @gitHub

 

Screenshots:

 

 

Screen Shot 2023-05-12 at 12.25.36 PM.png

 

Screen Shot 2023-05-12 at 12.26.27 PM.png

 

Screen Shot 2023-05-12 at 12.26.09 PM.png

 

 

avatar
Contributor

Thank so much. It works. The only issue i have reconstructing back to this format using updaterecord processor:

{
	"data": [
		{
			"room": "A",
			"path": "/old/can"
		}
	]
}

avatar

Hi @rafy ,

I dont think the QueryRecord is suppose to work this way but I could be wrong. The query record basically filters from the root array and not the nested array. Since your input is not an array json object on the root this is not going to work. and if the filter " RPATH_STRING(data, '/room')='A'" is suppose to work (not sure why its not) it will return the entire record from the root and not just the subset. I think the question has been asked before but there was no answer:

https://community.cloudera.com/t5/Support-Questions/Select-a-subset-of-data-using-NiFi-QueryRecord/t...

Now to resolve your problem, you have two options of processors :

Option 1:  EvaluateJsonPath->QueryRecord->JsonJoltTransformation where processors are configured as follows:

EvaluateJsonPath : to get the data array into root array

SAMSAL_0-1683908023764.png

 

QueryRecord : To Query the required record based on the ${ip} attribute:

SAMSAL_1-1683908143241.png

 

JsonJoltTransformation: To convert back to the required schema with data array

SAMSAL_2-1683908662386.png

spec:

[
  {
    "operation": "shift",
    "spec": {
      "*":"data[#].&"
    }
  }
]

 

Option 2: Just one JoltTransformationJson with the following spec:

 

[
  {
    "operation": "shift",
    "spec": {
      "data": {
        "*": {
          "room": {
            "${ipAttr}": {
              "@2": "data[0]"
            }
          }
        }
      }
    }
  }

]

Note: I had to change the ip attribute name to ipAttr since ip is reserved Expression Language function.