Created 12-13-2018 03:06 PM
Hi
I need to extract the processors for a particular Processor Group with id = 5b641351-34d0-3def-a376-7824fbe9cc0f
This Processor Group contains 10 processors and another Processor Group with 5 Processors
When I try to extract the processors, I only get the 5 processors from my "sub" Processor Group.
test=canvas.list_all_processors('5b641351-34d0-3def-a376-7824fbe9cc0f')
for item in test:
    print (str(item.id))
result
(nipyapi) λ python main.py c7aad022-0ab3-353f-90cb-0999781d6309 7287c6b0-c9a8-3436-9902-52fb806e7c42 d3bd0289-490a-389c-ba90-c27c48a1e453 fb278074-6249-3aeb-88f8-d3bee857be76
I was expecting a list of 10 processor ids for the Processor Group I call canvas.list_all_processors with.
It seems that it takes the deepest Processor Group within the Processor Group given in the argument.
Does this work by design? And is there another way to get the processors from the top down within the group asked about?
Created 12-15-2018 08:40 PM
Hi @Simon Jespersen I believe I have reproduced your error with the following pytest code/output.
I think it is interesting that it works when the parent is the root canvas, but not sub processor groups - probably my recursion code is poor, I will try to fix this quickly.
Edit: Resolved, the updated function is below if you want to inspect it yourself, I also took a moment to leverage the new 'descendants' calls to speed it up on newer versions. I'll push this into the code base once full testing is complete.
def test_list_nested_processors(regress_nifi, fix_pg, fix_proc):
pg_1 = fix_pg.generate(
parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id')
)
pg_2 = fix_pg.generate(parent_pg=pg_1)
root_proc_1 = fix_proc.generate()
pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1)
pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1)
pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2)
r1 = canvas.list_all_processors('root')
assert len(r1) == 7
r2 = canvas.list_all_processors(pg_1.id)
> assert len(r2) == 6
E AssertionError: assert 4 == 6
def list_all_processors(pg_id='root'):
    """
    Collects every Processor found beneath a Process Group into a flat list

    Args:
        pg_id (str): UUID of the Process Group to search from; the Canvas
            root is used when omitted

    Returns:
        list[ProcessorEntity]
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"
    if nipyapi.utils.check_version('1.2.0') != -1:
        # No descendant-aware call on this path, so visit each Process Group
        # in turn and gather its processors
        collected = []
        group_ids = [group.id for group in list_all_process_groups(pg_id)]
        # A non-root starting group is not part of the listing above and has
        # to be added by hand; root is already included by
        # list_all_process_groups when it is the target
        if pg_id != 'root' and pg_id != get_root_pg_id():
            group_ids.append(pg_id)
        for group_id in group_ids:
            found = nipyapi.nifi.ProcessGroupsApi().get_processors(
                group_id
            ).processors
            if found:
                collected.extend(found)
        return collected
    # Single request that also returns processors of descendant groups
    response = nipyapi.nifi.ProcessGroupsApi().get_processors(
        id=pg_id,
        include_descendant_groups=True
    )
    return response.processors
Created 12-13-2018 08:43 PM
Hi Simon - definitely sounds like a bug to me. Can you please share the NiPyApi version you are using and the script you are using?
Created 12-14-2018 06:44 AM
Hi @Dan Chaffelson thanks for the quick response
Here is my code. This is just a POC, so it may seem somewhat unstructured; it is just a playground.
import os
import sys
import getpass
import json
import smtplib
from nipyapi import nifi, config, templates, canvas
from config import ConfigIni
# Disable urllib3 certificate warnings
from requests.packages.urllib3 import disable_warnings
disable_warnings()
class NifiInstance:
    """ The NifiInstance class facilitating easy to use
    methods utilizing the NiPyApi (https://github.com/Chaffelson/nipyapi)
    wrapper library.

    Creating an instance stores the connection settings on
    `config.nifi_config` and requests an access token for the given user.

    Arguments:
        url (str): Nifi host url, defaults to environment variable `NIFI_HOST`.
        username (str): Nifi username, defaults to environment variable `NIFI_USERNAME`.
        password (str): Nifi password, defaults to environment variable `NIFI_PASSWORD`.
        verify_ssl (bool): Stored on `config.nifi_config.verify_ssl`.
    """

    def __init__(self, url=None, username=None, password=None, verify_ssl=False):
        config.nifi_config.host = self._get_url(url)
        config.nifi_config.verify_ssl = verify_ssl
        config.nifi_config.username = username
        self._authenticate(username, password)

    def _get_url(self, url):
        """Return the host url with a `nifi-api` path appended when missing.

        Falls back to the `NIFI_HOST` environment variable, then to an
        interactive prompt, when `url` is empty.

        Raises:
            ValueError: if no url could be determined.
        """
        if not url:
            try:
                url = os.environ['NIFI_HOST']
            except KeyError:
                url = input('Nifi host: ')
        if not url:
            # Previously an empty value crashed with IndexError on url[-1]
            raise ValueError('Nifi host url must not be empty')
        if '/nifi-api' not in url:
            if not url.endswith('/'):
                url += '/'
            url += 'nifi-api'
        return url

    def _authenticate(self, username=None, password=None):
        """Request an access token and install it on `config.nifi_config`.

        Missing credentials are taken from `NIFI_USERNAME` / `NIFI_PASSWORD`,
        then from an interactive prompt.
        """
        if not username:
            try:
                username = os.environ['NIFI_USERNAME']
            except KeyError:
                username = input('Username: ')
        # Keep the resolved name in one place so the token request and the
        # api_key entry below agree, even when it came from env or prompt
        config.nifi_config.username = username
        if not password:
            try:
                password = os.environ['NIFI_PASSWORD']
            except KeyError:
                password = getpass.getpass('Password: ')
        access_token = None
        try:
            access_token = nifi.AccessApi().create_access_token(
                username=username, password=password)
        except nifi.rest.ApiException as e:
            print('Exception when calling AccessApi->create_access_token: {}\n'.format(e))
        # NOTE(review): when the request above fails, access_token is still
        # None and the header below becomes 'Bearer None' - confirm whether
        # the caller should be told about the failure instead
        config.nifi_config.api_key[username] = access_token
        config.nifi_config.api_client = nifi.ApiClient(
            header_name='Authorization',
            header_value='Bearer {}'.format(access_token))

    def list_processors_in_processorgroup(self, pg_id=None):
        """Print the status name of every processor returned for `pg_id`."""
        listen = canvas.list_all_processors(pg_id)
        for item in listen:
            print (str(item.status.name))

    def processor_status(self, p_id=None):
        """Return the run status of the processor with id `p_id`."""
        pro = canvas.get_processor(p_id, 'id')
        return pro.status.run_status
# Read the url and the groups to list from the ini file.
start_init = ConfigIni('c:/temp/nipyapi/monitor.ini')
# The groups come back as a list of groups
groups = start_init.get_group_id()
url = start_init.get_url()
n = NifiInstance(url, 'myuser', 'mypassword')
# Try out list_all_processors against one fixed group id
test = canvas.list_all_processors('5b641351-34d0-3def-a376-7824fbe9cc0f')
for item in test:
    print (str(item.id))
my nipyapi version is
C:\Temp\nipyapi (nipyapi) λ pip freeze | grep nipyapi nipyapi==0.11.0 C:\Temp\nipyapi
Created 12-15-2018 08:40 PM
Hi @Simon Jespersen I believe I have reproduced your error with the following pytest code/output.
I think it is interesting that it works when the parent is the root canvas, but not sub processor groups - probably my recursion code is poor, I will try to fix this quickly.
Edit: Resolved, the updated function is below if you want to inspect it yourself, I also took a moment to leverage the new 'descendants' calls to speed it up on newer versions. I'll push this into the code base once full testing is complete.
def test_list_nested_processors(regress_nifi, fix_pg, fix_proc):
pg_1 = fix_pg.generate(
parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id')
)
pg_2 = fix_pg.generate(parent_pg=pg_1)
root_proc_1 = fix_proc.generate()
pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1)
pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1)
pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2)
r1 = canvas.list_all_processors('root')
assert len(r1) == 7
r2 = canvas.list_all_processors(pg_1.id)
> assert len(r2) == 6
E AssertionError: assert 4 == 6
def list_all_processors(pg_id='root'):
    """
    Collects every Processor found beneath a Process Group into a flat list

    Args:
        pg_id (str): UUID of the Process Group to search from; the Canvas
            root is used when omitted

    Returns:
        list[ProcessorEntity]
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"
    if nipyapi.utils.check_version('1.2.0') != -1:
        # No descendant-aware call on this path, so visit each Process Group
        # in turn and gather its processors
        collected = []
        group_ids = [group.id for group in list_all_process_groups(pg_id)]
        # A non-root starting group is not part of the listing above and has
        # to be added by hand; root is already included by
        # list_all_process_groups when it is the target
        if pg_id != 'root' and pg_id != get_root_pg_id():
            group_ids.append(pg_id)
        for group_id in group_ids:
            found = nipyapi.nifi.ProcessGroupsApi().get_processors(
                group_id
            ).processors
            if found:
                collected.extend(found)
        return collected
    # Single request that also returns processors of descendant groups
    response = nipyapi.nifi.ProcessGroupsApi().get_processors(
        id=pg_id,
        include_descendant_groups=True
    )
    return response.processors
Created 12-18-2018 07:32 AM
That's great news, thank you Dan.