Created 01-12-2017 03:02 PM
Hello all,
I am using NiFi to pull JSON data from a Kafka topic with the ConsumeKafka processor. I need to compute the average of a particular attribute's value across multiple flowfiles in a given time window. Any ideas on how this can be done? I have tried EvaluateJsonPath and UpdateAttribute, but it seems that UpdateAttribute only lets me do mathematical operations on the current flowfile, and I cannot access the previous flowfile from it. Is there a processor that can do this?
Thanks.
Created 01-12-2017 03:12 PM
In an upcoming release you will be able to keep stateful variables using UpdateAttribute (NIFI-1582); the author envisions it being able to support running averages. I also started an AggregateValues processor under NIFI-2735 but haven't been able to finish it yet, and it works on micro-batches (such as files created from a split processor like SplitJson) rather than rolling windows.
In the meantime, if you are familiar with a scripting language such as Groovy, Jython, JavaScript, or JRuby, you could use ExecuteScript or InvokeScriptedProcessor. Both have access (via the ProcessContext) to the StateManager, where you could keep state, averages, etc. across flow files over time.
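To make the scripting idea a bit more concrete, here is a rough sketch of just the averaging logic in plain Python, not a tested NiFi script. The class `WindowedAverage` and its `to_state`/`from_state` helpers are hypothetical names made up for illustration. They assume the state is kept as a map of string keys to string values, which is how I understand the StateManager stores it. Inside ExecuteScript you would still have to read the value from the flow file, load and save the map through the StateManager, and handle concurrency yourself.

```python
from collections import deque


class WindowedAverage:
    """Average of the values seen within the last `window_seconds` (hypothetical helper)."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.samples = deque()  # (timestamp, value) pairs, oldest first

    def add(self, timestamp, value):
        """Record one value and return the average over the current window."""
        self.samples.append((timestamp, value))
        # Drop samples that are older than the window relative to the newest one.
        cutoff = timestamp - self.window_seconds
        while self.samples and self.samples[0][0] <= cutoff:
            self.samples.popleft()
        return sum(v for _, v in self.samples) / len(self.samples)

    def to_state(self):
        """Serialize the window into a string-to-string map (assumed state format)."""
        return {
            "timestamps": ",".join(repr(float(t)) for t, _ in self.samples),
            "values": ",".join(repr(float(v)) for _, v in self.samples),
        }

    @classmethod
    def from_state(cls, window_seconds, state):
        """Rebuild the window from a map produced by to_state()."""
        window = cls(window_seconds)
        timestamps = state.get("timestamps", "")
        values = state.get("values", "")
        if timestamps and values:
            for t, v in zip(timestamps.split(","), values.split(",")):
                window.samples.append((float(t), float(v)))
        return window
```

The idea would be to call `from_state` at the start of each script invocation, `add` once per flow file, and `to_state` before writing the state back. Keeping every sample in state only makes sense for small windows; for a plain running average (no window) a count and a sum would be enough.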