NiFi - ExecuteScript for getting max value of a Json array
Labels: Apache NiFi
Created on ‎11-14-2018 01:43 PM - edited ‎08-17-2019 04:42 PM
Hi, as far as I can tell, EvaluateJsonPath cannot return the maximum value of an element across a JSON array. I was looking for something like what is shown in the picture.
Now I'm looking for a solution with ExecuteScript, as mentioned here:
Because I'm not familiar with the scripting languages on offer, I'm looking for an example of how to do this.
I found this: https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html
But I can't find how to get the max value.
Any help is much appreciated, thanks!
Created ‎11-14-2018 02:42 PM
You can use QueryRecord for this. Ensure your JSONReader's schema has the geaendertAm_ABAS field marked as a timestamp type (not a string), such as:
{
  "namespace": "nifi",
  "name": "ABAS",
  "type": "record",
  "fields": [
    {"name": "ID", "type": "int"},
    {"name": "geaendertAm_ABAS", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
Then you can add a user-defined property (let's call it "max") to QueryRecord with the value
SELECT MAX(geaendertAm_ABAS) from FLOWFILE
Your JSONRecordSetWriter will need a schema with just the field:
{
  "namespace": "nifi",
  "name": "ABAS",
  "type": "record",
  "fields": [
    {"name": "geaendertAm_ABAS", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
Once you click the Apply button on QueryRecord, you will be able to create a connection from QueryRecord called "max" and connect it to the next downstream processor.
As an alternative, here is a Groovy script for use in an ExecuteScript processor. Note that it is very specific to your input:
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if(!flowFile) return
try {
    // Replace the flow file content with a JSON object holding only the max timestamp
    flowFile = session.write(flowFile, {inputStream, outputStream ->
        def objList = new groovy.json.JsonSlurper().parse(inputStream)
        // max returns the element whose parsed date is the latest
        def max = objList.max {Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'",it.geaendertAm_ABAS)}
        def maxOutput = "{\"geaendertAm_ABAS\": \"${max.geaendertAm_ABAS}\"}"
        outputStream.write(maxOutput.bytes)
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error("Error while determining max", e)
    session.transfer(flowFile, REL_FAILURE)
}
If you instead want the max in an attribute, you can use something like:
def flowFile = session.get()
if(!flowFile) return
try {
    def inputStream = session.read(flowFile)
    def objList = new groovy.json.JsonSlurper().parse(inputStream)
    def max = objList.max {Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'",it.geaendertAm_ABAS)}
    inputStream.close()
    flowFile = session.putAttribute(flowFile, 'MAX_geaendertAm_ABAS', max.geaendertAm_ABAS.toString())
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error("Error while determining max", e)
    session.transfer(flowFile, REL_FAILURE)
}
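For readers on a NiFi version where the GDK method Date.parse(String, String) is not available, here is a minimal sketch of the same max lookup using java.time.Instant instead. It assumes the timestamps are ISO-8601 UTC strings ending in 'Z', as the pattern in the scripts above suggests. The sample JSON is made up for illustration, since the real flow file content is not shown in the thread, and the snippet runs standalone without the NiFi session objects.

```groovy
import groovy.json.JsonSlurper
import java.time.Instant

// Hypothetical sample input; the real flow file content is not shown in the thread.
def json = '''[
  {"ID": 1, "geaendertAm_ABAS": "2018-11-09T10:15:30.1234567Z"},
  {"ID": 2, "geaendertAm_ABAS": "2018-11-12T08:00:00.0000000Z"},
  {"ID": 3, "geaendertAm_ABAS": "2018-11-12T08:00:00.0000001Z"}
]'''

def objList = new JsonSlurper().parseText(json)

// Instant.parse reads ISO-8601 UTC timestamps with up to nine fractional digits,
// so the seven-digit fraction needs no custom pattern.
def max = objList.max { Instant.parse(it.geaendertAm_ABAS) }

// As in the scripts above, keep the original string of the winning element.
def maxValue = max.geaendertAm_ABAS
```

Inside ExecuteScript, the parsed list would come from the flow file content as in the scripts above, and maxValue would be passed to session.putAttribute or written to the output stream.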
Created ‎08-01-2019 08:35 AM
Once again I need the max value of a date. This time the JSON to check is held in an attribute, not in the flow file content.
This is the content of attribute RESPONSE:
[{"id":"(1208)","datbis":"20190630"},{"id":"(1210)","datbis":"20191231"}]
Using the script from the problem discussed above and the information at https://gist.github.com/mattyb149/478864017ec70d76f74f
(thanks to @Matt Burgess), I was able to adapt the script to run the check against the flow file attribute.
In case someone else has to solve this too, here is the script:
def flowFile = session.get()
if(!flowFile) return
try {
    def objList = new groovy.json.JsonSlurper().parseText(flowFile.getAttribute('RESPONSE'))
    def max = objList.max {Date.parse("yyyyMMdd",it.datbis)}
    flowFile = session.putAttribute(flowFile, 'MAX_datbis', max.datbis.toString())
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error("Error while determining max", e)
    session.transfer(flowFile, REL_FAILURE)
}
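For anyone new to Groovy, the following standalone sketch shows what the max call in the script is doing. It returns the whole list element with the largest key, which is why the script then reads max.datbis, and it returns null when the list is empty. The helper name maxDatbis is hypothetical and not part of the thread. The sketch also swaps in plain string comparison for date parsing, on the assumption that every datbis value is a fixed-width, zero-padded yyyyMMdd string, in which case string order matches date order.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper (not from the thread): returns the largest datbis value, or null.
def maxDatbis(String json) {
    def objList = new JsonSlurper().parseText(json)
    // max { } returns the element whose closure result is largest, or null for an empty list.
    // Assumption: datbis is always a fixed-width yyyyMMdd string, so comparing the
    // strings directly gives the same order as comparing the dates.
    def max = objList.max { it.datbis }
    // Safe navigation avoids a NullPointerException when the array is empty.
    return max?.datbis
}

def result = maxDatbis('[{"id":"(1208)","datbis":"20190630"},{"id":"(1210)","datbis":"20191231"}]')
println "MAX_datbis would be set to ${result}"
```

With an empty array the helper returns null, so a flow using it could route such flow files to failure explicitly instead of relying on the exception in the catch block.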
Created ‎01-27-2020 06:29 AM
@mburgess Hi Matt, your help is needed urgently.
The script that reads a JSON structure from an attribute and determines the maximum value of the date worked fine.
I haven't worked on the flow for a while and now it doesn't work anymore.
In the meantime there was only an update from 1.9.2 to 1.10.0. Otherwise everything is as before.
Since I still have no idea about Groovy, I can't fix the error. Please, could you help me again? Thanks.
This is content of attribute RESPONSE
ERROR
ExecuteScript[id=e70ffcf5-016f-1000-0000-000063865879]
Error while determining max: groovy.lang.MissingMethodException: No signature of method: static java.util.Date.parse() is applicable for argument types: (String, String) values: [yyyyMMdd, 20190630]
Possible solutions: parse(java.lang.String), wait(), clone(), any(), grep(), use(java.lang.Class, groovy.lang.Closure)​
Created ‎01-27-2020 07:57 AM
In NiFi 1.10 we updated Groovy to 2.5.0 (NIFI-5254), which itself moved the date utils out to a module which is not included with groovy-all by default. Due to an oversight, the new module(s) were not included with the Groovy components, causing your script to break. I have written up NIFI-7069 to track the inclusion of this module going forward. In the meantime the blog post I linked to above explains how to use the Java 8 (not GDK) date/time classes instead, not just as a workaround but as an improvement.
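As a rough illustration of the Java 8 approach, here is a minimal standalone sketch that finds the max datbis using only java.time classes (LocalDate and DateTimeFormatter), so it does not depend on the Groovy date extension modules. The variable names are chosen for illustration, and the sample JSON is the RESPONSE attribute value posted earlier in the thread.

```groovy
import groovy.json.JsonSlurper
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Pattern matching the datbis values, e.g. 20190630.
def fmt = DateTimeFormatter.ofPattern('yyyyMMdd')

// Sample taken from the RESPONSE attribute shown earlier in the thread.
def response = '[{"id":"(1208)","datbis":"20190630"},{"id":"(1210)","datbis":"20191231"}]'

def objList = new JsonSlurper().parseText(response)

// LocalDate.parse(CharSequence, DateTimeFormatter) is plain Java 8 API.
def max = objList.max { LocalDate.parse(it.datbis, fmt) }
def maxDatbis = max.datbis
```

In an ExecuteScript processor, response would come from flowFile.getAttribute('RESPONSE') and maxDatbis would be passed to session.putAttribute, as in the earlier script.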
Created ‎01-28-2020 02:49 AM
Matt, thank you so much for your quick response and help.
It took me some time to figure out what you meant... but it works like a charm!
If anyone can use the solution, here it is:
//======================================================================================================
// TEST java LocalDate.parse with groovy max-function
// FF-Attribute RESPONSE contains [{"id":"(1208)", "datbis":"20180219" }, { "id":"(1210)", "datbis":"20191231" }, { "id":"(1212)", "datbis":"20200128" }]
// FF-Attribute MAX_datbis returns 20200128
//======================================================================================================
import java.time.LocalDate
def flowFile = session.get()
if(!flowFile) return
try {
    def objList = new groovy.json.JsonSlurper().parseText(flowFile.getAttribute('RESPONSE'))
    def max = objList.max {LocalDate.parse(it.datbis,"yyyyMMdd")}
    flowFile = session.putAttribute(flowFile, 'MAX_datbis', max.datbis.toString())
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error("Error while determining max", e)
    session.transfer(flowFile, REL_FAILURE)
}
Created ‎11-14-2018 03:09 PM
Thanks for your quick response! Sorry, but I'm afraid I can't follow your answer. I have only known NiFi for a few weeks, I know nothing about configuring the Controller Services for JsonPathReader/JsonRecordSetWriter, and I need to solve the described problem.
I was hoping for a script solution like...
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'Value_Groovy', (FF_content.geaendertAm_ABAS.max()))
session.transfer(flowFile, REL_SUCCESS)
... where FF_content is an attribute that contains the JSON.
Is there no solution for "beginners"?
Created ‎11-14-2018 03:31 PM
I updated my answer with a scripting alternative. If you find it useful, please take the time to "Accept" the answer, thanks!
Created ‎11-15-2018 07:05 AM
@Matt Burgess Great, my boss and I are very grateful for your "very specific" help!
One last question concerning the script: The perfect solution would leave the FF-content unchanged and deliver the maxOutput as FF-Attribute.
Tried this but can't find the right syntax:
flowFile = session.putAttribute(flowFile, 'MAX_geaendertAm_ABAS', ${max.geandertAm_ABAS})
Is this possible?
Created ‎11-15-2018 03:44 PM
Yep, just updated the answer with another script that puts the max in an attribute. Cheers!
