Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
avatar

Introduction

Recently I was asked how to monitor and alert flowfile count in a connection queue when it exceeds a predefined threshold, I am trying to capture the steps to implement the same.

Prerequisites

1) To test this, Make sure HDF-2.x version of NiFi is up an running.

2) You Already have a connection with data queued in it(say more than 20 flowfiles). Else you can create one like below:

12491-originalflow.jpg

3) Make a note of the Connection name and uuid to be monitored:

12492-original-flow-settings.jpg

Creating a Flow to Monitor Connection Queue Count.

1) Drop a GenerateFlowFile processor to the canvas and make "Run Schedule" 60 sec so we dont execute the flow to often.

2) Drop an UpdateAttribute processor, connect GenerateFlowFile's success relation to it and add below properties to it( the connection uuid noted above, threshold say 20, your NiFi host and port):

CONNECTION_UUID : dcbee9dd-0159-1000-45a7-8306c28f2786
COUNT           : 20
NIFI_HOST       : localhost
NIFI_PORT       : 8080

12493-updateattribute.jpg

3) Drop a InvokeHTTP processor to the canvas, connect UpdateAttribute's success relation to it, auto terminate all other relations and update its 2 properties as below:

HTTP Method : GET
Remote URL  : http://${NIFI_HOST}:${NIFI_PORT}/nifi-api/connections/${CONNECTION_UUID}

12494-invokehttp.jpg

4) Drop an EvaluateJsonPath processor to extract values from json with below properties, connect Response relation of InvokeHTTP to it, and auto terminate its failure and unmatched relations.

QUEUE_NAME : $.status.name
QUEUE_SIZE : $.status.aggregateSnapshot.flowFilesQueued

12495-evaluatejsonpath.jpg

5) Drop a RouteOnAttribute processor to the canvas with below configs, connect EvaluateJsonPath's matched relation to it and auto terminate its unmatched relation.

Queue_Size_Exceeded : ${QUEUE_SIZE:gt(${COUNT})}

12496-routeonattribute.jpg

6) Lastly add a PutEmail processor, connect RouteOnAttribute's matched relation to it and auto terminate all its relations. below are my properties set, you have to update it with your SMTP details:

SMTP Hostname       :   west.xxxx.yourServer.net
SMTP Port           :   587
SMTP Username       :   jgeorge@hortonworks.com
SMTP Password       :   Its_myPassw0rd_updateY0urs
SMTP TLS            :   true
From                :   jgeorge@hortonworks.com
To                  :   jgeorge@hortonworks.com
Subject             :   Queue Size Exceeded Threshold

12497-putemail.jpg

and message content should look something like below to grab all the values:

Message             :   Queue Size Exceeded Threshold for ${CONNECTION_UUID}                           Connection Name             :   ${QUEUE_NAME}                        
   Threshold Set               :   ${COUNT}                        
   Current FlowFile Count      :   ${QUEUE_SIZE}

12498-message-content.jpg

7) Now the flow is completed and li should look similar to below:

12499-finalflow.jpg

Staring the flow and testing it

1) Lets make sure at least 21 flow files are pending in the connection named 'DataToFileSystem' which was created in the Prerequisites

2) Now lets start the flow and you should receive mail alert from NiFi stating the count exceeded Threshold set which is 20 in our case.

My sample alert looks like below:

12500-alert-email.jpg

3) This concludes the tutorial for monitoring your connection queue count with NiFi.

4) Too lazy to create the flow???.. Download my template here

References

NiFi REST API

NiFI Expression Language

Thanks,

Jobin George

19,028 Views
Comments
avatar
Expert Contributor
@Jobin George

Thanks a lot for this awesome post. I had a question so to monitor every queue I need to have monitor individually right?

avatar
New Contributor

Can the same be done if the NIFI cluster is HTTPS?  I've read some other posts that appear to use CURL, presumably in some scripted process, which then creates tokens that get passed in subsequent REST calls.  But I need to be able to gather NIFI processor/queue status all within NIFI- no external scripting.  Is this possible?

avatar
New Contributor

This is excellent information thankyou. 

If a file in the queue process more than 2 mins can we get an alert ? Is there anyway including third party tools to get alerts ? I have been trying from long to sort this out. Can you please help me. 

Thankyou.  

avatar
Super Mentor

@RalphA @udayAle 

I encourage you to raise your question as a new question in Cloudera Community rather then asking your question as a comment on an existing article.  You can certainly. reference this article in your new community question.  You'll get better visibility that way to you query.

Thank you,

Matt