Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant

nipyapi canvas.list_all_processors lists only processors in sub processorgroup


Hi,

I need to extract the processors for a particular Process Group with id = 5b641351-34d0-3def-a376-7824fbe9cc0f.

This Process Group contains 10 processors and another Process Group with 5 processors.

When I try to extract the processors, I only get the 5 processors from my "sub" Process Group:

test = canvas.list_all_processors('5b641351-34d0-3def-a376-7824fbe9cc0f')
for item in test:
    print(str(item.id))

result

(nipyapi) λ python main.py
c7aad022-0ab3-353f-90cb-0999781d6309
7287c6b0-c9a8-3436-9902-52fb806e7c42
d3bd0289-490a-389c-ba90-c27c48a1e453
fb278074-6249-3aeb-88f8-d3bee857be76

I was expecting a list of 10 processor ids for the Process Group I passed to canvas.list_all_processors.

It seems that it only returns processors from the deepest Process Group within the group given as the argument.

Is this by design? And is there another way to get the processors from the top down within the requested group?

@Dan Chaffelson

1 ACCEPTED SOLUTION


Hi @Simon Jespersen, I believe I have reproduced your error with the pytest code and output below.
It is interesting that it works when the parent is the root canvas but not for nested Process Groups. My recursion code is probably at fault; I will try to fix this quickly.

Edit: Resolved. The updated function is below if you want to inspect it yourself. I also took a moment to use the new 'descendants' calls to speed it up on newer versions. I'll push this into the code base once full testing is complete.

def test_list_nested_processors(regress_nifi, fix_pg, fix_proc):
        pg_1 = fix_pg.generate(
            parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id')
        )
        pg_2 = fix_pg.generate(parent_pg=pg_1)
        root_proc_1 = fix_proc.generate()
        pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1)
        pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1)
        pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2)
        pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2)
        pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2)
        pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2)
        r1 = canvas.list_all_processors('root')
        assert len(r1) == 7
        r2 = canvas.list_all_processors(pg_1.id)
>       assert len(r2) == 6
E       AssertionError: assert 4 == 6
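
To make the failing assertion easier to follow, here is a minimal sketch that models the test layout in plain Python. It assumes the failure came from the starting group being left out of the list of groups to scan, which is what the comments in the updated function suggest. The `GROUPS` table and both helper functions are hypothetical stand-ins for illustration, not nipyapi objects or calls.

```python
# Hypothetical in-memory model of the test layout (not nipyapi objects).
# Each group maps to (names of direct child groups, processors directly inside).
GROUPS = {
    "root": (["pg_1"], 1),
    "pg_1": (["pg_2"], 2),
    "pg_2": ([], 4),
}


def descendants(group):
    """Return every group nested below `group`, excluding `group` itself."""
    found = []
    for child in GROUPS[group][0]:
        found.append(child)
        found.extend(descendants(child))
    return found


def count_processors(group, include_start):
    """Count processors in the descendants of `group`, optionally plus `group`."""
    targets = descendants(group)
    if include_start:
        targets.append(group)
    return sum(GROUPS[name][1] for name in targets)


print(count_processors("root", include_start=True))   # whole canvas
print(count_processors("pg_1", include_start=True))   # pg_1 plus pg_2
print(count_processors("pg_1", include_start=False))  # pg_2 only
```

In this model, counting from pg_1 with the starting group included gives the 6 the test expects, and leaving it out gives the 4 reported in the assertion error.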




def list_all_processors(pg_id='root'):
    """
    Returns a flat list of all Processors under the provided Process Group

    Args:
        pg_id (str): The UUID of the Process Group to start from, defaults to
            the Canvas root

    Returns:
         list[ProcessorEntity]
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"

    if nipyapi.utils.check_version('1.2.0') == -1:
        # newer versions can return descendant processors in a single call
        targets = nipyapi.nifi.ProcessGroupsApi().get_processors(
            id=pg_id,
            include_descendant_groups=True
        )
        return targets.processors
    else:
        out = []
        # list of child process groups
        pg_ids = [x.id for x in list_all_process_groups(pg_id)]
        # if not root, include the parent pg in the target list
        # root is a special case that is included if targeted by
        # list_all_process_groups
        if pg_id != 'root' and pg_id != get_root_pg_id():
            pg_ids.append(pg_id)
        # process target list
        for this_pg_id in pg_ids:
            procs = nipyapi.nifi.ProcessGroupsApi().get_processors(this_pg_id)
            if procs.processors:
                out += procs.processors
        return out
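
For anyone who wants to try the fallback branch without a running NiFi instance, the sketch below mirrors its structure with injected callables. Everything here is an assumption made for illustration: `list_processors_top_down`, `fake_list_groups`, `fake_get_processors`, and the sample ids are hypothetical and are not part of nipyapi.

```python
def list_processors_top_down(pg_id, root_id, list_groups, get_processors):
    """Collect processors from pg_id and every group nested inside it.

    list_groups and get_processors are hypothetical stand-ins for the
    nipyapi calls used in the function above.
    """
    pg_ids = list(list_groups(pg_id))
    # The stand-in list_groups already returns root when root is targeted,
    # so only a non-root starting group is added explicitly.
    if pg_id not in ("root", root_id) and pg_id not in pg_ids:
        pg_ids.append(pg_id)
    out = []
    for this_pg_id in pg_ids:
        out.extend(get_processors(this_pg_id))
    return out


# Fake data for illustration only: group "a" sits on the root canvas
# and contains group "b".
GROUP_LISTING = {"root-id": ["root-id", "a", "b"], "a": ["b"], "b": []}
PROCESSORS = {"root-id": ["p0"], "a": ["p1", "p2"], "b": ["p3"]}


def fake_list_groups(pg_id):
    """Stand-in for a group listing call; resolves the 'root' alias."""
    return GROUP_LISTING["root-id" if pg_id == "root" else pg_id]


def fake_get_processors(pg_id):
    """Stand-in for fetching the processors directly inside one group."""
    return PROCESSORS[pg_id]


nested = list_processors_top_down(
    "a", "root-id", fake_list_groups, fake_get_processors
)
print(nested)
```

Passing the lookups in as arguments keeps the traversal logic separate from the REST calls, so the "include the starting group" rule can be checked in isolation.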




4 REPLIES


Hi Simon - definitely sounds like a bug to me. Can you please share the NiPyApi version you are using and the script you are using?


Hi @Dan Chaffelson, thanks for the quick response.

Here is my code. This is just a POC, so it may seem somewhat unstructured; it is just a playground.


	import os
	import sys
	import getpass
	import json
	import smtplib
	from nipyapi import nifi, config, templates, canvas
	from config import ConfigIni


	# Disable urllib3 certificate warnings
	from requests.packages.urllib3 import disable_warnings
	disable_warnings()




	class NifiInstance:
		""" The NifiInstance class facilitating easy to use
		methods utilizing the NiPyApi (https://github.com/Chaffelson/nipyapi)
		wrapper library.


		Arguments:
			url         (str): Nifi host url, defaults to environment variable `NIFI_HOST`.
			username    (str): Nifi username, defaults to environment variable `NIFI_USERNAME`.
			password    (str): Nifi password, defaults to environment variable `NIFI_PASSWORD`.
			verify_ssl  (bool): Whether to verify SSL connection - UNUSED as of now.


		"""


		def __init__(self, url=None, username=None, password=None, verify_ssl=False):
			config.nifi_config.host = self._get_url(url)
			config.nifi_config.verify_ssl = verify_ssl
			config.nifi_config.username = username
			self._authenticate(username, password)




		def _get_url(self, url):
			if not url:
				try:
					url = os.environ['NIFI_HOST']
				except KeyError:
					url = input('Nifi host: ')
			# append the API path unless the url already contains it
			if '/nifi-api' not in url:
				if not url.endswith('/'):
					url = url + '/'
				url = url + 'nifi-api'
			return url




		def _authenticate(self, username=None, password=None):
			if not username:
				try:
					config.nifi_config.username = os.environ['NIFI_USERNAME']
				except KeyError:
					config.nifi_config.username = input('Username: ')


			if not password:
				try:
					password = os.environ['NIFI_PASSWORD']
				except KeyError:
					password = getpass.getpass('Password: ')


			access_token = None
			try:
				access_token = nifi.AccessApi().create_access_token(username=config.nifi_config.username, password=password)
			except nifi.rest.ApiException as e:
				print('Exception when calling AccessApi->create_access_token: {}\n'.format(e))


			# key the token by the resolved username, since the `username` argument may be None
			config.nifi_config.api_key[config.nifi_config.username] = access_token
			config.nifi_config.api_client = nifi.ApiClient(header_name='Authorization', header_value='Bearer {}'.format(access_token))


		def list_processors_in_processorgroup(self,pg_id=None):
			listen = canvas.list_all_processors(pg_id)
			#jlisten= json.loads(listen)
			for item in listen:
				#print (str(item.id))
				print (str(item.status.name))


		
		
		def processor_status(self,p_id=None):
			pro=canvas.get_processor(p_id, 'id')
			return pro.status.run_status
		
	# get urls and groups to list from the ini file
	start_init = ConfigIni('c:/temp/nipyapi/monitor.ini')
	# groups are returned as a list of groups
	groups = start_init.get_group_id()
	url = start_init.get_url()


	n = NifiInstance(url, 'myuser', 'mypassword')


	# test list_all_processors
	test = canvas.list_all_processors('5b641351-34d0-3def-a376-7824fbe9cc0f')


	for item in test:
		print(str(item.id))



my nipyapi version is

C:\Temp\nipyapi (nipyapi) λ pip freeze | grep nipyapi
nipyapi==0.11.0


That's great news, thank you Dan.