Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

nipyapi canvas.list_all_processors lists only processors in sub processorgroup

avatar
Expert Contributor

Hi

I need to extract my processors for at particular Processor Group with id = 5b641351-34d0-3def-a376-7824fbe9cc0f

This Processor Group contains 10 processors and another Processor Group with 5 Processors

when i want to extract the Processors i only get the 5 processors from my "sub" Processor-Group

test=canvas.list_all_processors('5b641351-34d0-3def-a376-7824fbe9cc0f')
for item in test:
            print (str(item.id)) 

result

(nipyapi) λ python main.py
c7aad022-0ab3-353f-90cb-0999781d6309
7287c6b0-c9a8-3436-9902-52fb806e7c42
d3bd0289-490a-389c-ba90-c27c48a1e453
fb278074-6249-3aeb-88f8-d3bee857be76

I was expecting a list of 10 processors id's for the Processor-Group i call canvas.list_all_processors with,

It seems that it takes the deepest processor-group within the Processor-Group given in the argument.

It this work by design ? and is there another way to get the processors from top down within the group asked upon.

@Dan Chaffelson

1 ACCEPTED SOLUTION

avatar

Hi @Simon Jespersen I believe I have reproduced your error with the following pytest code/output.
I think it is interesting that it works when the parent is the root canvas, but not sub processor groups - probably my recursion code is poor, I will try to fix this quickly.

Edit: Resolved, the updated function is below if you want to inspect it yourself, I also took a moment to leverage the new 'descendants' calls to speed it up on newer versions. I'll push this into the code base once full testing is complete.

def test_list_nested_processors(regress_nifi, fix_pg, fix_proc):
        pg_1 = fix_pg.generate(
            parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id')
        )
        pg_2 = fix_pg.generate(parent_pg=pg_1)
        root_proc_1 = fix_proc.generate()
        pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1)
        pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1)
        pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2)
        pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2)
        pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2)
        pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2)
        r1 = canvas.list_all_processors('root')
        assert len(r1) == 7
        r2 = canvas.list_all_processors(pg_1.id)
>       assert len(r2) == 6
E       AssertionError: assert 4 == 6




