Member since: 02-08-2016
Posts: 80
Kudos Received: 87
Solutions: 13
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 2415 | 12-15-2018 08:40 PM |
 | 1843 | 03-29-2018 10:15 AM |
 | 677 | 02-01-2018 08:08 AM |
 | 1393 | 01-24-2018 09:23 PM |
 | 644 | 11-05-2017 03:24 PM |
12-15-2018
08:40 PM
2 Kudos
Hi @Simon Jespersen, I believe I have reproduced your error with the following pytest code/output. I think it is interesting that it works when the parent is the root canvas, but not sub process groups - probably my recursion code is at fault, so I will try to fix this quickly.

Edit: Resolved. The updated function is below if you want to inspect it yourself; I also took a moment to leverage the new 'descendants' calls to speed it up on newer versions. I'll push this into the code base once full testing is complete.

def test_list_nested_processors(regress_nifi, fix_pg, fix_proc):
    pg_1 = fix_pg.generate(
        parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id')
    )
    pg_2 = fix_pg.generate(parent_pg=pg_1)
    root_proc_1 = fix_proc.generate()
    pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1)
    pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1)
    pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2)
    pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2)
    pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2)
    pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2)
    r1 = canvas.list_all_processors('root')
    assert len(r1) == 7
    r2 = canvas.list_all_processors(pg_1.id)
>   assert len(r2) == 6
E   AssertionError: assert 4 == 6
def list_all_processors(pg_id='root'):
    """
    Returns a flat list of all Processors under the provided Process Group

    Args:
        pg_id (str): The UUID of the Process Group to start from, defaults to
            the Canvas root

    Returns:
        list[ProcessorEntity]
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"
    if nipyapi.utils.check_version('1.2.0') == -1:
        targets = nipyapi.nifi.ProcessGroupsApi().get_processors(
            id=pg_id,
            include_descendant_groups=True
        )
        return targets.processors
    else:
        out = []
        # list of child process groups
        pg_ids = [x.id for x in list_all_process_groups(pg_id)]
        # if not root, include the parent pg in the target list
        # root is a special case that is included if targeted by
        # list_all_process_groups
        if pg_id == 'root' or pg_id == get_root_pg_id():
            pass
        else:
            pg_ids.append(pg_id)
        # process target list
        for this_pg_id in pg_ids:
            procs = nipyapi.nifi.ProcessGroupsApi().get_processors(this_pg_id)
            if procs.processors:
                out += procs.processors
        return out
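With this updated function, the failing assertion from the test output above should now pass:

# Re-running the previously failing check from the fixture environment
r2 = canvas.list_all_processors(pg_1.id)
assert len(r2) == 6  # pg_1's two processors plus pg_2's four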
12-13-2018
08:43 PM
Hi Simon - definitely sounds like a bug to me. Can you please share the NiPyApi version and the script you are using?
08-31-2018
09:57 PM
1 Kudo
There is a full Python client for NiFi and NiFi-Registry, and I'm just about to add the Schema-Registry as well: https://github.com/Chaffelson/nipyapi It is community supported and comes with all the basic calls you would need to automate your NiFi flows, higher-level functions for complex tasks like 'stop all processors in this process group and purge all the queues', and demos of various usage methods like tokenAuth security, promoting flow versions between environments, and deploying NiFi in Docker containers for testing.
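As a hedged sketch of that 'stop and purge' style of task (assuming a recent nipyapi version where schedule_process_group and purge_process_group are available; the Process Group name is hypothetical):

from nipyapi import canvas

# Find the target Process Group by name, stop everything in it,
# then drop any FlowFiles still sitting in its queues.
pg = canvas.get_process_group('my_flow')
canvas.schedule_process_group(pg.id, False)
canvas.purge_process_group(pg, stop=False)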
08-24-2018
07:38 PM
The error is in the log you posted: Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing. It couldn't find the myid file described in Pierre's instructions, so it couldn't proceed with cluster startup. You should also be aware that ZooKeeper needs an odd number of nodes to form a quorum and operate correctly, so you should either run ZK on the first node only, or make it a 3-node cluster.
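For reference, creating the missing file on the first node would look something like this (the path follows the log message above; the id must match each node's number from the instructions):

user$: mkdir -p ./state/zookeeper
user$: echo 1 > ./state/zookeeper/myid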
03-29-2018
10:15 AM
1 Kudo
The short answer is that you cannot embed such an environment to work within NiFi, but you can use NiFi to make calls against it. NiFi embeds Jython, not full Python, so you cannot realistically get it to embed a full Anaconda environment. Typically what I do is use ExecuteStreamCommand to call a small bash script which activates the Python virtualenv and runs the script against the data, then returns control to NiFi to carry on the flow. Give this a try and let me know how you go.
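For illustration, such a wrapper might look like this (paths and script names are hypothetical; ExecuteStreamCommand passes the FlowFile content on stdin and takes the new content from stdout):

#!/usr/bin/env bash
# Activate the virtualenv, run the processing script against stdin,
# and emit the result on stdout for NiFi to pick up.
source /opt/envs/data_env/bin/activate
python /opt/scripts/process_data.py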
03-22-2018
09:01 AM
UPDATE: Note that this feature was introduced in NiFi-1.5.0 / HDF-3.1 and is now GA. https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.1.1/bk_security/content/ch05s04.html
02-01-2018
08:08 AM
1 Kudo
Hi @joh snow The version of NiFi used under Kylo is a bit different from the Apache version, but I might be able to give some advice. Can you tell me which version of Kylo / NiFi you are using, how many cluster nodes you have, and what kind of hardware you are running on?
01-31-2018
11:41 PM
6 Kudos
In this article I will explain an easy way to automate some basic tasks in NiFi as an introduction to NiPyApi, an automation package for Apache NiFi and its sub-projects.

Environment Setup

Requirements:
- You will need a Python environment. 2.7 and 3.6 are tested; most computers come with one of these, or you can create a virtualenv, or install Python on OSX using Homebrew.
- You will need a recent internet browser - given you're reading this I assume you have one. I'm using Chrome.
- You will need NiFi services to test against. If you have Docker installed, you can use the following commands to prepare NiFi & NiFi-Registry services:

user$: curl https://raw.githubusercontent.com/Chaffelson/nipyapi/master/test_env_config/docker_compose_latest/docker-compose.yml | docker-compose -f - up -d
user$: docker ps

Docker will download and start the NiFi containers, and docker ps will show you their details. You should be able to browse to both NiFi and NiFi-Registry at the following URLs:

http://localhost:8080/nifi/
http://localhost:18080/nifi-registry/

Installing NiPyApi

Installing NiPyApi is very easy, and done by the usual Python package manager, Pip:

user$: pip install nipyapi

NiPyApi will install along with its package dependencies, much like a Linux package - don't worry, Pip handles the dependencies for you. Once that completes, go ahead and start an interactive Python session on your command line and run a test command:

user$: python
>>> from nipyapi import config, canvas
>>> config.nifi_config.host
'http://localhost:8080/nifi-api'
>>> canvas.get_root_pg_id()
'4e8d8f99-0161-1000-fa6f-724e5873aebc'

NiPyApi will look for a NiFi environment on the usual port, or you can change this in nipyapi.config as shown above. Congratulations! You have just commanded the NiFi API in less than 5 lines of code.

Investigating the Package

Now we can try using a few of the NiPyApi commands to interact with the NiFi environment. While the entire NiFi and NiFi-Registry APIs are implemented, only some of the calls are surfaced for common use - you can find out about them in great detail either through the online documentation at ReadTheDocs, or by investigating the Github Repo. For now, try looking at the console documentation of the nipyapi.canvas functions using the help() command:

>>> help(canvas)
Help on module nipyapi.canvas in nipyapi:

NAME
    nipyapi.canvas

FILE
    /Users/dchaffey/.virtualenvs/tmp-167d86bd91b19b09/lib/python2.7/site-packages/nipyapi/canvas.py

DESCRIPTION
    For interactions with the NiFi Canvas
    STATUS: Work in Progress to determine pythonic datamodel

FUNCTIONS
    create_process_group(parent_pg, new_pg_name, location)
        Creates a new PG with a given name under the provided parent PG

        :param parent_pg: ProcessGroupEntity object of the parent PG
        :param new_pg_name: String to name the new PG
        :param location: Tuple of (x,y) coordinates to place the new PG
        :return: ProcessGroupEntity of the new PG
    ...
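To try one of these calls yourself, you could create a new Process Group on the root canvas using the create_process_group signature from the help output above (a minimal sketch - the name and coordinates are just examples):

>>> root_pg = canvas.get_process_group(canvas.get_root_pg_id(), 'id')
>>> new_pg = canvas.create_process_group(root_pg, 'my_first_pg', (400.0, 400.0))
>>> new_pg.id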
You can see there are a lot of functions here that you can use to complete tasks against NiFi, and there are even more in the nipyapi.templates and nipyapi.versioning modules.

Trying an Automation Script

There is a handy interactive Demo built into NiPyApi, and this time we're also going to use the new NiFi-Registry as well. It will procedurally generate a Process Group containing a Processor in NiFi, and then put them under Version Control in the NiFi Registry. It will then also clone the version from one Registry Bucket to another, simulating code promotion. Note that if you did not use the supplied Docker configuration above, you may have to modify the script to connect to your NiFi and NiFi-Registry environments.

>>> from nipyapi.demo.console import *
>>> dir()
['__builtins__', '__doc__', '__name__', '__package__', 'bucket_0', 'bucket_1', 'canvas', 'config', 'process_group_0', 'processor_0', 'reg_client_0', 'ver_flow_0', 'ver_flow_1', 'ver_flow_info_0', 'ver_flow_snapshot_0', 'ver_flow_snapshot_1']
You can see here a number of NiFi and Registry objects have been created for you by the automation script as described. You can take a look at the script and how it's using the NiPyApi functions on Github. If you head over to your NiFi and NiFi-Registry GUI, you can explore the objects and try the new features out for yourself. Happy Coding!
01-31-2018
10:04 PM
Repo Description

This package provides pythonic calls for common NiFi tasks and CI/CD/SDLC integrations - you might call it Flow Development LifeCycle. These are implemented by replicating the action of the same task in the GUI and surfacing the underlying NiFi data structures and calls wherever possible, to retain UX parallelism for the user. Docs available on ReadTheDocs.

Functionality Highlights:
- Full native Python REST client for NiFi and NiFi-Registry
- CRUD wrappers for common task areas like Process Groups, Processors, Templates, Registry Clients, Registry Buckets, Registry Flows, etc.
- Convenience functions for inventory tasks, such as recursively retrieving the entire canvas, or a flat list of all Process Groups (see the sketch at the end of this description)
- Docker Compose configurations for testing and deployment
- Limited support for scheduling components
- A scripted deployment of an interactive environment for testing and demonstration purposes

Coming soon:
- Secured environment support (not currently implemented, but planned for very soon)
- Support for complex scheduling requests, such as stopping a large flow and waiting for all Processors to be halted
- Support for edge cases during versioning changes, such as reverting a flow containing live data

Repo Info
Github Repo URL: https://github.com/Chaffelson/nipyapi
Github account name: Chaffelson
Repo name: nipyapi
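As a taster of the inventory functions mentioned in the highlights above, a minimal sketch (function names as used elsewhere on this page; defaults assume the root canvas):

from nipyapi import canvas

# Flat lists of everything on the canvas
pgs = canvas.list_all_process_groups()
procs = canvas.list_all_processors()
print(len(pgs), len(procs))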
01-24-2018
09:29 PM
This error often comes up when the file being uploaded is not a valid template. Open the file with a plain text editor and check that it looks something like this:

<?xml version="1.0" ?>
<template encoding-version="1.1">
  <description></description>
  <groupId>4d5dcf9a-015e-1000-097e-e505ed0f7fd2</groupId>
  <name>nipyapi_testTemplate_00</name>
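If you'd rather check programmatically, a quick well-formedness test (the filename here is hypothetical) is:

import xml.etree.ElementTree as ET

# Raises ParseError if the file is not well-formed XML
root = ET.parse('my_template.xml').getroot()
print(root.tag)  # a valid NiFi template export should print 'template'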
01-24-2018
09:23 PM
2 Kudos
If you compile NiFi, you can retrieve a swagger definition from: ./nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/target/swagger-ui/swagger.json You could then combine this with your preferred language's codegen to produce a full REST API client. You can also use the one I maintain in Python at https://github.com/Chaffelson/nipyapi - I have already written wrapper functions there to retrieve processor information.
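For the codegen route, the generation step would look something like this with swagger-codegen (the output directory is an example; check the codegen docs for your target language):

user$: swagger-codegen generate -i swagger.json -l python -o ./nifi-python-client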
11-20-2017
08:04 PM
Hi @Phaneendra S that error message usually means you have incorrectly configured your Producer and it is not actually managing to talk to the Kafka service. You should check both your application logs and the Kafka service logs to see what the error is. You could also try connecting to Kafka using your parameters in NiFi; if that works, you will know that your Producer code is at fault.
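Another quick check, independent of both NiFi and your code, is the console producer that ships with Kafka (the broker address here is an example; use your own broker list):

user$: kafka-console-producer.sh --broker-list broker1.example.com:6667 --topic test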
11-05-2017
03:24 PM
Have you looked at the Max Threads settings under the main NiFi Controller Settings menu? It doesn't allow you to strictly limit the utilisation of the flow, but it does cap the maximum concurrency at a global level.
10-23-2017
12:05 PM
How have you set up your shell script to handle closing on job timeout?
I'm not aware of ExecuteStreamCommand having a hidden timeout property, though you could add something like a 'max wait' timeout if you wanted to do a PR.
10-16-2017
01:22 PM
Hi @Pratik Ghatak This looks like what happens when you try to connect NiFi to an old fork of Hive like the one in CDH, I have an article about it here: https://community.hortonworks.com/articles/93771/connecting-nifi-to-cdh-hive.html There's an excellent comment from @Rudolf Schimmel on a workaround at the bottom of that post if you are in the same situation, but give it a read and let me know if you need help with an alternative solution.
09-08-2017
09:57 AM
Hi @smanjee - did you set up any automation to push your NAR files into the NiFi docker container on build?
08-08-2017
08:52 PM
Can you check that it's not receiving empty records? I saw an error that looked a lot like this in the TP code when it attempted to decode messages I had mistakenly created without content.
08-08-2017
11:38 AM
Actually, according to this stackoverflow answer on a related issue, you want to do the following: $.latestFeedJobExecutionContext.['demo.msexchange'].feedts
08-03-2017
04:14 PM
1 Kudo
You are right! I have passed this to the documentation team to be fixed up, thanks for pointing it out! It should be redirected to: http://superset.incubator.apache.org/gallery.html
08-03-2017
04:11 PM
2 Kudos
If it is a flat structure you probably want: $.* Or, if it is some part of a nested structure: $.firstpart.secondpart.*
08-03-2017
04:07 PM
2 Kudos
One of your configuration parameters for Superset is missing, it looks like it's one of the port statements. Check that you've set the database port for MySQL and the HTTP port for Superset.
08-03-2017
04:04 PM
Have you tried $.latestFeedJobExecutionContext.demo\.msexchange.feedts ?
08-01-2017
02:25 PM
I'm pretty sure colons in filenames are considered bad practice - try removing that. The dot could be because it's failing to complete the writing of the file, so it's generating the dot file for writing but can't complete the action. Just a guess without looking at your logs.
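If the colon does turn out to be the problem, one way to strip it is an UpdateAttribute processor rewriting the filename attribute with NiFi Expression Language before the write (a sketch, untested against your flow):

${filename:replace(':', '_')}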
06-19-2017
08:10 PM
Have you considered installing Java MiNiFi on the windows server and using it to execute the necessary local flows to collect the data, then push it to NiFi via a SiteToSite connection? Opening one port for MiNiFi to S2S back to NiFi would be a lot easier than maintaining 300 sharemounts via Samba.
06-19-2017
07:59 AM
@Timothy Spann Interesting. In all the tests I did I never personally tried two different versions of MySQL on the same box, I always used Postgres for Ambari/etc. and MySQL57 for SAM/etc.
It makes sense though - you can't yum install two different versions of the same program as they'd overwrite each other - you'd have to compile it from source or put one of them in a docker or something.
06-19-2017
07:44 AM
5 Kudos
@yjiang Have you installed NiFi on the same server as Superset? They have a default port clash on 9088 - I suggest you search in the NiFi configs for 9088 and change it to 9089 (9089 is free in a default install).
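If the clashing entry turns out to be NiFi's web UI port, the edit is a one-liner in nifi.properties (a hedged example - as above, search all the NiFi configs for 9088, since the property holding it may differ in your install):

nifi.web.http.port=9089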
06-19-2017
07:42 AM
@yjiang Glad to hear it's working - there should be a couple of bash scripts in /usr/hdf/current/streamline/bootstrap that are better to use than the sql directly - as it also creates the UDFs and roles and whatnot.
I'd suggest you run ./bootstrap-storage.sh drop-create and then ./bootstrap.sh to ensure you have a clean install.
06-16-2017
04:50 PM
5 Kudos
@yjiang @Andres Koitmäe @Jerry Johnson This message is usually because you are using an unsupported database version, most commonly because you installed SAM against the Ambari Database - unfortunately at this time they do not support cross-compatible versions.
For your reference, the support matrices are here: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_support-matrices/content/ch_matrices-hdf.html
05-31-2017
10:20 AM
@James Dong @Hans Pointner
This stylesheet is incomplete for arbitrary XML conversion, I suggest you try https://github.com/bramstein/xsltjson
I've documented use of it with the TransformXML processor in a new article here: https://community.hortonworks.com/content/kbentry/105547/nifi-xml-to-json-shredding-a-generalised-solution-3.html
05-31-2017
09:52 AM
9 Kudos
I'm going to cover a simple OSS solution to shredding XML in NiFi, and demonstrate how you can chain simple steps together to achieve common data shredding tasks. Feel free to get in touch if you need to achieve something more complex than these basic steps will allow. We will be covering:

- Procedurally converting XML to JSON using a fast XSLT 2.0 template
- Constructing Jolt transforms to extract nested subsections of JSON documents
- Constructing JsonPath expressions to split multi-record JSON documents
- Procedurally flattening complex nested JSON for easy querying

This process is shown on NiFi-1.2.0, and tested on a variety of XML documents, most notably a broad collection of GuideWire sample XMLs as part of a client PoC. The XML examples below have retained the nested structure but anonymised the fields.

XML to JSON

Here we combine the NiFi TransformXML processor with the excellent BSD-licensed xsltjson procedural converter found at https://github.com/bramstein/xsltjson. Simply check out the repo and set the XSLT filename in the processor to xsltjson/conf/xml-to-json.xsl. There are several conversion options present; I suggest the Badgerfish notation if you want an easier time of validating your conversion accuracy, but the default conversion is suitably compact for uncomplicated XMLs.
So your input XML might look something like this:

<BrokerResponse>
  <aggsId>3423897f9w8v89yb99873r</aggsId>
  <quote>
    <brandID>AB</brandID>
    <brandDescription>Corp</brandDescription>
    <quoteReference>0023400010050105</quoteReference>
    <annualPremium>271.45</annualPremium>
    <totalPremiumPNCD>304.56</totalPremiumPNCD>
    <pncdIndicator>true</pncdIndicator>
    <productType>Private Car</productType>
    <insurerName>SomeRandom Insurance Company Limited</insurerName>
    <coverType>comprehensive</coverType>
    <instalments>
      <instalmentScheme>12 at 13.9% (qr:27)</instalmentScheme>
      <instalmentType>Monthly</instalmentType>
      <downPayment>29.18</downPayment>
      <downPaymentPercentage>8.3385725</downPaymentPercentage>
      <totalInstalmentPremium>349.94</totalInstalmentPremium>
      <paymentAmount>29.16</paymentAmount>
      <noOfPayments>11</noOfPayments>
      <interestAmount>45.38</interestAmount>
      <apr>42.8</apr>
    </instalments>
    <vehicle>
      <excess>
        <name>PCAccidentalDamageCov_Ext</name>
        <amount>95.0</amount>
      </excess>
      ... etc.

And your output would look something like this (these strings aren't identical due to my data anonymisation):

{
  "BrokerResponse" : {
    "aggsId" : "4598e79g8798f298f",
    "quote" : [ {
      "brandID" : "AB",
      "brandDescription" : "Corp",
      "quoteReference" : "0000120404010",
      "annualPremium" : 271.45,
      "totalPremiumPNCD" : 304.56,
      "pncdIndicator" : true,
      "productType" : "Private Car",
      "insurerName" : "SomeRandom Insurance Company Limited",
      "coverType" : "comprehensive",
      "instalments" : {
        "instalmentScheme" : "12 at 12.3% (qr:33)",
        "instalmentType" : "Monthly",
        "downPayment" : 29.18,
        "downPaymentPercentage" : 8.3385725,
        "totalInstalmentPremium" : 349.94,
        "paymentAmount" : 29.16,
        "noOfPayments" : 11,
        "interestAmount" : 45.38,
        "apr" : 29.9
      }
    }, {
      "brandID" : "BC",
      "brandDescription" : "Acme Essential",
      "quoteReference" : "NA",
      "isDeclined" : true,
      "quoteErrors" : {
        "errorCode" : "QUOTE_DECLINED",
        "errorDescription" : "Quote Declined"
      }
    } ]
  }
}

Using Jolt to extract sections

Coming to both XSLT and Jolt as a new user, I found Jolt far easier to learn and use - relying on the ever-popular StackExchange, Jolt answers tended to teach you to fish, whereas XSLT answers were usually selling you a fish. Handily, NiFi has a built-in editor if you use the Advanced button on the JoltTransformJSON processor; this mimics the behaviour of the popular http://jolt-demo.appspot.com/ site for building your transforms.
A key thing to note is setting the Jolt DSL to 'Chain' in the NiFi processor, and then using your various 'spec' settings within the Transforms specified. This will align the NiFi processor behaviour with the Jolt-demo. Building a Jolt spec is about defining steps from the root of the document, and there are excellent guides elsewhere on the internet, but here is a simple but useful example.
Given the previous example of Xml converted to Json, this Jolt transform would check each quote subsection of the BrokerResponse, and if it contains an instalments section, return it in an array called quoteOffers, and drop any quotes that don't contain an Instalments section, such as the declined offers: [
{
"operation": "shift",
"spec": {
"BrokerResponse": {
"quote": {
"*": {
"instalments": {
"@1": "quoteOffers[]"
}
}
}
}
}
}
] This next Jolt transform would select just the Instalments section from the previous output of quoteOffers, and drop the rest of the details: [
{
"operation": "shift",
"spec": {
"quoteOffers": {
"*": {
"instalments": {
"@0": "instalments[]"
}
}
}
}
}
] Much simpler than XSLT! Using JsonPath to split documents This is a very simple process, again with good examples available out on the wider internet.
Using the above example again, if we received multiple quoteResponses in a single document we'd then have multiple instalment responses, and we might want to split them out into one quote per document. This would be as simple as using the following:

$.instalments.*

This specifies the root of the document using $, then the instalments array, emitting each child item as a separate FlowFile.

Flattening Json

Something else you might want to do is flatten your complex nested structures into simple iterables without having to specify a schema. This can be really useful if you just want to load the shredded XML for further analysis in Python without having to traverse the structure to get at the bits you're interested in. I came across the excellent Apache-licensed Java lib at https://github.com/wnameless/json-flattener, which I have wrapped into a NiFi-1.2.0-compatible processor at https://github.com/Chaffelson/nifi-flatjson-bundle. There are many more options within the lib that I have not taken the time to expose yet, including making it reversible! Again using our example XML document from above, the flattened output might look a bit like this:

{
"quoteOffers[0].brandID" : "AB",
"quoteOffers[0].brandDescription" : "Corp",
"quoteOffers[0].quoteReference" : "004050025001001",
"quoteOffers[0].annualPremium" : 271.45,
"quoteOffers[0].totalPremiumPNCD" : 304.56,
"quoteOffers[0].pncdIndicator" : true,
"quoteOffers[0].productType" : "Private Car",
"quoteOffers[0].insurerName" : "SomeRandom Insurance Company Limited",
"quoteOffers[0].coverType" : "comprehensive",
"quoteOffers[0].instalments.instalmentScheme" : "12 at 13.9% (qr:2)2",
"quoteOffers[0].instalments.instalmentType" : "Monthly",
"quoteOffers[0].instalments.downPayment" : 29.18,
"quoteOffers[0].instalments.downPaymentPercentage" : 8.3385725,
"quoteOffers[0].instalments.totalInstalmentPremium" : 349.94,
"quoteOffers[0].instalments.paymentAmount" : 29.16,
"quoteOffers[0].instalments.noOfPayments" : 11,
"quoteOffers[0].instalments.interestAmount" : 45.38,
"quoteOffers[0].instalments.apr" : 23.9,
"quoteOffers[0].vehicle.excess[0].name" : "PCAccidentalDamageCov_Ext",
"quoteOffers[0].vehicle.excess[0].amount" : 95.0,
"quoteOffers[0].vehicle.excess[1].name" : "PCLossFireTheftCov_Ext",
"quoteOffers[0].vehicle.excess[1].amount" : 95.0,
"quoteOffers[0].vehicle.excess[2].name" : "PCTheftKeysTransmitterCov_Ext",
"quoteOffers[0].vehicle.excess[2].amount" : 95.0,
"quoteOffers[0].vehicle.excess[3].name" : "PCGlassDmgWrepairdmgCT_Ext",
"quoteOffers[0].vehicle.excess[3].amount" : 25.0,
"quoteOffers[0].vehicle.excess[4].name" : "PCGlassDmgWreplacementdmgCT_Ext",
"quoteOffers[0].vehicle.excess[4].amount" : 85.0,
"quoteOffers[0].vehicle.excess[5].name" : "Voluntary Excess",
"quoteOffers[0].vehicle.excess[5].amount" : 100.0,
... etc.

Conclusion

So there you have it: with only 3 lines of code we've converted arbitrary nested XML into JSON, filtered out bits of the document we don't want (declined quotes), extracted the section of the quotes we want to process (quoteOffers), split each quote into a single document (instalments), and then flattened the rest of the quoteResponse into a flat JSON document for further analysis. Feel free to contact me if you have a shredding challenge we might be able to help you with.