NiFi - How to compute an average across flowfiles?

Contributor

Hello all,

I am using NiFi and pulling JSON data from a Kafka topic with the ConsumeKafka processor. I need to compute the average of a particular attribute's value across multiple flowfiles within a given time window. Any ideas on how this can be done? I have tried EvaluateJsonPath and UpdateAttribute, but UpdateAttribute only seems to let me do mathematical operations on the current flowfile; I cannot access the previous flowfile with it. Is there a processor that can do this?

Thanks.

ACCEPTED SOLUTION

Master Guru

In an upcoming release you will be able to keep stateful variables with UpdateAttribute (NIFI-1582); its author envisions it supporting running averages. I have also started an AggregateValues processor under NIFI-2735 but haven't been able to finish it yet. Note that it works on micro-batches (such as the flowfiles produced by a split processor like SplitJson) rather than on rolling time windows.
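To make the micro-batch idea concrete, here is a rough sketch in plain Python. It is not the NIFI-2735 code. It relies on SplitJson giving every split of one input the same `fragment.identifier` attribute and writing the total number of splits to `fragment.count`. Flowfiles are modelled as dicts of attribute strings, and `value` is a hypothetical attribute name for the number you extracted with EvaluateJsonPath.

```python
from collections import defaultdict


def average_micro_batches(flowfiles):
    """Yield (fragment_identifier, average) once every split of a batch has arrived.

    Each flowfile is a dict of attribute strings. Splits from the same SplitJson run
    share 'fragment.identifier'; 'fragment.count' is how many splits that run produced.
    'value' is a hypothetical attribute holding the number to average.
    """
    sums = defaultdict(float)  # running sum per batch
    seen = defaultdict(int)    # splits received so far per batch
    for ff in flowfiles:
        batch = ff["fragment.identifier"]
        sums[batch] += float(ff["value"])
        seen[batch] += 1
        if seen[batch] == int(ff["fragment.count"]):
            # Batch complete: emit its average and forget it.
            yield batch, sums.pop(batch) / seen.pop(batch)
```

Batches can interleave. For example, splits `a` (10), `b` (7), `a` (20), with `fragment.count` of 2 for `a` and 1 for `b`, yield `("b", 7.0)` first and then `("a", 15.0)`. Splits that never arrive leave a batch open forever, which is one reason this differs from a time window.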

In the meantime, if you are familiar with a scripting language such as Groovy, Jython, JavaScript, or JRuby, you could use ExecuteScript or InvokeScriptedProcessor. Both have access (via the ProcessContext) to the StateManager, where you can keep state such as running sums, counts, and averages across flowfiles over time.
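Below is a minimal sketch of the logic such a script might hold for a tumbling time-window average. It is written as plain Python without type hints so it could also run under Jython. The StateManager stores a `Map<String, String>`, so the running sum, count, and window start are kept as text. The keys `window.start`, `sum`, and `count` and the function name are hypothetical choices for this sketch. In a real ExecuteScript body, you would read the map from `context.getStateManager().getState(Scope.LOCAL).toMap()`, pass it through a function like this, and write the result back with `setState(..., Scope.LOCAL)`. How you convert between the Java map and the dict is left open here.

```python
def update_window(state, value, now_ms, window_ms):
    """Fold one value into a tumbling-window average kept in a string-only map.

    state: dict of str -> str (mirrors the StateManager's Map<String, String>).
    Returns (new_state, finished_average). finished_average is None unless this
    value arrived after the current window closed, in which case it is the
    average of the window that just ended.
    """
    start = int(state.get("window.start", now_ms))
    total = float(state.get("sum", "0"))
    count = int(state.get("count", "0"))
    finished = None
    if now_ms - start >= window_ms:
        # Window closed: report its average (if it saw values) and open a new one.
        if count:
            finished = total / count
        start, total, count = now_ms, 0.0, 0
    total += value
    count += 1
    new_state = {"window.start": str(start), "sum": str(total), "count": str(count)}
    return new_state, finished
```

With a 1000 ms window, values 10.0 at t=0 and 20.0 at t=500 accumulate in one window. A value at t=1200 closes it and returns 15.0. One design caveat is that a window's average is only reported when the next value arrives, so an idle stream would need a timer-driven flush. If several concurrent tasks update the same state, the StateManager's `replace(oldStateMap, newMap, scope)` compare-and-set is safer than a plain `setState`.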
