Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

NiFi - ExecuteScript for getting max value of a Json array

Solved Go to solution
Highlighted

NiFi - ExecuteScript for getting max value of a Json array

Rising Star

Hi, as far I have investigated it is not possible in EvaluateJsonPath to get the maximum value of an element of an array. Searched for something like shown in the picture.

Now I'm searching for a solution with ExecuteScript like mentioned here:

https://community.hortonworks.com/questions/57755/apache-nifi-how-to-calculate-sum-or-average-of-val...

Because Im not familiar with the script languages are offered I'm looking for an example to do this.

Found this: https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

But I don't find how to get the max value.

Any help is very appreciated, thanks!

93258-hwc-jsonmaxvalue.png

1 ACCEPTED SOLUTION

Accepted Solutions

Re: NiFi - ExecuteScript for getting max value of a Json array

You can use QueryRecord for this. Ensure your JSONReader's schema has the geaendertAm_ABAS field marked as a timestamp type (not a string), such as:

{
 "namespace": "nifi",
 "name": "ABAS",
 "type": "record",
 "fields": [
  {"name": "ID","type": "int"},
  {"name": "geaendertAm_ABAS","type": {"type": "long","logicalType": "timestamp-millis"}}
 ]
}

Then you can add a user-defined property (let's call it "max") to QueryRecord with the value

SELECT MAX(geaendertAm_ABAS) from FLOWFILE

Your JSONRecordSetWriter will need a schema with just the field:

{
 "namespace": "nifi",
 "name": "ABAS",
 "type": "record",
 "fields": [
  {"name": "geaendertAm_ABAS","type": {"type": "long","logicalType": "timestamp-millis"}}
 ]
}

Once you click the Apply button on QueryRecord, you will be able to create a connection from QueryRecord called "max" and connect it to the next downstream processor.

As an alternative, here is a Groovy script for use in an ExecuteScript processor, note that it is very specific to your input:

def flowFile = session.get()
if(!flowFile) return
try {
   flowFile = session.write(flowFile, {inputStream, outputStream ->
      def objList = new groovy.json.JsonSlurper().parse(inputStream)
      def max = objList.max {Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'",it.geaendertAm_ABAS)}
      def maxOutput = "{\"geaendertAm_ABAS\": \"${max.geaendertAm_ABAS}\"}"
      outputStream.write(maxOutput.bytes)
   } as StreamCallback)
   session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error("Error while determining max", e)
   session.transfer(flowFile, REL_FAILURE)
}

If you instead want the max in an attribute, you can use something like:

def flowFile = session.get()
if(!flowFile) return
try {
   def inputStream = session.read(flowFile)
   def objList = new groovy.json.JsonSlurper().parse(inputStream)
   def max = objList.max {Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'",it.geaendertAm_ABAS)}
   inputStream.close()
   flowFile = session.putAttribute(flowFile, 'MAX_geaendertAm_ABAS', max.geaendertAm_ABAS.toString())
   session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error("Error while determining max", e)
   session.transfer(flowFile, REL_FAILURE)
}
11 REPLIES 11

Re: NiFi - ExecuteScript for getting max value of a Json array

You can use QueryRecord for this. Ensure your JSONReader's schema has the geaendertAm_ABAS field marked as a timestamp type (not a string), such as:

{
 "namespace": "nifi",
 "name": "ABAS",
 "type": "record",
 "fields": [
  {"name": "ID","type": "int"},
  {"name": "geaendertAm_ABAS","type": {"type": "long","logicalType": "timestamp-millis"}}
 ]
}

Then you can add a user-defined property (let's call it "max") to QueryRecord with the value

SELECT MAX(geaendertAm_ABAS) from FLOWFILE

Your JSONRecordSetWriter will need a schema with just the field:

{
 "namespace": "nifi",
 "name": "ABAS",
 "type": "record",
 "fields": [
  {"name": "geaendertAm_ABAS","type": {"type": "long","logicalType": "timestamp-millis"}}
 ]
}

Once you click the Apply button on QueryRecord, you will be able to create a connection from QueryRecord called "max" and connect it to the next downstream processor.

As an alternative, here is a Groovy script for use in an ExecuteScript processor, note that it is very specific to your input:

def flowFile = session.get()
if(!flowFile) return
try {
   flowFile = session.write(flowFile, {inputStream, outputStream ->
      def objList = new groovy.json.JsonSlurper().parse(inputStream)
      def max = objList.max {Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'",it.geaendertAm_ABAS)}
      def maxOutput = "{\"geaendertAm_ABAS\": \"${max.geaendertAm_ABAS}\"}"
      outputStream.write(maxOutput.bytes)
   } as StreamCallback)
   session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error("Error while determining max", e)
   session.transfer(flowFile, REL_FAILURE)
}

If you instead want the max in an attribute, you can use something like:

def flowFile = session.get()
if(!flowFile) return
try {
   def inputStream = session.read(flowFile)
   def objList = new groovy.json.JsonSlurper().parse(inputStream)
   def max = objList.max {Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'",it.geaendertAm_ABAS)}
   inputStream.close()
   flowFile = session.putAttribute(flowFile, 'MAX_geaendertAm_ABAS', max.geaendertAm_ABAS.toString())
   session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error("Error while determining max", e)
   session.transfer(flowFile, REL_FAILURE)
}

Re: NiFi - ExecuteScript for getting max value of a Json array

Rising Star

Once again I have to get max-value of a date. This time an attribute holds the JSON to be checked not the FF-content.

This is the content of attribute RESPONSE:

[{"id":"(1208)","datbis":"20190630"},{"id":"(1210)","datbis":"20191231"}]

With the script of above discussed problem and this information https://gist.github.com/mattyb149/478864017ec70d76f74f

(thanks to @Matt Burgess ) it was possible to adjust the script for doing the check in the ff-attribute.

Just in case some else has to solve this too here the script:

def flowFile = session.get()
if(!flowFile) return
try {
   def objList = new groovy.json.JsonSlurper().parseText(flowFile.getAttribute('RESPONSE'))
   def max = objList.max {Date.parse("yyyyMMdd",it.datbis)}
   flowFile = session.putAttribute(flowFile, 'MAX_datbis', max.datbis.toString())
   session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error("Error while determining max", e)
   session.transfer(flowFile, REL_FAILURE)
}


Re: NiFi - ExecuteScript for getting max value of a Json array

Rising Star

@Matt Burgess

Thanks for your quick response! Sorry, but I'm afraid I'm unable to cope with your answer. Knowing NiFi only for a few weeks, knowing nothing about the configuration of the Controller Services for JsonPathReader/JsonRecordSetWriter and need to solve the described problem.

I was hoping on some script solution like...

flowFile = session.get()
if(!flowFile) return 
flowFile = session.putAttribute(flowFile, 'Value_Groovy', (FF_content.geaendertAm_ABAS.max())) 
session.transfer(flowFile, REL_SUCCESS)

... where FF_content is an attribute which contains the json.

No solution for "beginners" possible?

Re: NiFi - ExecuteScript for getting max value of a Json array

I updated my answer with a scripting alternative. If you find it useful, please take the time to "Accept" the answer, thanks!

Re: NiFi - ExecuteScript for getting max value of a Json array

Rising Star

@Matt Burgess Great, my boss and I are very grateful for your "very specific" help!

One last question concerning the script: The perfect solution would leave the FF-content unchanged and deliver the maxOutput as FF-Attribute.

Tried this but can't find the right syntax:

flowFile = session.putAttribute(flowFile, 'MAX_geaendertAm_ABAS', ${max.geandertAm_ABAS})

Is this possible?

Re: NiFi - ExecuteScript for getting max value of a Json array

Yep, just updated the answer with another script that puts the max in an attribute. Cheers!

Re: NiFi - ExecuteScript for getting max value of a Json array

Rising Star

@Matt Burgess Superb, it works perfect! One day I will try your original solution, sounds interesting. Bye!

Re: NiFi - ExecuteScript for getting max value of a Json array

Rising Star

Hi @Matt Burgess

following your above solution for returning the max-value in an attribute my script looks like this:


def flowFile = session.get()
if(!flowFile) return
try {
   def inputStream = session.read(flowFile)
   def objList = new groovy.json.JsonSlurper().parse(inputStream)
   def max = objList.max {Date.parse("yyyyMMddHHmmss",it.elem_stand)}
   inputStream.close()
   flowFile = session.putAttribute(flowFile, 'MAX_elem_stand', max.elem_stand.toString())
   session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error("Error while determining max", e)
   session.transfer(flowFile, REL_FAILURE)
}


It worked fine with Json-format:

            [{
                "zn": 1,
                "elem_stand": "20190611140623",
                "elem_id": "1086"
            },
           {
                "zn": 2,
                "elem_stand": "20190624170807",
                "elem_id": "1089"
            } ]  

But now the input format is changed to:


{
        "name": "Belegart",
        "id": "269",
        "table": [
            {
                "zn": 1,
                "elem_stand": "20190611140623",
                "elem_id": "1086"
            },
           {
                "zn": 2,
                "elem_stand": "20190624170807",
                "elem_id": "1089"
            }
        ]
    }

And I can't find the right syntax to access the elem_stand within the array for checking max-value.

Please could you tell me how this is possible? Thanks!

Re: NiFi - ExecuteScript for getting max value of a Json array

In the script, you're creating a variable `objList` that (for the first input format) points at the top-level array of objects, so you can call max() directly on that array (I think technically it's a List under the hood). In the second input format, objList will be pointing to the top-level object, so you'll need to get the array member `table` out of the object. Update the "def max" line to this:

def max = objList.table.max {Date.parse("yyyyMMddHHmmss",it.elem_stand)}