Can we empty all the queues inside a process group with a single REST API call? What other operations can be performed directly at the process group level with a single REST API call in NiFi 1.5.0?

New Contributor

I ran into this problem while deleting a process group through REST API code.

3 REPLIES

Super Guru

@Daminee Sao

NiFi has no single REST API call that clears all the queues in a process group. Every drop requests call expects the connection ID (queue ID) to be included in the call.

However, by combining the process group connections call with the drop requests call, we can clear all the queues inside the process group.

# get all the connections in the process group
curl -i -X GET {nifi-ip-address}/nifi-api/process-groups/{id}/connections
# drop all flowfiles for the specific connection
curl -i -X POST {nifi-ip-address}/nifi-api/flowfile-queues/{connection-id}/drop-requests

The REST API calls available at the process group level are shown below:

59433-processgroup-restapi.png

Refer to the link below for more API calls supported at the process group level.

https://nifi.apache.org/docs/nifi-docs/rest-api/index.html

Example:

My NiFi instance runs on localhost, port 9090, and my process group ID is 619a2801-0161-1000-a4c2-95e4430e977f.

The REST API call below returns all the connections in that process group:

curl -i -X GET http://localhost:9090/nifi-api/process-groups/619a2801-0161-1000-a4c2-95e4430e977f/connections

My process group contains 2 connection IDs (58629c69-0161-1000-b2e4-254f8afcde9c, 619d132e-0161-1000-f689-198ce61282da).

To drop all flowfiles in these connections, call the drop requests API for each one:

curl -i -X POST http://localhost:9090/nifi-api/flowfile-queues/58629c69-0161-1000-b2e4-254f8afcde9c/drop-requests
curl -i -X POST http://localhost:9090/nifi-api/flowfile-queues/619d132e-0161-1000-f689-198ce61282da/drop-requests
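Each POST above only creates a drop request. As far as I know, NiFi processes it asynchronously and returns a dropRequest object with an id and a finished flag, which can then be polled with GET and cleaned up with DELETE at /flowfile-queues/{connection-id}/drop-requests/{drop-request-id}. The following is a minimal Python sketch of that cycle, assuming an unsecured instance. The names send, drop_and_wait and poll_seconds are made up for illustration and are not part of NiFi.

```python
import json
import time
import urllib.request

BASE = "http://localhost:9090/nifi-api"  # instance from the example above


def send(method, url):
    """Send a body-less request (like the curl calls above) and decode the JSON reply."""
    req = urllib.request.Request(url, method=method)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


def drop_and_wait(connection_id, send=send, poll_seconds=1.0, base=BASE):
    """Create a drop request, poll until it reports finished, then delete it."""
    queue_url = f"{base}/flowfile-queues/{connection_id}/drop-requests"
    drop = send("POST", queue_url)["dropRequest"]
    drop_url = f"{queue_url}/{drop['id']}"
    while not drop["finished"]:
        time.sleep(poll_seconds)
        drop = send("GET", drop_url)["dropRequest"]
    # Remove the finished request so it does not linger on the server.
    send("DELETE", drop_url)
    return drop

# Usage against a live instance (not run here):
# drop_and_wait("58629c69-0161-1000-b2e4-254f8afcde9c")
```

Passing send as a parameter keeps the polling logic separate from the HTTP layer, so authentication or a different HTTP client can be swapped in without touching the loop.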

Write a script that extracts all the connection IDs from the connections API response and loops over them, calling the drop requests API for each one.
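One possible shape for such a script, as a rough Python sketch rather than a finished tool. It assumes an unsecured instance and that the connections response is a JSON object with a "connections" array whose entries carry an "id" field, which matches the $.connections and $.id paths used in the NiFi flow further down. The helper names extract_connection_ids, request_json and drop_all are hypothetical.

```python
import json
import urllib.request

BASE = "http://localhost:9090/nifi-api"            # instance from the example above
GROUP_ID = "619a2801-0161-1000-a4c2-95e4430e977f"  # process group from the example above


def extract_connection_ids(payload):
    """Return the id of every entry in the 'connections' array of the response."""
    return [conn["id"] for conn in payload.get("connections", [])]


def request_json(method, url):
    """Send a body-less request (like the curl calls) and decode the JSON reply."""
    req = urllib.request.Request(url, method=method)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


def drop_all(group_id, base=BASE, request_json=request_json):
    """Create a drop request for each connection listed for the process group."""
    listing = request_json("GET", f"{base}/process-groups/{group_id}/connections")
    connection_ids = extract_connection_ids(listing)
    for connection_id in connection_ids:
        request_json("POST", f"{base}/flowfile-queues/{connection_id}/drop-requests")
    return connection_ids

# Usage against a live instance (not run here):
# for connection_id in drop_all(GROUP_ID):
#     print("drop requested for", connection_id)
```

To my understanding the connections call lists only the connections directly inside the given process group, so nested process groups would need the same treatment one by one.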

(or)

Build the same loop as a NiFi flow:

Flow:

GenerateFlowFile //triggers the flow
InvokeHTTP //GET method; returns a JSON flowfile with all the connections as its content
SplitJson //splits on $.connections so that each connection becomes an individual flowfile
EvaluateJsonPath //extracts $.id from the JSON message and keeps it as a flowfile attribute
InvokeHTTP //POST method; uses the extracted attribute to call drop requests: http://localhost:9090/nifi-api/flowfile-queues/${connection_id}/drop-requests

59435-flow.png

I have attached the XML file for the above flow. Save it, use the flow, and change it as needed.

drop-requests-167593.xml

Rising Star

@Shu
Great description and thanks for the example. It works just the way I need!

New Contributor

The following purges all queues in NiFi indiscriminately, using the Python library nipyapi (assuming no authentication):

import nipyapi

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api'
# Queues can be large, so increase the timeout
nipyapi.config.short_max_wait = 3600
for queue in nipyapi.canvas.list_all_connections():
    print("clearing flow files from connection: " + queue.id, end=': ')
    # Report the outcome based on the value purge_connection returns
    print("Successful" if nipyapi.canvas.purge_connection(queue.id) else "Unsuccessful")


