Dynamically Change Scheduled timer

New Contributor

Hello,

I am trying to set up a processor that runs at a variable interval. I'd like to be able to change that interval amount dynamically by reading the content/setting an attribute from control message received by the flow.

Is that possible? I have looked but not been able to find a way to do that.

Thank you.

-Aaron

4 REPLIES

Re: Dynamically Change Scheduled timer

Explorer

Hi Aaron,


What would you like to achieve? Could you please provide some examples?


Re: Dynamically Change Scheduled timer

New Contributor

Hi Gergely,


The short version of my question: Can I set the Run Schedule value dynamically?

[Image: 107333-config.png]

I have an array of different sensors that need to be polled and have their values saved/reported. This would be a sample flow:

[Image: 107343-sensor-monitor.png]

I need to be able to change the polling interval of each sensor. I'm thinking of the Monitor Controller being able to receive a JSON message such as:

{
  "Sensor1": 60,
  "Sensor2": 600,
  "Sensor3": 600
}

This would set Sensor1 to a 60 s interval and Sensor2 and Sensor3 to 10 minutes. Then send in:

{
  "Sensor1": 600,
  "Sensor2": 60,
  "Sensor3": 60
}

This would change Sensor2 and Sensor3 to 60 s and Sensor1 to 10 minutes when needed.

I need the flow to be autonomous beyond the control messages. I can't have an external event "poke" the pollers constantly.


Don't know if there is a way to do this in NiFi or if I have to implement my own bookkeeping in a custom processor.


Thanks.

Re: Dynamically Change Scheduled timer

Explorer

Hi Aaron,

Cloudbreak does not support this feature. To find out whether NiFi supports it, please add the NiFi tag to the question.


Gergely


Re: Dynamically Change Scheduled timer

Master Guru

@Aaron R

The only way to programmatically adjust the configured Run Schedule of a NiFi processor is through the NiFi REST API.
You would achieve this through a series of InvokeHTTP processors.

-

1. Use an InvokeHTTP processor to stop the target processor.
2. Use an InvokeHTTP processor to change the Run Schedule configuration on that processor.
3. Use an InvokeHTTP processor to start the processor again. (Note that the processor will execute immediately when you start it and then not execute again until the newly configured Run Schedule duration has elapsed.)

-

You would then need to repeat the above process for each of your three sensor inputs.
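
As a rough sketch of the stop / update / start sequence above, here are the same three REST calls written in Python instead of as InvokeHTTP processors. It assumes an unsecured NiFi instance at http://localhost:8080. The endpoint paths and payload fields (`/processors/{id}/run-status`, `schedulingPeriod`, `revision.version`) are taken from the NiFi 1.x REST API and should be checked against the documentation for your version. `BASE_URL`, `call`, and `reschedule` are hypothetical names used only for illustration.

```python
import json
import urllib.request

# Assumption: unsecured NiFi reachable at this address.
BASE_URL = "http://localhost:8080/nifi-api"


def run_status_payload(version, state):
    """Body for PUT /processors/{id}/run-status (state: STOPPED or RUNNING)."""
    return {"revision": {"version": version}, "state": state}


def schedule_payload(processor_id, version, seconds):
    """Body for PUT /processors/{id} that only changes the Run Schedule."""
    return {
        "revision": {"version": version},
        "component": {
            "id": processor_id,
            "config": {"schedulingPeriod": f"{seconds} sec"},
        },
    }


def call(method, path, body=None):
    """Send one JSON request to NiFi and return the decoded JSON response."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    request = urllib.request.Request(
        BASE_URL + path,
        data=data,
        method=method,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def reschedule(processor_id, seconds, call=call):
    """Stop a processor, set its Run Schedule, and start it again."""
    path = f"/processors/{processor_id}"
    # Every update must carry the current revision version of the component.
    version = call("GET", path)["revision"]["version"]
    # Step 1: stop the processor.
    stopped = call("PUT", path + "/run-status", run_status_payload(version, "STOPPED"))
    # Step 2: change the Run Schedule.
    updated = call(
        "PUT", path, schedule_payload(processor_id, stopped["revision"]["version"], seconds)
    )
    # Step 3: start it again; it runs once immediately, then on the new schedule.
    call("PUT", path + "/run-status", run_status_payload(updated["revision"]["version"], "RUNNING"))
```

Calling `reschedule("<processor-id>", 60)` once per sensor processor would mirror the repetition described above. A real flow would likely also need to wait until the processor has no active threads before step 2, since NiFi may reject a configuration change on a processor that has not fully stopped.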

Since I do not know the full details of your use case, it is not clear whether this solution will really meet your needs. However, I can tell you that, based on how NiFi was developed, this is the only method for adjusting this configuration without manual user intervention.

-

You may just be better off building a flow that sends a FlowFile to each sensor's InvokeHTTP processor to trigger its execution. The InvokeHTTP processor does accept an inbound connection. When set up this way, the InvokeHTTP processor's configured Run Schedule controls how often the processor checks that inbound connection for a FlowFile. When a FlowFile exists, it will execute.
-

When a FlowFile is generated (by your "Sensor monitor controller", for example), that FlowFile will be timestamped. First you would need to split the incoming JSON into three different FlowFiles (one for each sensor). You could build a flow that parses the various sensor settings into NiFi FlowFile attributes and then create an if/then loop that checks whether the FlowFile timestamp + sensor value is less than or equal to the current time. When that loop exits, you route the FlowFile to the appropriate InvokeHTTP processor to trigger it to collect the sensor data.
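
To make the split-and-check logic above concrete, here is a minimal sketch in plain Python. It is not NiFi code; in a real flow the same steps would be done with standard processors. The attribute names `sensor`, `interval.sec`, and `created.ms`, and the functions `split_control_message` and `is_due`, are made up for illustration.

```python
import json


def split_control_message(text, created_ms):
    """Turn one JSON control message into one attribute map per sensor."""
    intervals = json.loads(text)
    return [
        {"sensor": name, "interval.sec": seconds, "created.ms": created_ms}
        for name, seconds in intervals.items()
    ]


def is_due(attrs, now_ms):
    """True once timestamp + interval is less than or equal to the current time."""
    return attrs["created.ms"] + attrs["interval.sec"] * 1000 <= now_ms


message = '{"Sensor1": 60, "Sensor2": 600, "Sensor3": 600}'
flowfiles = split_control_message(message, created_ms=0)

# 60 seconds after the control message arrived, only Sensor1 is due.
for attrs in flowfiles:
    print(attrs["sensor"], is_due(attrs, now_ms=60_000))
```

A FlowFile for which `is_due` is false would go back around the loop; one for which it is true would be routed on to the matching InvokeHTTP processor.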
-
Now the question becomes what you want to do with the "Original" FlowFile out of the InvokeHTTP processor. You can discard it and expect a new sensor monitor input each time, or you can reset its trigger time and loop it back to the start. In that case you would also need to build a method into your flow to kick this looping FlowFile out once a new sensor input comes in (perhaps using the DistributedMapCache to store a value you increment each time a new timer comes in, and then having your looping FlowFiles fetch this value to see whether it has changed and kicking the FlowFile out if it has).
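
The "increment a value and kick out stale loops" idea can be sketched as follows. This is plain Python with an ordinary dict standing in for the DistributedMapCache. The key `generation`, the attribute `trigger.ms`, and all function names are hypothetical and only illustrate the bookkeeping.

```python
def new_control_message(cache):
    """Bump the shared counter each time a new timer message arrives."""
    cache["generation"] = cache.get("generation", 0) + 1
    return cache["generation"]


def route(attrs, cache, now_ms):
    """Decide what a looping FlowFile should do on this pass."""
    if attrs["generation"] != cache.get("generation"):
        return "drop"  # a newer control message has replaced this timer
    if attrs["trigger.ms"] + attrs["interval.sec"] * 1000 <= now_ms:
        return "poll"  # send on to the sensor's InvokeHTTP processor
    return "wait"      # not yet due; loop again


def reset_trigger(attrs, now_ms):
    """Restart the timer on the 'Original' FlowFile before looping it back."""
    return {**attrs, "trigger.ms": now_ms}


cache = {}  # stand-in for the DistributedMapCache
generation = new_control_message(cache)
flowfile = {"sensor": "Sensor1", "interval.sec": 60, "trigger.ms": 0, "generation": generation}

print(route(flowfile, cache, now_ms=30_000))   # wait
print(route(flowfile, cache, now_ms=60_000))   # poll
flowfile = reset_trigger(flowfile, now_ms=60_000)
new_control_message(cache)                     # a new sensor input arrives
print(route(flowfile, cache, now_ms=120_000))  # drop
```

Tagging each looping FlowFile with the counter value it was created under means a single cache lookup is enough to tell whether it has been superseded.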

-

Hope these ideas help you with your design decisions.

-

Thank you,

Matt

-

If you found this answer addressed your question, please take a moment to log in and click the "ACCEPT" link.
