NiFi AttributeRollingWindow 1.5.0 - start_curr_batch_ts as attribute?

New Contributor

Hi all,

I'm experimenting with the AttributeRollingWindow processor as a way to monitor the number of messages hitting different Kafka topics. Is there any way of getting the start_curr_batch_ts value (available in the View State menu) into a flow file's attributes, along with the usual rolling_window_count, rolling_window_mean and rolling_window_value?

Thanks,

Chris.

1 ACCEPTED SOLUTION

Master Guru

@Chris Grren

We cannot add stored state values to flow file attributes directly. However, you can create a custom processor that adds the current batch timestamp as an attribute on each flow file.
You can also make a REST API call to the AttributeRollingWindow processor to find out the current batch timestamp value.

Use the GET method on the path below to get the current state of the processor:

GET /processors/{id}/state //Gets the state for a processor 

For more details about REST API calls, refer to the NiFi REST API documentation.

Example:-

My AttributeRollingWindow processor has the processor id d372cfe5-0162-1000-398e-62f2d3652166, so the REST API call from the command line looks like the one below.

Making the REST API call from the command line:-

$ curl -i -X GET -H 'Content-Type:application/json' http://localhost:8080/nifi-api/processors/d372cfe5-0162-1000-398e-62f2d3652166/state
Response:-
As you can see below, these are the states saved in the processor, and the first key is our current batch timestamp, i.e. 1524017336077.
{
  "componentState" : {
    "componentId" : "d372cfe5-0162-1000-398e-62f2d3652166",
    "stateDescription" : "Store the values backing the rolling window. This includes storing the individual values and their time-stamps or the batches of values and their counts.",
    "localState" : {
      "scope" : "LOCAL",
      "totalEntryCount" : 7,
      "state" : [ {
        "key" : "1524017336077", //current batch timestamp
        "value" : "1.0"
      }, {
        "key" : "1524017336079",
        "value" : "1.0"
      }, {
        "key" : "1524017336082",
        "value" : "1.0"
      }, {
        "key" : "1524017336083",
        "value" : "1.0"
      }, {
        "key" : "1524017336085",
        "value" : "1.0"
      }, {
        "key" : "1524017336086",
        "value" : "1.0"
      }, {
        "key" : "count",
        "value" : "7"
      } ]
    }
  }
}
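If you want to pick the values out of this response in a script, the following is a minimal Python sketch using only the standard library. The function names `fetch_state` and `split_state` are hypothetical, and the URL and processor id are simply the ones from the example above. It keeps the state entries in the order the API returns them, so the first timestamp key is the one treated above as the current batch timestamp. If your state contains a named key such as start_curr_batch_ts (as mentioned in the question), it would end up in the `named` dictionary.

```python
import json
from urllib.request import Request, urlopen

# Assumed values, copied from the example above; adjust for your instance.
NIFI_API = "http://localhost:8080/nifi-api"
PROCESSOR_ID = "d372cfe5-0162-1000-398e-62f2d3652166"


def fetch_state(api_base=NIFI_API, processor_id=PROCESSOR_ID):
    """Issue GET /processors/{id}/state and return the decoded JSON body."""
    request = Request(
        f"{api_base}/processors/{processor_id}/state",
        headers={"Accept": "application/json"},
    )
    with urlopen(request) as response:
        return json.load(response)


def split_state(payload):
    """Separate numeric (timestamp) keys from named keys such as "count".

    Entries are kept in the order the API returned them.
    """
    entries = payload["componentState"]["localState"]["state"]
    timestamps = [int(e["key"]) for e in entries if e["key"].isdigit()]
    named = {e["key"]: e["value"] for e in entries if not e["key"].isdigit()}
    return timestamps, named


# Trimmed copy of the response above, so the parsing can be tried offline.
SAMPLE = json.loads("""
{
  "componentState": {
    "componentId": "d372cfe5-0162-1000-398e-62f2d3652166",
    "localState": {
      "scope": "LOCAL",
      "state": [
        {"key": "1524017336077", "value": "1.0"},
        {"key": "1524017336079", "value": "1.0"},
        {"key": "count", "value": "7"}
      ]
    }
  }
}
""")

timestamps, named = split_state(SAMPLE)
print(timestamps[0], named["count"])
```

Against a running instance you would call `split_state(fetch_state())` instead of using `SAMPLE`. A secured NiFi instance would additionally need authentication, which this sketch does not cover.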

States stored in the processor:-

68512-states.png

Making the same REST API call from a NiFi processor:-
Use the InvokeHTTP processor with the Remote URL set to

http://localhost:8080/nifi-api/processors/d372cfe5-0162-1000-398e-62f2d3652166/state

68514-invokehttp.png

Output:-
From the Response relationship you will get the same response as shown above.

The first entry in the state array is your current batch timestamp.
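To make such a key readable, here is a small Python sketch that converts it to a UTC date and time. It assumes the key is a Unix timestamp in milliseconds, which the magnitude of the values suggests; the function name `epoch_millis_to_utc` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_utc(key):
    """Convert a state key (assumed epoch milliseconds) to an aware UTC datetime."""
    # Integer milliseconds are added as a timedelta to avoid float rounding.
    return EPOCH + timedelta(milliseconds=int(key))


batch_start = epoch_millis_to_utc("1524017336077")
print(batch_start.isoformat(timespec="milliseconds"))
# prints 2018-04-18T02:08:56.077+00:00
```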


If this answer addressed your question, click the Accept button below to accept it. That helps community users find solutions to this kind of issue quickly.


flow.png


2 REPLIES

New Contributor

Thanks @Shu, that's a very comprehensive explanation. I'll see what I can make from this.

Chris.