Created 04-17-2018 10:26 AM
Hi all,
I'm experimenting with the AttributeRollingWindow processor for a solution to monitor the number of messages hitting different Kafka topics. Is there any way of getting the start_curr_batch_ts values (available in the View State menu) into a flow file's attributes along with the usual rolling_window_count, ..._mean and _value?
Thanks,
Chris.
Created on 04-18-2018 02:38 AM - edited 08-17-2019 07:04 PM
@Chris Grren
We cannot add stored state values to a flow file's attributes. However, you can create a custom processor that implements the logic of adding the current batch timestamp as an attribute on each flowfile.
You can also make a REST API call to the AttributeRollingWindow processor to find out the current batch timestamp value.
Use the GET method against the path below to get the current state of the processor:
GET /processors/{id}/state //Gets the state for a processor
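If you would rather script this call than run it by hand, here is a minimal Python sketch of the same GET request. It assumes an unsecured NiFi instance at http://localhost:8080/nifi-api (the base URL used in the curl example in this thread); state_url and fetch_state are hypothetical helper names, not part of NiFi.

```python
import json
import urllib.request

# Assumed base URL, taken from the curl example in this thread.
NIFI_API = "http://localhost:8080/nifi-api"


def state_url(processor_id: str, base: str = NIFI_API) -> str:
    """Build the state endpoint URL for a processor (hypothetical helper)."""
    return f"{base.rstrip('/')}/processors/{processor_id}/state"


def fetch_state(processor_id: str, base: str = NIFI_API, timeout: float = 10.0) -> dict:
    """GET the processor state and return the decoded JSON body.

    Assumes no authentication is required; a secured NiFi instance
    would need credentials or a token added to the request.
    """
    request = urllib.request.Request(state_url(processor_id, base), method="GET")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling fetch_state("d372cfe5-0162-1000-398e-62f2d3652166") against a running instance should return the same structure as the curl response in this post.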
For more details about REST API calls, refer to this link.
Example:-
I have an AttributeRollingWindow processor with processor id d372cfe5-0162-1000-398e-62f2d3652166, so from the command line we need to make the REST API call as below.
Making the REST API call from the command line:-
$ curl -i -X GET -H 'Content-Type:application/json' http://localhost:8080/nifi-api/processors/d372cfe5-0162-1000-398e-62f2d3652166/state
Response:-
{
  "componentState" : {
    "componentId" : "d372cfe5-0162-1000-398e-62f2d3652166",
    "stateDescription" : "Store the values backing the rolling window. This includes storing the individual values and their time-stamps or the batches of values and their counts.",
    "localState" : {
      "scope" : "LOCAL",
      "totalEntryCount" : 7,
      "state" : [
        { "key" : "1524017336077", "value" : "1.0" },  //current batch timestamp
        { "key" : "1524017336079", "value" : "1.0" },
        { "key" : "1524017336082", "value" : "1.0" },
        { "key" : "1524017336083", "value" : "1.0" },
        { "key" : "1524017336085", "value" : "1.0" },
        { "key" : "1524017336086", "value" : "1.0" },
        { "key" : "count", "value" : "7" }
      ]
    }
  }
}
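To pull the timestamp out of that response programmatically, something like the following Python sketch could work. current_batch_timestamp is a hypothetical helper. It assumes the response has already been decoded into a dict, that a start_curr_batch_ts entry (when the processor stores one) holds the timestamp as its value, and otherwise that the first numeric key in the state array is the one you want, as described in this post.

```python
from typing import Optional


def current_batch_timestamp(response: dict) -> Optional[int]:
    """Return the current batch timestamp from a decoded state response.

    Hypothetical helper: returns None if no usable entry is found.
    """
    local_state = response.get("componentState", {}).get("localState") or {}
    entries = local_state.get("state") or []

    # Assumption: if the processor stores a start_curr_batch_ts entry,
    # its value is the timestamp itself.
    for entry in entries:
        if entry.get("key") == "start_curr_batch_ts":
            return int(entry["value"])

    # Otherwise fall back to the first numeric key in the array,
    # following the approach described in this post.
    for entry in entries:
        key = entry.get("key", "")
        if key.isdigit():
            return int(key)
    return None
```

Note that the "count" entry is skipped because its key is not numeric.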
Making the same REST API call from a NiFi processor:-
Use the InvokeHTTP processor with Remote URL set to
http://localhost:8080/nifi-api/processors/d372cfe5-0162-1000-398e-62f2d3652166/state
Output:-
From the Response relationship you will get the same JSON response as the curl call returned.
The first entry in the state array will be your current batch timestamp.
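The keys in the state array look like epoch timestamps in milliseconds (that is an assumption based on their size, not something confirmed here). If so, a small Python sketch like this converts one into a readable UTC time; epoch_millis_to_utc is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def epoch_millis_to_utc(ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime.

    Uses integer timedelta arithmetic to avoid float rounding.
    Assumes the state keys are milliseconds since 1970-01-01 UTC.
    """
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=ms)


# First key from the example response in this thread.
print(epoch_millis_to_utc(1524017336077).isoformat())
# prints 2018-04-18T02:08:56.077000+00:00
```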
If the answer addressed your question, click the Accept button below to accept it. That would be a great help to community users in finding solutions quickly for these kinds of issues.
Created 04-18-2018 08:29 AM
Thanks @Shu, that's a very comprehensive explanation. I'll see what I can make from this.
Chris.