Created 12-13-2018 03:06 PM
Hi
I need to extract my processors for at particular Processor Group with id = 5b641351-34d0-3def-a376-7824fbe9cc0f
This Processor Group contains 10 processors and another Processor Group with 5 Processors
when i want to extract the Processors i only get the 5 processors from my "sub" Processor-Group
test=canvas.list_all_processors('5b641351-34d0-3def-a376-7824fbe9cc0f') for item in test: print (str(item.id))
result
(nipyapi) λ python main.py c7aad022-0ab3-353f-90cb-0999781d6309 7287c6b0-c9a8-3436-9902-52fb806e7c42 d3bd0289-490a-389c-ba90-c27c48a1e453 fb278074-6249-3aeb-88f8-d3bee857be76
I was expecting a list of 10 processors id's for the Processor-Group i call canvas.list_all_processors with,
It seems that it takes the deepest processor-group within the Processor-Group given in the argument.
It this work by design ? and is there another way to get the processors from top down within the group asked upon.
Created 12-15-2018 08:40 PM
Hi @Simon Jespersen I believe I have reproduced your error with the following pytest code/output.
I think it is interesting that it works when the parent is the root canvas, but not sub processor groups - probably my recursion code is poor, I will try to fix this quickly.
Edit: Resolved, the updated function is below if you want to inspect it yourself, I also took a moment to leverage the new 'descendants' calls to speed it up on newer versions. I'll push this into the code base once full testing is complete.
def test_list_nested_processors(regress_nifi, fix_pg, fix_proc): pg_1 = fix_pg.generate( parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id') ) pg_2 = fix_pg.generate(parent_pg=pg_1) root_proc_1 = fix_proc.generate() pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1) pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1) pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2) pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2) pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2) pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2) r1 = canvas.list_all_processors('root') assert len(r1) == 7 r2 = canvas.list_all_processors(pg_1.id) > assert len(r2) == 6 E AssertionError: assert 4 == 6 def list_all_processors(pg_id='root'): """ Returns a flat list of all Processors under the provided Process Group Args: pg_id (str): The UUID of the Process Group to start from, defaults to the Canvas root Returns: list[ProcessorEntity] """ assert isinstance(pg_id, six.string_types), "pg_id should be a string" if nipyapi.utils.check_version('1.2.0') == -1: targets = nipyapi.nifi.ProcessGroupsApi().get_processors( id=pg_id, include_descendant_groups=True ) return targets.processors else: out = [] # list of child process groups pg_ids = [x.id for x in list_all_process_groups(pg_id)] # if not root, include the parent pg in the target list # root is a special case that is included if targeted by # list_all_process_groups if pg_id == 'root' or pg_id == get_root_pg_id(): pass else: pg_ids.append(pg_id) # process target list for this_pg_id in pg_ids: procs = nipyapi.nifi.ProcessGroupsApi().get_processors(this_pg_id) if procs.processors: out += procs.processors return out
Created 12-13-2018 08:43 PM
Hi Simon - definitely sounds like a bug to me. Can you please share the NiPyApi version you are using and the script you are using?
Created 12-14-2018 06:44 AM
Hi @Dan Chaffelson thanks for the quick response
here is my code , this is just a POC so it may seems somewhat unstructured, just a playground
import os import sys import getpass import json import smtplib from nipyapi import nifi, config, templates, canvas from config import ConfigIni # Disable urllib3 certificate warnings from requests.packages.urllib3 import disable_warnings disable_warnings() class NifiInstance: """ The NifiInstance class facilitating easy to use methods utilizing the NiPyApi (https://github.com/Chaffelson/nipyapi) wrapper library. Arguments: url (str): Nifi host url, defaults to environment variable `NIFI_HOST`. username (str): Nifi username, defaults to environment variable `NIFI_USERNAME`. password (str): Nifi password, defaults to environment variable `NIFI_PASSWORD`. verify_ssl (bool): Whether to verify SSL connection - UNUSED as of now. """ def __init__(self, url=None, username=None, password=None, verify_ssl=False): config.nifi_config.host = self._get_url(url) config.nifi_config.verify_ssl = verify_ssl config.nifi_config.username = username self._authenticate(username, password) def _get_url(self, url): if not url: try: url = os.environ['NIFI_HOST'] except KeyError: url = input('Nifi host: ') if not '/nifi-api' in url: if not url[-1] == '/': url = url + '/' url = url + 'nifi-api' return url def _authenticate(self, username=None, password=None): if not username: try: config.nifi_config.username = os.environ['NIFI_USERNAME'] except KeyError: config.nifi_config.username = input('Username: ') if not password: try: password = os.environ['NIFI_PASSWORD'] except KeyError: password = getpass.getpass('Password: ') access_token = None try: access_token = nifi.AccessApi().create_access_token(username=config.nifi_config.username,password=password) except nifi.rest.ApiException as e: print('Exception when calling AccessApi->create_access_token: %s\n'.format(e)) config.nifi_config.api_key[username] = access_token config.nifi_config.api_client = nifi.ApiClient(header_name='Authorization', header_value='Bearer {}'.format(access_token)) def list_processors_in_processorgroup(self,pg_id=None): listen = canvas.list_all_processors(pg_id) #jlisten= json.loads(listen) for item in listen: #print (str(item.id)) print (str(item.status.name)) def processor_status(self,p_id=None): pro=canvas.get_processor(p_id, 'id') return pro.status.run_status #get urls and froups to list from ini file. start_init = ConfigIni('c:/temp/nipyapi/monitor.ini') #groups are returned as list of groups groups= start_init.get_group_id() url = start_init.get_url() n = NifiInstance(url, 'myuser','mypassword') #test list_all_processor test=canvas.list_all_processors('5b641351-34d0-3def-a376-7824fbe9cc0f') for item in test: print (str(item .id))
my nipyapi version is
C:\Temp\nipyapi (nipyapi) λ pip freeze | grep nipyapi nipyapi==0.11.0 C:\Temp\nipyapi
Created 12-15-2018 08:40 PM
Hi @Simon Jespersen I believe I have reproduced your error with the following pytest code/output.
I think it is interesting that it works when the parent is the root canvas, but not sub processor groups - probably my recursion code is poor, I will try to fix this quickly.
Edit: Resolved, the updated function is below if you want to inspect it yourself, I also took a moment to leverage the new 'descendants' calls to speed it up on newer versions. I'll push this into the code base once full testing is complete.
def test_list_nested_processors(regress_nifi, fix_pg, fix_proc): pg_1 = fix_pg.generate( parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id') ) pg_2 = fix_pg.generate(parent_pg=pg_1) root_proc_1 = fix_proc.generate() pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1) pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1) pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2) pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2) pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2) pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2) r1 = canvas.list_all_processors('root') assert len(r1) == 7 r2 = canvas.list_all_processors(pg_1.id) > assert len(r2) == 6 E AssertionError: assert 4 == 6 def list_all_processors(pg_id='root'): """ Returns a flat list of all Processors under the provided Process Group Args: pg_id (str): The UUID of the Process Group to start from, defaults to the Canvas root Returns: list[ProcessorEntity] """ assert isinstance(pg_id, six.string_types), "pg_id should be a string" if nipyapi.utils.check_version('1.2.0') == -1: targets = nipyapi.nifi.ProcessGroupsApi().get_processors( id=pg_id, include_descendant_groups=True ) return targets.processors else: out = [] # list of child process groups pg_ids = [x.id for x in list_all_process_groups(pg_id)] # if not root, include the parent pg in the target list # root is a special case that is included if targeted by # list_all_process_groups if pg_id == 'root' or pg_id == get_root_pg_id(): pass else: pg_ids.append(pg_id) # process target list for this_pg_id in pg_ids: procs = nipyapi.nifi.ProcessGroupsApi().get_processors(this_pg_id) if procs.processors: out += procs.processors return out
Created 12-18-2018 07:32 AM
Thats great news thank you Dan.