​NiFi / NiFi-Registry - Best approach for "Change version" of a processgroup used by many other processgroups

Expert Contributor

Hi, I have a process group that acts as a subroutine for many other process groups. I put it under version control and imported that version into the other process groups. I have now made changes, resulting in a new version that should replace the old one in all the other process groups.

Under "NiFi Summary" -> "PROCESS GROUPS" I found a way to list all affected process groups.

The question is: is there any other way to do the substitution, apart from going from there to every single process group and using "Change version"?

Using NiFi 1.6.0

Thanks for any help!

18 REPLIES

Explorer

There seems to be no such function yet. For now I put everything into a 'root' group and version-control that. For example, assume there are two environments, dev and prod. I import the same version-controlled 'root' group twice and name the copies 'dev' and 'prod'. When the version of the 'root' group changes, everything inside it, including nested version-controlled groups, changes too.

108584-1.jpg

dev has 2 version-controlled groups inside and prod has 1 currently


108621-2.jpg


change prod version to 7 to sync

108612-3.jpg

Explorer

https://raw.githubusercontent.com/archongum/NiFi-templates/master/VersioningUpdateFlowVer.xml

This template applies "Change version" to all process groups that share the same version-controlled flow (bucket/flow).

Input attributes:

  1. process group id: a parent group that contains all the process groups sharing the same version-controlled flow. Set it to 'root' to loop over all process groups, at the cost of more time.
  2. bucket name
  3. flow name
  4. target version

This template is based on https://github.com/Chaffelson/nipyapi
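
The loop such a script performs can be sketched roughly as below. This is a minimal sketch, not the script shipped in the template: `needs_update` and `update_matching_groups` are hypothetical names, and it assumes `nipyapi` is installed and its NiFi and NiFi Registry hosts are already configured.

```python
def needs_update(pg, bucket_id, flow_id, target_version=None):
    """Return True if process group `pg` tracks the given flow and should be changed."""
    vci = pg.component.version_control_information
    if vci is None:
        # This process group is not under version control
        return False
    if vci.bucket_id != bucket_id or vci.flow_id != flow_id:
        # Under version control, but tracking a different flow
        return False
    # target_version=None stands for "latest"; leave that decision to nipyapi
    return target_version is None or vci.version != target_version


def update_matching_groups(parent_pg_id, bucket_name, flow_name, target_version=None):
    """Apply "Change version" to every matching process group below `parent_pg_id`."""
    import nipyapi  # imported here so that needs_update() has no dependencies

    bucket = nipyapi.versioning.get_registry_bucket(bucket_name, identifier_type='name')
    # Assumption: the lookup yields None for no match and a list for several matches
    if bucket is None or isinstance(bucket, list):
        raise ValueError("expected exactly one bucket named %r" % bucket_name)
    flow = nipyapi.versioning.get_flow_in_bucket(
        bucket.identifier, identifier=flow_name, identifier_type='name')
    if flow is None or isinstance(flow, list):
        raise ValueError("expected exactly one flow named %r" % flow_name)

    for pg in nipyapi.canvas.list_all_process_groups(parent_pg_id):
        if needs_update(pg, bucket.identifier, flow.identifier, target_version):
            nipyapi.versioning.update_flow_ver(pg, target_version)
```

Calling `update_matching_groups('root', 'my_bucket', 'my_flow', 7)` (placeholder names) would walk the whole canvas; passing a narrower parent group id limits the search.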

Expert Contributor

@Archon Gum

Hi, thanks for this awesome information! This is just what I was looking for but could not find...

Did you develop this solution?


I tested it on my local installation (1.9.1) and it worked perfectly.

Now I have to set up Python on my official DEV system (a cluster running 1.9.2) and test it there with more extensive flows.


I will let you know here whether the test was successful.

Bye!

Explorer

Glad it works. I just wrote the nifi template and the python script inside it. Thanks to the `nipyapi` module.

Expert Contributor

@Archon Gum

Hi, I was told Python 2.7.5 is already available on my DEV. Installed nipyapi and changed CommandPath in ExecuteStreamCommand. On execution this error happens:

108659-1557900281889.png

Question: Is Python 3.x absolutely required, or can you see another cause for this exception?

Thanks for your help and this solution!

Expert Contributor

Hi @Archon Gum
do you mind if I ask you one more time for your help?

I compared the bucket.py in Python37 (local) and Python275 (DEV-System with Cluster) but there is no difference.


Maybe you could give me a hint to solve this problem? Thanks!

Explorer

Hi

I tried but I can't reproduce it:

# 1. set identifier to None
nipyapi.versioning.get_registry_bucket(identifier=None, identifier_type='name')
# return: TypeError: 'in <string>' requires string as left operand, not NoneType

# 2. empty all bucket in nifi-registry and try to get bucket
nipyapi.versioning.get_registry_bucket(identifier='test', identifier_type='name')
# return: None

I can't reach the quoted code:

1. If nifi-registry has buckets, each bucket's name is passed as this 'name' parameter, so it can never be None.

2. If nifi-registry is empty (no buckets), the code is not executed at all, because an empty list is returned.

109074-1559183972253.png


My suggestions:

1. Check the host setting: `nipyapi.config.registry_config.host = 'http://your-server:18080/nifi-registry-api'`

2. Try a fresh nifi-registry and run the code again to see whether the error still occurs. (I recommend using Docker: https://hub.docker.com/r/apache/nifi-registry )

3. The Python version does not seem to be the problem here.
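
Suggestion 1 can be checked before any request is sent. The following is a minimal sketch with a hypothetical helper, `registry_host_problems`; it assumes the Registry API is served under the default `/nifi-registry-api` path, which may not hold behind a reverse proxy.

```python
from urllib.parse import urlparse


def registry_host_problems(url):
    """Return a list of obvious problems with a NiFi Registry API URL."""
    problems = []
    parts = urlparse(url)
    if parts.scheme not in ('http', 'https'):
        problems.append('scheme should be http or https')
    if not parts.netloc:
        problems.append('host name is missing')
    # Assumption: the API lives under the default context path
    if parts.path.rstrip('/') != '/nifi-registry-api':
        problems.append('path should be /nifi-registry-api')
    return problems
```

An empty list only means the URL looks plausible. Whether the server expects http or https still has to be confirmed against the actual installation.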

Expert Contributor

Hi @Archon Gum

thank you so much for your answer and your efforts!


Referring to your suggestions:

1. I checked the setting and it is correct. BUT when I access the registry's REST API from an InvokeHTTP processor, I have to use https.

So I changed this in "nipyapi.config.registry_config.host" and ended up with a certificate failure. Here is part of it:


AttributeError: 'module' object has no attribute 'X509_up_ref'
2019-06-04 05:40:24,709 WARNING Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: Error([('SSL routines', 'ssl3_get_server_certificate', 'certificate verify failed')],)",),)': /nifi-registry-api/buckets
2019-06-04 05:40:24,709 WARNING Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: Error([('SSL routines', 'ssl3_get_server_certificate', 'certificate verify failed')],)",),)': /nifi-registry-api/buckets
WARNING:urllib3.connectionpool:Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: Error([('SSL routines', 'ssl3_get_server_certificate', 'certificate verify failed')],)",),)': /nifi-registry-api/buckets
From cffi callback <function _verify_callback at 0x7f3a008085f0>


So I have to speak with the admin.


2. Because I'm not the "master of the installation" I can't do this just like that. But I will bear this in mind.


3. I agree.


If I get this to work I will let you know. So long!

Explorer

Oh, is your NiFi-Registry using HTTPS instead of HTTP? I'm going to try nipyapi over HTTPS too 🙂

Explorer

Hi @ Justen

Check this out.

import nipyapi
nipyapi.config.registry_config.host = 'https://localhost:18443/nifi-registry-api'

# additional code for SSL(HTTPS)
nipyapi.security.set_service_ssl_context(service='registry', 
                                         ca_file='G:/TEMP/ssl/localhost.cer', 
                                         client_cert_file='G:/TEMP/ssl/sys_admin.cer', 
                                         client_key_file='G:/TEMP/ssl/sys_admin.key', 
                                         client_key_password=None)


And it should work like this:

109211-snipaste-2019-06-04-17-01-41.jpg


So, we should know a few things:

1. HTTPS: HTTP over TLS (TLS is the successor to SSL, which is now deprecated; the name "SSL" is still widely used for both)

2. SSL/TLS: learn about certificates. https://www.websecurity.symantec.com/security-topics/what-is-ssl-tls-https

3. Asymmetric (public/private key) cryptosystems: a common use case is `ssh-keygen`. https://en.wikipedia.org/wiki/RSA_(cryptosystem)

4. CA (certificate authority): anyone can create a certificate, but whose certificate can be trusted? That is what a CA is for. https://www.google.com/search?q=ca+certificate

5. (optional) Setting up a nifi-registry with HTTPS: https://www.youtube.com/watch?v=qD03ao3R-a4


To learn more, just google these terms.


For a nifi-registry running on HTTPS, use this method to supply the certificates needed for access: https://nipyapi.readthedocs.io/en/latest/nipyapi-docs/nipyapi.html#module-nipyapi.security

109221-snipaste-2019-06-04-17-26-09.jpg

I set up a nifi-registry by following the YouTube video above. Viewed in a tool such as https://keystore-explorer.org/ the certificates should look like this:

109231-snipaste-2019-06-04-16-57-59.jpg

Parameters:

1. client_cert_file: the certificate used to access nifi-registry; it plays the role of a 'username'.

2. client_key_file: the private key for `client_cert_file`; it plays the role of a 'password'.

3. client_key_password: the password for the private key in `client_key_file`. A private key may or may not be encrypted with a password. In this case it is not encrypted, so this is set to None.

4. ca_file: the certificate of the CA that signed `client_cert_file`. In this case we created the CA ourselves too; it is the 'localhost' certificate.

You can get all of these files from your admin.
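
Before handing these files to nipyapi, they can be checked locally with the standard library. This is a minimal sketch using Python's `ssl` module; `build_client_context` is a hypothetical helper, and it assumes all three files are PEM-encoded.

```python
import ssl


def build_client_context(ca_file, client_cert_file, client_key_file,
                         client_key_password=None):
    """Load the CA and the client certificate/key into a TLS client context."""
    # Fails with an OSError subclass if the CA file is missing or cannot be parsed
    context = ssl.create_default_context(cafile=ca_file)
    # Fails if the certificate or key is missing, the two do not match,
    # or the password for an encrypted key is wrong
    context.load_cert_chain(certfile=client_cert_file,
                            keyfile=client_key_file,
                            password=client_key_password)
    return context
```

If this call succeeds with the same paths that are passed to `set_service_ssl_context`, the files themselves are readable and consistent, and a remaining handshake error more likely points at trust between client and server.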


I hope I explained it clearly 🙂

Expert Contributor

@Archon Gum

Thanks for this detailed and informative answer! So my admin solved the SSL-problem among others by installing Python3.6.

Now I get as far as this error:


ExecuteStreamCommand[id=e0bc9898-7dfb-3992-9e35-46fd7e983658] Transferring flow file StandardFlowFileRecord[uuid=f680a1c1-5ba9-426e-8b4f-a1d89e1a3f30,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1560791493717-1, container=default, section=1], offset=646526, length=-1],offset=0,name=tmp_nifi_update_flow_ver.py,size=0] to nonzero status. 
Executable command /usr/bin/python3.6 ended in an error: Traceback (most recent call last): 
File "tmp_nifi_update_flow_ver.py", line 56, in <module> update_flow_ver(pg_id, bucket_name, flow_name, target_version) 
File "tmp_nifi_update_flow_ver.py", line 42, in update_flow_ver flow = nipyapi.versioning.get_flow_in_bucket(bucket_id=bucket.identifier, identifier=flow_name, identifier_type='name') 
AttributeError: 'NoneType' object has no attribute 'identifier'


Can you give me some advice on this?

Explorer
bucket = nipyapi.versioning.get_registry_bucket(identifier=bucket_name, identifier_type='name')
# start of added check: verify that the bucket lookup returned something
if bucket is None:
  print(f"ERROR: bucket is None which means {bucket_name} not exists")
  sys.exit(-1)
# end of added check
flow = nipyapi.versioning.get_flow_in_bucket(bucket_id=bucket.identifier, identifier=flow_name, identifier_type='name')


Nothing fatal; just check whether the bucket exists.


Expert Contributor

@Archon Gum
I'm sorry, but the bucket definitely exists!

Your solution came so close to my target... Do you have another idea?


By the way. If I add your supplement this error occurs:

ExecuteStreamCommand[id=e0bc9898-7dfb-3992-9e35-46fd7e983658] Transferring flow file StandardFlowFileRecord[uuid=6a70ebcc-13a0-431b-aa85-014bf8fb825f,claim=StandardContentClaim 
[resourceClaim=StandardResourceClaim[id=1560791493717-1, container=default, section=1], offset=715298, length=59],offset=0,name=tmp_nifi_update_flow_ver.py,size=59] to nonzero status. 
Executable command /usr/bin/python3.6 ended in an error: Traceback (most recent call last):
  File "tmp_nifi_update_flow_ver.py", line 61, in <module>
    update_flow_ver(pg_id, bucket_name, flow_name, target_version)
  File "tmp_nifi_update_flow_ver.py", line 45, in update_flow_ver
    sys.exit(-1)
NameError: name 'sys' is not defined

Explorer
import sys

import the 'sys' module

Expert Contributor

@Archon Gum

Embarrassing... OK, now this happens.
109442-1560934658063.png

Attributes of nonzero status FF:
109451-1560934745186.png

Expert Contributor

Hi @Archon Gum, I know I'm very persistent, but I see no other way than asking you for help again.

Ran the script from shell and received the message you added:

python3.6 tmp_nifi_update_flow_ver.py --pg_id f5163735-016b-1000-0000-00002a32b939 --bucket_name Justen_Bucket --flow_name TEST_IJ_PG_1 --target_version latest
ERROR: bucket is None which means Justen_Bucket not exists

Even when I use another bucket this message appears.

It seems the nipyapi.versioning.get_registry_bucket(identifier=bucket_name, identifier_type='name') doesn't return the real buckets in registry.

Is it possible to test this in another way? Could it be an authorization problem again?

Sorry, but I see no way to solve this problem myself.

It would be a pity if all your efforts were in vain... Thank you!
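
One way to narrow this down is to list the buckets the script's identity can actually see. As far as the NiFi Registry REST API is documented, the bucket listing only contains buckets the caller is authorized to read, so an empty list can point to a permission problem rather than a missing bucket. A minimal sketch, assuming `nipyapi` is configured as in the script; `explain_missing_bucket` and `visible_bucket_names` are hypothetical helpers.

```python
def explain_missing_bucket(wanted_name, visible_names):
    """Turn a failed bucket lookup into a hint about the likely cause."""
    if wanted_name in visible_names:
        return "bucket is visible; check the name passed to the lookup"
    if not visible_names:
        return ("no buckets visible; check the registry host and the "
                "read permissions of the client identity")
    return "bucket not visible, but others are: " + ", ".join(sorted(visible_names))


def visible_bucket_names():
    """Ask NiFi Registry which buckets the current identity can read."""
    import nipyapi  # imported here so explain_missing_bucket() has no dependencies

    return [bucket.name for bucket in nipyapi.versioning.list_registry_buckets()]
```

Running `print(explain_missing_bucket('Justen_Bucket', visible_bucket_names()))` from the same shell and user as the failing script would show which of the three cases applies.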

Explorer

@ Justen Do you use Slack? Leave your email address and I will invite you to my channel so we can discuss it there.

Expert Contributor

@Archon Gum I'm glad you answered!

Yes, I'm using slack - since about 5 minutes... ;-))

Here's my email address which I will remove in about 2 hours: removed

I'm looking forward to receiving an appointment for our discussion, because sometimes I'm not available, sorry!
