Is there a way to read multiple inputs that each have a different frequency into NiFi and have NiFi align the values for those inputs using the least common multiple of the frequencies? I have a heart rate input that has a frequency of 1/s, a blood pressure input that has a frequency of 1/min, and an oxygen saturation input that has a frequency of x/hr. I want NiFi to give me heart rate, blood pressure, and oxygen saturation readings every lcm(1,60,x) seconds.
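To make the alignment arithmetic concrete, here is a small sketch (assuming the oxygen-saturation reading arrives every 300 seconds, i.e. 12/hr; the intervals are illustrative, not from the question):

```python
from math import lcm  # Python 3.9+

# Hypothetical sampling intervals in seconds.
heart_rate_interval = 1        # 1 reading per second
blood_pressure_interval = 60   # 1 reading per minute
spo2_interval = 300            # assumed: 12 readings per hour

# One aligned record can be emitted every LCM of the intervals.
aligned_interval = lcm(heart_rate_interval, blood_pressure_interval, spo2_interval)
print(aligned_interval)  # 300
```

So in this example every reading type has at least one sample available each 300 seconds, and that is the natural emission interval for a combined record.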
Vlad, I would suggest you consider putting values into NiFi's cache service with a known compound lookup key, then have a processor pull all three values and make a decision from them. Take a look at a scripted invocation processor to simplify any custom coding.
@Andrew Grande, have you come across any examples that could help make this work? When I looked at the (Fetch/Put)DistributedMapCache processors, I ran into the problem of not knowing which cache keys to use during the fetch or put. The metrics I mention in the question are associated with many patients; I'd need some way to fetch cache keys for every patient, and the list of patients is constantly changing.
Think of the same rules for keys as in HBase, to an extent. It's basically just a big distributed hashmap, so in this instance you would put something like time-type in the key, then retrieve based on time. That's going to need a custom processor in this scenario, since the Fetch processor will only grab one key. You would either need to build a custom processor that pulls multiple keys from the DistributedMapCacheClientService, or use some sort of MergeContent flow, which would likely suffer from incomplete-result problems or add significant latency.
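The compound-key idea above can be sketched in plain Python, with a dict standing in for the DistributedMapCacheClientService (key layout, metric names, and the 300-second bucket are all hypothetical choices, not NiFi APIs):

```python
# A plain dict stands in for NiFi's distributed map cache.
cache = {}

ALIGN_SECONDS = 300  # assumed alignment interval
METRICS = ("heart_rate", "blood_pressure", "spo2")

def make_key(patient_id, metric, epoch_seconds):
    # Bucket the timestamp so every reading taken inside one
    # alignment interval shares the same key suffix.
    bucket = epoch_seconds - (epoch_seconds % ALIGN_SECONDS)
    return f"{patient_id}|{metric}|{bucket}"

def put_reading(patient_id, metric, epoch_seconds, value):
    cache[make_key(patient_id, metric, epoch_seconds)] = value

def fetch_aligned(patient_id, epoch_seconds):
    # Pull all three metrics for the same time bucket; None marks a gap.
    return {m: cache.get(make_key(patient_id, m, epoch_seconds)) for m in METRICS}

put_reading("p1", "heart_rate", 1000, 72)
put_reading("p1", "blood_pressure", 1010, "120/80")
put_reading("p1", "spo2", 1100, 98)
print(fetch_aligned("p1", 1050))
# {'heart_rate': 72, 'blood_pressure': '120/80', 'spo2': 98}
```

The custom processor (or a scripted one) would do the `fetch_aligned` step per patient; the gap-marking behavior is what you'd use to decide whether a bucket is complete enough to emit.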
Hi Vladimir. I'd say it depends on what kind of processor you are using to get your data into NiFi. In addition to Andrew's answer, you may find the cron-based scheduling of processors useful.
Vlad, I'm following this thread, as this would eliminate the Storm/HBase processing that we are doing for the demo.
Would that not be getting into complex event processing? I know there are certainly ways to achieve the effect you are looking for, but isn't the main focus of NiFi simple event processing? I know this is not a great answer, but this seems to be exactly the use case that Spark Streaming would be great at (built-in state functions based on key):
https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter.... It's that tired old "horses for courses" argument.
This is very much more of a Spark use case than a NiFi one. I would use NiFi to feed Spark Streaming, using the batch interval or the window option to batch. There are some downsides to this, as it relies on system time rather than logical time, but that's inevitable in these sorts of scenarios.
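The batch-interval idea can be sketched without Spark at all; the grouping below mimics what a system-time window does (the 60-second interval and the tuple shape are illustrative assumptions):

```python
from collections import defaultdict

BATCH_SECONDS = 60  # hypothetical batch/window interval

def window_readings(readings):
    """Group (epoch_seconds, metric, value) tuples by batch window,
    the way a system-time window batches whatever arrived in it."""
    windows = defaultdict(dict)
    for ts, metric, value in readings:
        windows[ts - ts % BATCH_SECONDS][metric] = value
    return dict(windows)

readings = [(5, "heart_rate", 70), (30, "blood_pressure", "118/76"),
            (65, "heart_rate", 71)]
print(window_readings(readings))
# {0: {'heart_rate': 70, 'blood_pressure': '118/76'}, 60: {'heart_rate': 71}}
```

Note the downside mentioned above shows up here too: window 60 has no blood pressure reading, because membership is decided by arrival-time bucketing rather than by logical completeness.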
A possible NiFi option (though again, not ideal) would be to use MergeContent, with the max bin age set to the expected sampling interval, to combine all the measurement FlowFiles into a single FlowFile for delivery. However, this is more likely to give you downstream schema problems, depending on what you're hanging off the end of the pipeline.
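If you do try the MergeContent route, a rough property sketch might look like the following (the correlation attribute name and the 300-second interval are assumptions you'd set for your own flow):

```text
Merge Strategy             : Bin-Packing Algorithm
Correlation Attribute Name : patient.id   (hypothetical attribute set upstream)
Minimum Number of Entries  : 3            (one FlowFile per metric)
Max Bin Age                : 300 sec      (expected sampling interval)
```

The correlation attribute keeps each patient's readings in their own bin, and the bin age forces incomplete bins out rather than holding them forever, which is exactly where the incomplete-result and latency trade-off comes from.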
In short, this is what Spark Streaming and Storm with Trident do well, so use them.