Automatic deployment of NiFi Flow

Hello,

I am trying to work out an automated deployment approach for NiFi (no UI usage, just the CLI toolkit or alternatives).

 

Let us assume you have a NiFi flow under load that needs to be updated.
There is some data in the connection queues between processors.
What happens when you try to update the flow?
Will some data be lost, or will the update be refused?

 

Is there a solution for a procedure like this?
1) stop the data-receiving processor
2) check whether its success connection queue is empty
3) repeat steps 1 and 2 for the next processor, until the last processor is stopped
4) update the flow, ignoring any data in failure or error connection queues

 

Do you think this is a reasonable approach? Is there something ready to use?

Best regards
Jaro

 

1 ACCEPTED SOLUTION

@Jarinek 

The process really depends on what update you are trying to make.

1. You cannot remove a connection that has queued FlowFiles in it, but you can redirect it, queued data included, to a different target processor.
2. You cannot redirect a connection if the processor it is currently attached to still has a running thread.  Stopping a processor does not kill threads; it simply tells the processor not to execute again at the configured run schedule.  Existing threads continue to run until they complete.  Until all threads exit, the processor is still in a "stopping" state, even though the UI shows the red square for "stopped".
3. You cannot modify a processor if it still has running threads (see the note about "stopping" processors above).

4. If you stop the component on the receiving side of a connection, any FlowFiles queued on that connection that are not tied to an active thread still running on the target processor will not be processed and will remain queued on the connection.  You can manually empty a queue through a REST API call (which means data loss), but that is not necessary if you are not deleting the connection.

Attempts to make configuration changes while components still have active threads or are in a running state will result in an exception being thrown, and the change will not happen.
Attempts to remove connections that have queued FlowFiles will throw an exception and block the removal.
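As a rough illustration of the connection rule above, here is a minimal Python sketch that checks a connection's queue before asking NiFi to delete it. It assumes the connection entity returned by the REST API exposes the queued count at `status.aggregateSnapshot.flowFilesQueued` and that the delete call takes the current revision version as a query parameter; verify both against the REST API docs for your NiFi version. `call(method, path, body=None)` is a hypothetical transport helper you would supply yourself (for example a thin wrapper around your HTTP client with the right authentication), not part of NiFi.

```python
def queued_flowfiles(connection_entity):
    """Number of FlowFiles queued on a connection, per its status snapshot.

    Assumes the field path status.aggregateSnapshot.flowFilesQueued;
    returns 0 when the field is missing.
    """
    snapshot = connection_entity.get("status", {}).get("aggregateSnapshot", {})
    return snapshot.get("flowFilesQueued", 0)


def delete_connection_if_empty(call, conn_id):
    """Send a DELETE for the connection only when its queue looks empty.

    `call` is a hypothetical helper: call(method, path, body=None) -> dict.
    Returns True if a DELETE was sent, False if FlowFiles are still queued.
    """
    entity = call("GET", f"/connections/{conn_id}")
    if queued_flowfiles(entity) > 0:
        # Leave the connection alone; deleting would be refused anyway.
        return False
    version = entity["revision"]["version"]
    call("DELETE", f"/connections/{conn_id}?version={version}")
    return True
```

The check is only a convenience for scripts: a FlowFile can still arrive between the GET and the DELETE, in which case NiFi itself refuses the removal as described above.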

Now if all you are trying to do is modify some configuration on a processor, all you need to do is stop the processor, check that it has no active threads, make the config change, and then start the processor again.
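The stop, wait, change, start sequence just described could be scripted roughly as follows. This is a sketch under assumptions, not a tested deployment tool: it assumes an unsecured NiFi at `http://localhost:8080/nifi-api`, the `/processors/{id}` and `/processors/{id}/run-status` endpoints of the NiFi REST API, and an active thread count at `status.aggregateSnapshot.activeThreadCount` in the processor entity. The function names are my own, and a secured cluster would additionally need authentication.

```python
import json
import time
import urllib.request

BASE_URL = "http://localhost:8080/nifi-api"  # assumption: unsecured local NiFi


def http_json(method, path, body=None):
    """Send a JSON request to the NiFi REST API and return the parsed reply."""
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(
        BASE_URL + path, data=data, method=method,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode())


def active_threads(entity):
    """Active thread count from a processor entity (0 if the field is absent)."""
    snapshot = entity.get("status", {}).get("aggregateSnapshot", {})
    return snapshot.get("activeThreadCount", 0)


def set_run_status(call, proc_id, state):
    """Ask NiFi to move the processor to RUNNING or STOPPED."""
    entity = call("GET", f"/processors/{proc_id}")
    return call("PUT", f"/processors/{proc_id}/run-status",
                {"revision": entity["revision"], "state": state})


def wait_until_idle(call, proc_id, timeout_s=60.0, poll_s=1.0):
    """Poll until the processor reports no active threads, or time out."""
    deadline = time.monotonic() + timeout_s
    while True:
        entity = call("GET", f"/processors/{proc_id}")
        if active_threads(entity) == 0:
            return entity
        if time.monotonic() >= deadline:
            raise TimeoutError(f"processor {proc_id} still has active threads")
        time.sleep(poll_s)


def update_properties(call, proc_id, properties, timeout_s=60.0, poll_s=1.0):
    """Stop the processor, wait for its threads, change config, restart."""
    set_run_status(call, proc_id, "STOPPED")
    entity = wait_until_idle(call, proc_id, timeout_s, poll_s)
    call("PUT", f"/processors/{proc_id}", {
        "revision": entity["revision"],  # latest revision seen while idle
        "component": {"id": proc_id, "config": {"properties": properties}},
    })
    return set_run_status(call, proc_id, "RUNNING")
```

A call would look like `update_properties(http_json, "<processor-id>", {"Some Property": "new value"})`, where the id and property name are placeholders. Passing the transport function in as `call` keeps the sequence easy to dry-run against a stub before pointing it at a live cluster.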

I am not sure what you are asking with "update the flow ignoring any data in failure or error connection queues".  NiFi does not ignore queued FlowFiles.  It is also not wise to leave connections with queued FlowFiles just sitting around in your dataflows.  Those old queued FlowFiles will prevent removal of the content claims that contain their data.  Since a content claim can contain the data from one to many FlowFiles, this can result in your content repository filling up.  NiFi can only remove content claims that no longer have any FlowFiles pointing to them.

Here are some useful links:
https://nipyapi.readthedocs.io/en/latest/nipyapi-docs/nipyapi.html
https://github.com/Chaffelson/nipyapi

http://nifi.apache.org/docs/nifi-docs/rest-api/index.html
https://community.cloudera.com/t5/Community-Articles/Update-NiFi-Connection-Destination-via-REST-API...

https://community.cloudera.com/t5/Community-Articles/Change-NiFi-Flow-Using-Rest-API-Part-1/ta-p/244...


Hope this helps,
Matt

2 REPLIES

If you use flow versioning with NiFi Registry, applying a new version to a running process group stops the affected components and waits until nothing is still in process before making the change.   https://pierrevillard.com/2018/04/09/automate-workflow-deployment-in-apache-nifi-with-the-nifi-regis...

 

See the Python helper library by Dan, NiPyAPI: https://pypi.org/project/nipyapi/

 

The NiFi Toolkit CLI can do this as well; see the toolkit guide for the available commands.

https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html

 

Examples

https://www.datainmotion.dev/2021/01/automating-starting-services-in-apache.html

 

The upcoming Cloudera DataFlow Experience does this automatically as part of autoscaling.

 

Make sure you use Load Balanced Queues between processors.

 

You can also use Stateless NiFi if you want a flow that starts, completes a fixed job, and then stops.

https://www.datainmotion.dev/2019/11/exploring-apache-nifi-110-parameters.html

 

Extra docs

https://docs.cloudera.com/cdf-datahub/7.2.6/nifi-api/topics/cdf-datahub-nifi-rest-api.html

 

https://github.com/tspannhw/EverythingApacheNiFi

https://www.datainmotion.dev/2020/09/devops-working-with-parameter-contexts.html

https://www.datainmotion.dev/2020/10/automating-building-migration-backup.html

https://www.datainmotion.dev/2019/04/simple-apache-nifi-operations-dashboard.html

 
