Member since
02-08-2016
80
Posts
88
Kudos Received
13
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
3902 | 12-15-2018 08:40 PM | |
3272 | 03-29-2018 10:15 AM | |
1332 | 02-01-2018 08:08 AM | |
2094 | 01-24-2018 09:23 PM | |
1241 | 11-05-2017 03:24 PM |
12-15-2018
08:40 PM
2 Kudos
Hi @Simon Jespersen I believe I have reproduced your error with the following pytest code/output. I think it is interesting that it works when the parent is the root canvas, but not sub processor groups - probably my recursion code is poor, I will try to fix this quickly. Edit: Resolved, the updated function is below if you want to inspect it yourself, I also took a moment to leverage the new 'descendants' calls to speed it up on newer versions. I'll push this into the code base once full testing is complete. def test_list_nested_processors(regress_nifi, fix_pg, fix_proc):
pg_1 = fix_pg.generate(
parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id')
)
pg_2 = fix_pg.generate(parent_pg=pg_1)
root_proc_1 = fix_proc.generate()
pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1)
pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1)
pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2)
r1 = canvas.list_all_processors('root')
assert len(r1) == 7
r2 = canvas.list_all_processors(pg_1.id)
> assert len(r2) == 6
E AssertionError: assert 4 == 6
def list_all_processors(pg_id='root'):
"""
Returns a flat list of all Processors under the provided Process Group
Args:
pg_id (str): The UUID of the Process Group to start from, defaults to
the Canvas root
Returns:
list[ProcessorEntity]
"""
assert isinstance(pg_id, six.string_types), "pg_id should be a string"
if nipyapi.utils.check_version('1.2.0') == -1:
targets = nipyapi.nifi.ProcessGroupsApi().get_processors(
id=pg_id,
include_descendant_groups=True
)
return targets.processors
else:
out = []
# list of child process groups
pg_ids = [x.id for x in list_all_process_groups(pg_id)]
# if not root, include the parent pg in the target list
# root is a special case that is included if targeted by
# list_all_process_groups
if pg_id == 'root' or pg_id == get_root_pg_id():
pass
else:
pg_ids.append(pg_id)
# process target list
for this_pg_id in pg_ids:
procs = nipyapi.nifi.ProcessGroupsApi().get_processors(this_pg_id)
if procs.processors:
out += procs.processors
return out
... View more
12-13-2018
08:43 PM
Hi Simon - definitely sounds like a bug to me. Can you please share the NiPyApi version you are using and the script you are using?
... View more
08-31-2018
09:57 PM
1 Kudo
There is a full Python client for NiFi, NiFi-Registry, and I'm just about to add the Schema-Registry as well. https://github.com/Chaffelson/nipyapi It is community supported and comes with all the basic calls you would need to automate your NiFi flows, higher level functions for complex tasks like 'stop all processors in this process group and purge all the queues', and demos of various usage methods like tokenAuth security, promoting flow versions between environments, and deploying NiFi in Docker containers for testing.
... View more
08-24-2018
07:38 PM
The error is in the log you posted: Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing it couldn't find the myid file stated in Pierre's instructions in order to proceed with cluster startup. You should also be aware that Zookeeper needs an odd number of nodes to form a quorum and operate correctly, so you should either only have ZK on the first node, or make a 3 node cluster.
... View more
03-29-2018
10:15 AM
1 Kudo
The short answer is that you cannot embed such an environment to work within NiFi, but you can use NiFi to make calls against it. NiFi embeds Jython, not full Python, so you cannot realistically get it to embed a full Anaconda environment. Typically what I do is use the ExecuteStreamCommand to call a small bash script which activates the python virtualenv and runs the script against the data, then returns to NiFi to carry on the flow. Give this a try and let me know how you go.
... View more
03-22-2018
09:01 AM
UPDATE: Note that this feature was introduced in NiFi-1.5.0 / HDF-3.1 and is now GA. https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.1.1/bk_security/content/ch05s04.html
... View more
02-01-2018
08:08 AM
1 Kudo
Hi @joh snow The version of NiFi used under Kylo is a bit different from the Apache version, but I might be able to give some advice. Can you tell me which version of Kylo / NiFi you are using, and how many cluster nodes of what kind of hardware you are running on?
... View more
01-31-2018
11:41 PM
6 Kudos
In this article I will explain an easy way to automate some basic tasks in NiFi as an introduction to NiPyApi, an automation package for Apache NiFi and its sub-projects. Environment Setup Requirements You will need a Python environment
2.7 or 3.6 are tested, and most computers come with one of these or you can create a virtualenv or you can install Python on OSX using homebrew You will need a recent internet browser, given you're reading this I assume you have one - I'm using Chrome You will need NiFi services to test against - if you have Docker installed you can use the following commands to prepare NiFi & NiFi Registry services: user$: curl https://raw.githubusercontent.com/Chaffelson/nipyapi/master/test_env_config/docker_compose_latest/docker-compose.yml | docker-compose -f - up -d
user$: docker ps Docker will download and start the NiFi containers, and show you the details: You should be able to browse to both NiFi and NiFi-Registry at the following URLs: http://localhost:8080/nifi/ http://localhost:18080/nifi-registry/ Installing NiPyApi Installing NiPyApi is very easy, and done by the usual Python package distribution manager called Pip: user$: pip install nipyapi NiPyApi will install along with it's package dependencies, much like a linux package - don't worry about the dependencies, it'll look like this when it's done: Once that completes, go ahead and start an interactive python session on your command line and run a test command: user$: python
>>> from nipyapi import config, canvas
>>> config.nifi_config.host
'http://localhost:8080/nifi-api'
>>> canvas.get_root_pg_id()
'4e8d8f99-0161-1000-fa6f-724e5873aebc' NiPyApi will look for a NiFi environment on the usual port, or you can change this in nipyapi.config shown above. Congratulations! You have just commanded the NiFi API in less than 5 lines of code. Investigating the Package Now we can try using a few of the NiPyApi commands to interact with the NiFi environment - while the entire NiFi and NiFi-Registry APIs are implemented, only some of the calls are surfaced for common use - you can find out about them in great detail either through the online documentation at ReadTheDocs, or by investigating the Github Repo. For now, try looking at the console documentation of the nipyapi.canvas functions using the help() command: >>> help(canvas)
Help on module nipyapi.canvas in nipyapi:
NAME
nipyapi.canvas
FILE
/Users/dchaffey/.virtualenvs/tmp-167d86bd91b19b09/lib/python2.7/site-packages/nipyapi/canvas.py
DESCRIPTION
For interactions with the NiFi Canvas
STATUS: Work in Progress to determine pythonic datamodel
FUNCTIONS
create_process_group(parent_pg, new_pg_name, location)
Creates a new PG with a given name under the provided parent PG
:param parent_pg: ProcessGroupEntity object of the parent PG
:param new_pg_name: String to name the new PG
:param location: Tuple of (x,y) coordinates to place the new PG
:return: ProcessGroupEntity of the new PG
... You can see there are a lot of functions here that you can use to complete tasks against NiFi, and there are even more in the nipyapi.templates and nipyapi.versioning modules. Trying an Automation Script There is a handy interactive Demo built into NiPyApi, and this time we're also going to use the new NiFi-Registry as well. It will procedurally generate a Process Group containing a Processor in NiFi, and then put them under Version Control in the NiFi Registry. It will then also clone the version from one Registry Bucket to another, simulating code promotion: Note that if you did not use the supplied Docker configuration above, you may have to modify the script to connect to your NiFi and NiFi-Registry environments. >>> from nipyapi.demo.console import *
>>> dir()
['__builtins__', '__doc__', '__name__', '__package__', 'bucket_0', 'bucket_1', 'canvas', 'config', 'process_group_0', 'processor_0', 'reg_client_0', 'ver_flow_0', 'ver_flow_1', 'ver_flow_info_0', 'ver_flow_snapshot_0', 'ver_flow_snapshot_1']
You can see here a number of NiFi and Registry objects have been created for you by the automation script as described. You can take a look at the script and how it's using the NiPyApi functions on Github. If you head over to your NiFi and NiFi-Registry GUI, you can explore the objects and try the new features out for yourself. Happy Coding!
... View more
Labels:
01-24-2018
09:29 PM
This error often comes up when the file being uploaded is not a valid template. Open the file with a plain text editor and check that it looks something like this: <?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>4d5dcf9a-015e-1000-097e-e505ed0f7fd2</groupId>
<name>nipyapi_testTemplate_00</name>
... View more
01-24-2018
09:23 PM
2 Kudos
If you compile NiFi you can retrieve a swagger definition from: ./nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/target/swagger-ui/swagger.json You could then combine this with your preferred language codegen to produce a full Rest API client. You can also use the one I maintain in Python at https://github.com/Chaffelson/nipyapi I have already written wrapper functions to retrieve processor information.
... View more