def list_all_processors(pg_id='root'):
    """
    Returns a flat list of all Processors under the provided Process Group


    Args:
        pg_id (str): The UUID of the Process Group to start from, defaults to
            the Canvas root


    Returns:
         list[ProcessorEntity]
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"


    if nipyapi.utils.check_version('1.2.0') == -1:
        targets = nipyapi.nifi.ProcessGroupsApi().get_processors(
            id=pg_id,
            include_descendant_groups=True
        )
        return targets.processors
    else:
        out = []
        # list of child process groups
        pg_ids = [x.id for x in list_all_process_groups(pg_id)]
        # if not root, include the parent pg in the target list
        # root is a special case that is included if targeted by
        # list_all_process_groups
        if pg_id == 'root' or pg_id == get_root_pg_id():
            pass
        else:
            pg_ids.append(pg_id)
        # process target list
        for this_pg_id in pg_ids:
            procs = nipyapi.nifi.ProcessGroupsApi().get_processors(this_pg_id)
            if procs.processors:
                out += procs.processors
        return out



View solution in original post

4 REPLIES 4

avatar

Hi Simon - definitely sounds like a bug to me. Can you please share the NiPyApi version you are using and the script you are using?

avatar
Expert Contributor

Hi @Dan Chaffelson thanks for the quick response

here is my code , this is just a POC so it may seems somewhat unstructured, just a playground


	import os
	import sys
	import getpass
	import json
	import smtplib
	from nipyapi import nifi, config, templates, canvas
	from config import ConfigIni


	# Disable urllib3 certificate warnings
	from requests.packages.urllib3 import disable_warnings
	disable_warnings()




	class NifiInstance:
		""" The NifiInstance class facilitating easy to use
		methods utilizing the NiPyApi (https://github.com/Chaffelson/nipyapi)
		wrapper library.


		Arguments:
			url         (str): Nifi host url, defaults to environment variable `NIFI_HOST`.
			username    (str): Nifi username, defaults to environment variable `NIFI_USERNAME`.
			password    (str): Nifi password, defaults to environment variable `NIFI_PASSWORD`.
			verify_ssl  (bool): Whether to verify SSL connection - UNUSED as of now.


		"""


		def __init__(self, url=None, username=None, password=None, verify_ssl=False):
			config.nifi_config.host = self._get_url(url)
			config.nifi_config.verify_ssl = verify_ssl
			config.nifi_config.username = username
			self._authenticate(username, password)




		def _get_url(self, url):
			if not url:
				try:
					url = os.environ['NIFI_HOST']
				except KeyError:
					url = input('Nifi host: ')
			if not '/nifi-api' in url:
				if not url[-1] == '/':
					url = url + '/'
				url = url + 'nifi-api'
			return url




		def _authenticate(self, username=None, password=None):
			if not username:
				try:
					config.nifi_config.username = os.environ['NIFI_USERNAME']
				except KeyError:
					config.nifi_config.username = input('Username: ')


			if not password:
				try:
					password = os.environ['NIFI_PASSWORD']
				except KeyError:
					password = getpass.getpass('Password: ')


			access_token = None
			try:
				access_token = nifi.AccessApi().create_access_token(username=config.nifi_config.username,password=password)            
			except nifi.rest.ApiException as e:
				print('Exception when calling AccessApi->create_access_token: %s\n'.format(e))


			config.nifi_config.api_key[username] = access_token
			config.nifi_config.api_client = nifi.ApiClient(header_name='Authorization', header_value='Bearer {}'.format(access_token))        


		def list_processors_in_processorgroup(self,pg_id=None):
			listen = canvas.list_all_processors(pg_id)
			#jlisten= json.loads(listen)
			for item in listen:
				#print (str(item.id))
				print (str(item.status.name))


		
		
		def processor_status(self,p_id=None):
			pro=canvas.get_processor(p_id, 'id')
			return pro.status.run_status
		
	#get urls and froups to list from ini file. 
	start_init = ConfigIni('c:/temp/nipyapi/monitor.ini')
	#groups are returned as list of groups 
	groups= start_init.get_group_id()
	url = start_init.get_url()


	n = NifiInstance(url, 'myuser','mypassword')


	#test list_all_processor 
	test=canvas.list_all_processors('5b641351-34d0-3def-a376-7824fbe9cc0f')


	for item in test:
				print (str(item .id))



my nipyapi version is

C:\Temp\nipyapi (nipyapi) λ pip freeze | grep nipyapi nipyapi==0.11.0 C:\Temp\nipyapi

avatar

Hi @Simon Jespersen I believe I have reproduced your error with the following pytest code/output.
I think it is interesting that it works when the parent is the root canvas, but not sub processor groups - probably my recursion code is poor, I will try to fix this quickly.

Edit: Resolved, the updated function is below if you want to inspect it yourself, I also took a moment to leverage the new 'descendants' calls to speed it up on newer versions. I'll push this into the code base once full testing is complete.

def test_list_nested_processors(regress_nifi, fix_pg, fix_proc):
        pg_1 = fix_pg.generate(
            parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id')
        )
        pg_2 = fix_pg.generate(parent_pg=pg_1)
        root_proc_1 = fix_proc.generate()
        pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1)
        pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1)
        pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2)
        pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2)
        pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2)
        pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2)
        r1 = canvas.list_all_processors('root')
        assert len(r1) == 7
        r2 = canvas.list_all_processors(pg_1.id)
>       assert len(r2) == 6
E       AssertionError: assert 4 == 6




def list_all_processors(pg_id='root'):
    """
    Returns a flat list of all Processors under the provided Process Group


    Args:
        pg_id (str): The UUID of the Process Group to start from, defaults to
            the Canvas root


    Returns:
         list[ProcessorEntity]
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"


    if nipyapi.utils.check_version('1.2.0') == -1:
        targets = nipyapi.nifi.ProcessGroupsApi().get_processors(
            id=pg_id,
            include_descendant_groups=True
        )
        return targets.processors
    else:
        out = []
        # list of child process groups
        pg_ids = [x.id for x in list_all_process_groups(pg_id)]
        # if not root, include the parent pg in the target list
        # root is a special case that is included if targeted by
        # list_all_process_groups
        if pg_id == 'root' or pg_id == get_root_pg_id():
            pass
        else:
            pg_ids.append(pg_id)
        # process target list
        for this_pg_id in pg_ids:
            procs = nipyapi.nifi.ProcessGroupsApi().get_processors(this_pg_id)
            if procs.processors:
                out += procs.processors
        return out



avatar
Expert Contributor

Thats great news thank you Dan.