NiFi - ExecuteScript for getting max value of a Json array

Master Collaborator

Hi, as far as I can tell, EvaluateJsonPath cannot return the maximum value of an element across an array. I was looking for something like what is shown in the picture.

Now I'm looking for a solution with ExecuteScript, as mentioned here:

https://community.hortonworks.com/questions/57755/apache-nifi-how-to-calculate-sum-or-average-of-val...

Because I'm not familiar with the scripting languages on offer, I'm looking for an example of how to do this.

Found this: https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

But I can't find how to get the max value.

Any help is much appreciated, thanks!

[Image: 93258-hwc-jsonmaxvalue.png]

2 ACCEPTED SOLUTIONS

Master Guru

You can use QueryRecord for this. Ensure your JSONReader's schema has the geaendertAm_ABAS field marked as a timestamp type (not a string), such as:

{
 "namespace": "nifi",
 "name": "ABAS",
 "type": "record",
 "fields": [
  {"name": "ID","type": "int"},
  {"name": "geaendertAm_ABAS","type": {"type": "long","logicalType": "timestamp-millis"}}
 ]
}

Then you can add a user-defined property (let's call it "max") to QueryRecord with the value

SELECT MAX(geaendertAm_ABAS) from FLOWFILE

Your JSONRecordSetWriter will need a schema with just the field:

{
 "namespace": "nifi",
 "name": "ABAS",
 "type": "record",
 "fields": [
  {"name": "geaendertAm_ABAS","type": {"type": "long","logicalType": "timestamp-millis"}}
 ]
}

Once you click the Apply button on QueryRecord, you will be able to create a connection from QueryRecord called "max" and connect it to the next downstream processor.

As an alternative, here is a Groovy script for use in an ExecuteScript processor. Note that it is very specific to your input:

def flowFile = session.get()
if(!flowFile) return
try {
   // Replace the FlowFile content with a small JSON object holding the latest timestamp
   flowFile = session.write(flowFile, {inputStream, outputStream ->
      // Expects the content to be a top-level JSON array of objects
      def objList = new groovy.json.JsonSlurper().parse(inputStream)
      // max() returns the whole element whose parsed date is the greatest
      def max = objList.max {Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'",it.geaendertAm_ABAS)}
      def maxOutput = "{\"geaendertAm_ABAS\": \"${max.geaendertAm_ABAS}\"}"
      outputStream.write(maxOutput.bytes)
   } as StreamCallback)
   session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error("Error while determining max", e)
   session.transfer(flowFile, REL_FAILURE)
}

If you instead want the max in an attribute, you can use something like:

def flowFile = session.get()
if(!flowFile) return
try {
   def max
   // Read the content without modifying it
   def inputStream = session.read(flowFile)
   try {
      def objList = new groovy.json.JsonSlurper().parse(inputStream)
      max = objList.max {Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'",it.geaendertAm_ABAS)}
   } finally {
      // Close the stream even if parsing fails, so the FlowFile can still be transferred
      inputStream.close()
   }
   // Store the maximum as an attribute; the content stays unchanged
   flowFile = session.putAttribute(flowFile, 'MAX_geaendertAm_ABAS', max.geaendertAm_ABAS.toString())
   session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error("Error while determining max", e)
   session.transfer(flowFile, REL_FAILURE)
}

Master Guru

In NiFi 1.10 we updated Groovy to 2.5.0 (NIFI-5254). Groovy 2.5 moved the date utilities out into a separate module that is not included with groovy-all by default. Due to an oversight, the new module(s) were not included with the Groovy components, causing your script to break. I have written up NIFI-7069 to track the inclusion of this module going forward. In the meantime, the blog post I linked to above explains how to use the Java 8 (not GDK) date/time classes instead, not just as a workaround but as an improvement.
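As a rough illustration of the Java 8 approach, the sketch below finds the maximum with java.time.Instant instead of Date.parse. It is not taken from the linked blog post. The helper name findLatest and the sample timestamps are made up for this example, and it assumes the values are ISO-8601 instants ending in Z, as the pattern in the scripts above suggests. Instant.parse accepts up to nine fractional digits, so the seven-digit fraction needs no custom pattern.

```groovy
import groovy.json.JsonSlurper
import java.time.Instant

// Hypothetical helper: returns the element with the latest geaendertAm_ABAS value.
// Assumes every value is an ISO-8601 instant such as 2019-06-24T17:08:08.0000000Z.
def findLatest(List records) {
    records.max { Instant.parse(it.geaendertAm_ABAS) }
}

// Made-up sample data, standing in for the FlowFile content
def sample = '''[
  {"ID": 1, "geaendertAm_ABAS": "2019-06-24T17:08:07.9000000Z"},
  {"ID": 2, "geaendertAm_ABAS": "2019-06-24T17:08:08.0000000Z"},
  {"ID": 3, "geaendertAm_ABAS": "2019-06-11T14:06:23.1234567Z"}
]'''

def latest = findLatest(new JsonSlurper().parseText(sample))
println latest.geaendertAm_ABAS
```

Inside ExecuteScript, the same closure, { Instant.parse(it.geaendertAm_ABAS) }, could replace the Date.parse closure passed to max; the rest of the script would stay as it is.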


14 REPLIES

Master Collaborator

@Matt Burgess Superb, it works perfectly! One day I will try your original solution; it sounds interesting. Bye!

Master Collaborator

Hi @Matt Burgess,

Following your solution above for returning the max value in an attribute, my script looks like this:


def flowFile = session.get()
if(!flowFile) return
try {
   def inputStream = session.read(flowFile)
   def objList = new groovy.json.JsonSlurper().parse(inputStream)
   def max = objList.max {Date.parse("yyyyMMddHHmmss",it.elem_stand)}
   inputStream.close()
   flowFile = session.putAttribute(flowFile, 'MAX_elem_stand', max.elem_stand.toString())
   session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error("Error while determining max", e)
   session.transfer(flowFile, REL_FAILURE)
}


It worked fine with this JSON format:

[
    {
        "zn": 1,
        "elem_stand": "20190611140623",
        "elem_id": "1086"
    },
    {
        "zn": 2,
        "elem_stand": "20190624170807",
        "elem_id": "1089"
    }
]

But now the input format has changed to:

{
    "name": "Belegart",
    "id": "269",
    "table": [
        {
            "zn": 1,
            "elem_stand": "20190611140623",
            "elem_id": "1086"
        },
        {
            "zn": 2,
            "elem_stand": "20190624170807",
            "elem_id": "1089"
        }
    ]
}

And I can't find the right syntax to access elem_stand within the array in order to determine the max value.

Could you please tell me how to do this? Thanks!

Master Guru

In the script, you're creating a variable `objList` that (for the first input format) points at the top-level array of objects, so you can call max() directly on that array (I think technically it's a List under the hood). In the second input format, objList will be pointing to the top-level object, so you'll need to get the array member `table` out of the object. Update the "def max" line to this:

def max = objList.table.max {Date.parse("yyyyMMddHHmmss",it.elem_stand)}
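To try the nested access outside NiFi, here is a minimal standalone sketch that uses the second input format from the question. As an assumption on my part, it parses elem_stand with the Java 8 date/time classes rather than Date.parse, so that it also runs on Groovy versions that do not bundle the date utilities. The variable names fmt, json and obj are only illustrative.

```groovy
import groovy.json.JsonSlurper
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Formatter matching the elem_stand values, e.g. 20190624170807
def fmt = DateTimeFormatter.ofPattern('yyyyMMddHHmmss')

// Second input format from the question: the array sits under the "table" key
def json = '''{
    "name": "Belegart",
    "id": "269",
    "table": [
        {"zn": 1, "elem_stand": "20190611140623", "elem_id": "1086"},
        {"zn": 2, "elem_stand": "20190624170807", "elem_id": "1089"}
    ]
}'''

def obj = new JsonSlurper().parseText(json)
// obj is a Map here, so take the "table" list first and then call max() on it
def max = obj.table.max { LocalDateTime.parse(it.elem_stand, fmt) }
println max.elem_stand   // prints 20190624170807
```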

Master Collaborator

@Matt Burgess It works! Thank you so much not only for the solution but also for the explanation!

Explorer

That's a nice hint for calculating an aggregation like MAX on a single FlowFile.

I wonder whether QueryRecord can also perform an aggregation across multiple FlowFiles and output just one FlowFile with the answer. Imagine ListS3, which can output multiple FlowFiles (one per new file created since the last run). We would like to know which of these n new inbound FlowFiles is the most recent, using either s3.modified or the filename itself, should it contain a timestamp in the format yyyymmdd HH:mm:ss.SSS.

How can I get the most recent file?