Monitor backpressure count and size from custom processor

New Contributor

I have a custom processor (NiFi 1.8.0) that already modifies incoming flow files as needed. However, before transferring a flow file to the normal outgoing relationship, I would like to check whether that relationship's backpressure is close to exceeding its threshold. If it is, I plan to send the flow file to a different relationship that connects to a PutFile processor, where it will be written to disk.

 

I know I can get the incoming queue count and size. But I can't figure out how to get count and size from the outgoing relationship's connection. Is this doable in a Java custom processor?
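The "close to exceeding its threshold" check described above can be sketched independently of NiFi once the queued count, queued bytes, and the two thresholds are known. This is only a minimal sketch: `BackpressureCheck`, `isNearThreshold`, and the `fraction` parameter are hypothetical names, not part of the NiFi API, and treating a threshold of 0 as "no limit" is an assumption of this sketch.

```java
// Hypothetical helper, not part of the NiFi API.
final class BackpressureCheck {
    private BackpressureCheck() {}

    /**
     * Returns true when the queued count or the queued bytes has reached the
     * given fraction (for example 0.9) of its threshold. A threshold of 0 or
     * less is treated as "no limit" (an assumption of this sketch).
     */
    static boolean isNearThreshold(long queuedCount, long countThreshold,
                                   long queuedBytes, long bytesThreshold,
                                   double fraction) {
        boolean countNear = countThreshold > 0 && queuedCount >= fraction * countThreshold;
        boolean bytesNear = bytesThreshold > 0 && queuedBytes >= fraction * bytesThreshold;
        return countNear || bytesNear;
    }

    public static void main(String[] args) {
        // 9,500 of 10,000 flow files queued, checked against a 90% margin
        System.out.println(isNearThreshold(9_500, 10_000, 0, 1_073_741_824L, 0.9));
        // Small queue, checked against the same margin
        System.out.println(isNearThreshold(100, 10_000, 1_024, 1_073_741_824L, 0.9));
    }
}
```

In the processor, the result would decide which relationship the flow file is transferred to: the normal one, or the one feeding PutFile.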

2 REPLIES 2

Re: Monitor backpressure count and size from custom processor

New Contributor

I ended up finding the connections from the ProcessGroupStatus object:

 

// Imports needed at the top of the processor's source file:
import java.util.Collection;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.reporting.EventAccess;

// Inside the processor:
String myProcessorId = this.getIdentifier();
int queuedCount = 0;
long queuedBytes = 0L; // getQueuedBytes() returns a long
ProcessGroupStatus processGroupStatus = ((EventAccess) getControllerServiceLookup()).getControllerStatus();

Collection<ConnectionStatus> groupConnections = processGroupStatus.getConnectionStatus();
if (groupConnections != null) {
    // Iterate through groupConnections to find the one where the connection's source ID = myProcessorId
    // and the connection's name = 'normal output' (this is the name of a relationship I added)
    for (ConnectionStatus connection : groupConnections) {
        if ("normal output".equals(connection.getName()) && myProcessorId.equals(connection.getSourceId())) {
            // Now I can grab the current count and size of the 'normal output' relationship
            // The back pressure threshold values can be grabbed from the connection as well
            queuedCount = connection.getQueuedCount();
            queuedBytes = connection.getQueuedBytes();
            break;
        }
    }
}

The above only retrieves connections from the parent group. If the connection you're looking for is contained in a child group, you will need to iterate through the child groups:

ProcessGroupStatus processGroupStatus = ((EventAccess) getControllerServiceLookup()).getControllerStatus();
// getProcessGroupStatus() returns the status of each direct child group
for (ProcessGroupStatus childProcessGroupStatus : processGroupStatus.getProcessGroupStatus()) {
    Collection<ConnectionStatus> groupConnections = childProcessGroupStatus.getConnectionStatus();
    // Then iterate through groupConnections as above
}

The object returned by NiFi's getControllerServiceLookup() does show an 'allConnections' variable, which contains all connections across all processors in all groups, but there doesn't appear to be a getter for it. If there were one, you wouldn't have to worry about which group to look in: you could simply iterate through 'allConnections' and look for the connection matching your processor ID and relationship name.
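Without such a getter, a recursive walk over the group tree gives the same effect no matter how deeply the connection is nested. The following is a minimal, self-contained sketch of that idea. `ConnStatus`, `GroupStatus`, and `ConnectionFinder` are hypothetical stand-ins that only mimic the shape of NiFi's ConnectionStatus and ProcessGroupStatus so the example runs without NiFi on the classpath; in a real processor the same recursion would go through getConnectionStatus() and getProcessGroupStatus().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a connection's status: only the fields the search needs.
final class ConnStatus {
    final String name;
    final String sourceId;
    final int queuedCount;
    final long queuedBytes;

    ConnStatus(String name, String sourceId, int queuedCount, long queuedBytes) {
        this.name = name;
        this.sourceId = sourceId;
        this.queuedCount = queuedCount;
        this.queuedBytes = queuedBytes;
    }
}

// Hypothetical stand-in for a process group's status: its own connections plus child groups.
final class GroupStatus {
    final List<ConnStatus> connections = new ArrayList<>();
    final List<GroupStatus> children = new ArrayList<>();
}

final class ConnectionFinder {
    private ConnectionFinder() {}

    /** Depth-first search for the first connection with the given source ID and name; null if none. */
    static ConnStatus find(GroupStatus group, String sourceId, String name) {
        if (group == null) {
            return null;
        }
        // Check this group's own connections first
        for (ConnStatus c : group.connections) {
            if (name.equals(c.name) && sourceId.equals(c.sourceId)) {
                return c;
            }
        }
        // Then descend into each child group
        for (GroupStatus child : group.children) {
            ConnStatus found = find(child, sourceId, name);
            if (found != null) {
                return found;
            }
        }
        return null;
    }
}
```

Because the processor ID is part of the match, the search stops at the right connection even if other processors have a relationship with the same name.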
