Member since: 07-30-2019
Posts: 2906
Kudos Received: 1442
Solutions: 844
My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
|  | 54 | 04-17-2024 11:30 AM |
|  | 62 | 04-16-2024 05:36 AM |
|  | 36 | 04-15-2024 05:31 AM |
|  | 120 | 04-03-2024 05:59 AM |
|  | 132 | 04-02-2024 01:22 PM |
10-10-2023
12:06 PM
@GauthierCLM You should raise a support case with Cloudera through https://my.cloudera.com/support.html. You'll need to log in with your Cloudera username and password. You could also contact your Cloudera Account Manager, who can assist you with your licensing needs. If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
10-07-2023
09:48 PM
Created this Python ExecuteScript NiFi script that extracts the files of a zip archive (including those in subdirectories) into individual FlowFiles. It all happens inside NiFi. It is not fully tested, but it worked with a simple example in my lab. "Script Body" below:

''' Extract Zip Files '''
import zipfile
from io import BytesIO

from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback


class PyInputStreamCallback(InputStreamCallback):
    ''' Reads the incoming FlowFile content into an in-memory ZipFile '''
    def __init__(self):
        self.zip_file = None

    def process(self, input_stream):
        zip_buffer = BytesIO(IOUtils.toByteArray(input_stream))
        self.zip_file = zipfile.ZipFile(zip_buffer, "r")


class PyOutputStreamCallback(OutputStreamCallback):
    ''' Writes one zip member's bytes out as the new FlowFile's content '''
    def __init__(self, file):
        self.file = file

    def process(self, output_stream):
        output_stream.write(self.file.read())


flow_file = session.get()
if flow_file:
    input_stream_callback = PyInputStreamCallback()
    session.read(flow_file, input_stream_callback)
    zip_filename = flow_file.getAttribute("filename")
    zip_file = input_stream_callback.zip_file
    # Skip directory entries (names ending in "/"); emit one FlowFile per file
    for name in (name for name in zip_file.namelist() if not name.endswith("/")):
        new_flow_file = session.create()
        new_flow_file = session.putAttribute(new_flow_file, "filename", name)
        new_flow_file = session.putAttribute(new_flow_file, "zip_filename", zip_filename)
        new_flow_file = session.write(
            new_flow_file,
            PyOutputStreamCallback(zip_file.open(name))
        )
        session.transfer(new_flow_file, REL_SUCCESS)
    zip_file.close()
    session.remove(flow_file)
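The core zip-walking logic above can also be exercised outside NiFi with plain Python's zipfile module. A minimal standalone sketch (the archive contents here are made-up test data):

```python
import io
import zipfile

def extract_members(zip_bytes):
    """Return {name: bytes} for every file entry in a zip,
    skipping directory entries (names ending in '/')."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
        return {
            name: zf.read(name)
            for name in zf.namelist()
            if not name.endswith("/")
        }

# Build a small in-memory zip with a nested directory to test with.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("a.txt", b"hello")
    zf.writestr("sub/", b"")          # explicit directory entry
    zf.writestr("sub/b.txt", b"world")

members = extract_members(buf.getvalue())
print(sorted(members))  # ['a.txt', 'sub/b.txt'] -- directory entry filtered out
```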
10-06-2023
09:01 AM
@PriyankaMondal
1. Not clear on the question here. Why use the Toolkit to create three keystores? I thought you were getting three certificates (one for each node) from your IT team. Use those to create the three unique keystores you will use.

2. It appears your DN has a wildcard in it. NiFi does not support the use of wildcards in the DN of node clientAuth certificates. This is because NiFi utilizes mutualTLS connections, and the clientAuth DN is used to identify each unique connecting client and to set up and configure the authorizations.

Now you could ask your IT team to create you one keystore with a non-wildcard DN like "cn=nifi-cluster, ou=domainlabs, DC=com" and add all three of your NiFi nodes' hostnames as SAN entries in that one PrivateKeyEntry. This would allow you to use that same PrivateKey keystore on all three NiFi nodes. This has downsides, like security: if the keystore on one node gets compromised, all hosts are compromised because it is reused. All nodes will also present the same client identity (since all present the same DN) during authorization, so nothing will distinguish one node from another.

The keystore used by NiFi can ONLY contain one PrivateKeyEntry. Merging multiple keystores with privateKey entries will result in one keystore with more than one PrivateKeyEntry, which is not supported by NiFi.

If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
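As a quick sanity check outside NiFi, the "no wildcards in the clientAuth DN" rule can be expressed in a few lines of plain Python. The DN strings below are illustrative examples only, not your actual certificates:

```python
def dn_has_wildcard(dn):
    """Return True if any attribute value in an X.500-style DN string
    (e.g. 'CN=host, OU=lab, DC=com') contains a '*' wildcard."""
    return any("*" in part.split("=", 1)[-1] for part in dn.split(","))

print(dn_has_wildcard("CN=*.domainlabs.com, OU=domainlabs, DC=com"))  # True
print(dn_has_wildcard("CN=nifi-cluster, OU=domainlabs, DC=com"))      # False
```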
10-05-2023
08:02 AM
@LKB Can you share screenshots of your UpdateAttribute processor configuration? Are you using the advanced UI of the UpdateAttribute processor? The UpdateAttribute processor is fairly simple in design. Without configuring the advanced UI, it can only remove attributes or create/modify existing attributes. Each attribute is defined by a key:value pair, where the property name is the key and the property value is the value. The Advanced UI allows for conditional attribute additions or modifications. If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
10-05-2023
07:49 AM
@hkh After upgrading your DB, did you upgrade the driver being used in the DBCPConnectionPool controller service to the latest? This may be the reason why the controller service is still trying to enable. You may also want to look at a NiFi thread dump to see what that enabling controller service is waiting on. If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
10-04-2023
05:46 AM
@hegdemahendra The autoload capability in NiFi can only auto-load new nars added to the directory. It does not handle unload or reload. The reason is that a reload would require upgrading the existing components that use a previously loaded nar. That process would require stopping all components added to the canvas from that nar, upgrading all those components to the new nar version, and then starting the components again. You also have the issue that the flow.json.gz has already been loaded into memory with a different component version, and the issue of someone adding a new nar version without removing the old nar version first.

You should be able to click on a component on the canvas once multiple versions of the same class are loaded and switch to the other version. By design, NiFi allows multiple versions of the same components to be loaded (it has always been that way). So there has never been a capability, when multiple versions of the same component are loaded, to trigger an upgrade of components from those duplicate component classes on the canvas.

NiFi can only change a component's version on startup, and only if exactly one version of the component class exists at startup. On startup, NiFi loads the NiFi lib nars and any nars found in custom lib folders or the autoload directory. These nars get unpacked into a work directory. NiFi then starts loading the dataflow from the flow.json.gz file. The flow.json.gz contains each component's class, version, and configuration. When loading a component whose version is not found, but ONLY one different version of that same component class is found, NiFi will switch that component to that version of the class (which could be an older or newer version). If any component versions changed on startup, a new flow.json.gz is written out to match what is loaded into memory.
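The startup version-matching rule described above can be sketched in a few lines of Python. This is an illustrative model, not NiFi's actual code, and the version strings are made up:

```python
def resolve_component_version(flow_version, loaded_versions):
    """Model NiFi's startup rule for one component class: keep the
    flow.json.gz version if that nar is loaded; otherwise switch only
    when exactly one other version of the class is available."""
    if flow_version in loaded_versions:
        return flow_version
    if len(loaded_versions) == 1:
        return loaded_versions[0]  # auto-switch (could be older or newer)
    return None  # ambiguous: multiple candidates, no automatic switch

print(resolve_component_version("1.16.0", ["1.16.0", "1.23.2"]))  # 1.16.0
print(resolve_component_version("1.16.0", ["1.23.2"]))            # 1.23.2
print(resolve_component_version("1.16.0", ["1.20.0", "1.23.2"]))  # None
```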
If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
09-29-2023
06:04 AM
@VLban From what you have shared, I don't think you are having any issues with your NiFi communicating with your zookeeper. When NiFi is running it sends a heartbeat message to ZK so that ZK knows that node is available. ZK is used to facilitate the election of two NiFi roles:

1. Cluster coordinator - Only one node in the NiFi cluster can be elected as cluster coordinator. The cluster coordinator is responsible for replicating requests made from any node to all nodes in the cluster. This allows NiFi to support a zero-master architecture, meaning that users do not need to connect to the elected cluster coordinator node in order to make changes. Users can interact with the NiFi cluster from any node.

2. Primary node - Only one node at a time can be elected to this role. The node with this assigned role will be the only node that schedules processors configured with "primary node" only execution.

The log output you shared indicates that ZK is receiving these heartbeats from at least some of the 10 nodes (maybe all of them, but we know the node from which you got this log is talking to ZK fine), allowing the cluster coordinator election to succeed. We see that "sd-sagn-rtyev:9082" was elected with the cluster coordinator role. Once the nodes are aware of who the elected cluster coordinator is, they will start sending cluster heartbeats to that elected cluster coordinator. The initial set of heartbeats is used to connect the nodes to the cluster (things like making sure all nodes are running the exact same flow.xml.gz/flow.json.gz and have matching users.xml and authorizations.xml files). If your NiFi is secured (running over HTTPS), then all communications between nodes are over mutualTLS encrypted connections. Based on the exception you shared, it sounds like this connection between node(s) and the elected cluster coordinator is failing.

1. Make sure that all nodes can properly resolve the cluster hostnames to reachable IP addresses.
2. Make sure that the PrivateKeyEntry in each node's keystore configured in the nifi.properties supports the clientAuth and serverAuth EKUs and has the required host SAN entry(s).
3. Make sure that the truststore used on every node contains the complete trust chain for all the privateKey entries being used by all 10 nodes. A PrivateKey may be signed by a root or intermediate CA (an intermediate CA may be signed by another intermediate CA or the root CA). A complete trust chain consists of ALL trusted public certificates from the signer of the private key up to the root CA.

If a mutualTLS handshake cannot be established, typically one side or the other will simply close the connection, most commonly as a result of a lack of proper trust. This would explain the "Broken pipe (write failed)", as the client was unable to send its heartbeat connection_request to the elected cluster coordinator.

If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
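Step 1 (confirming every node can resolve the cluster hostnames) can be scripted with Python's standard socket module. The hostname list below is a placeholder; substitute your own 10 node names:

```python
import socket

def check_resolution(hostnames):
    """Try to resolve each hostname; return {hostname: ip-or-error}."""
    results = {}
    for host in hostnames:
        try:
            results[host] = socket.gethostbyname(host)
        except socket.gaierror as err:
            results[host] = "UNRESOLVED: %s" % err
    return results

# Placeholder name -- replace with your actual cluster hostnames.
resolved = check_resolution(["localhost"])
print(resolved)
```

Run this on every node; any "UNRESOLVED" entry points at a DNS or /etc/hosts problem on that node.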
09-29-2023
12:38 AM
@MattWho Thanks for your answer. It was the last point: someone made changes to the user authorization configuration, and my user lost the right to view data provenance.
09-28-2023
12:54 PM
@sarithe You may also want to take a look at the Process Group (PG) FlowFile Concurrency configuration options as a possible design path, since there does not appear to be any dependency between task 1 and task 2 in your description. You just want to make sure that no more than 2 tasks are executing concurrently.

You move the processors that handle the two task executions inside two different child PGs configured with "Single FlowFile per Node" Process Group FlowFile Concurrency. Within each PG you create an input port and an output port, and between these two ports you build your task dataflow. Outside these PGs (at the parent PG level), you handle the triggering FlowFiles. Each task PG will allow 1 FlowFile at a time to enter, and because of the FlowFile Concurrency setting, will not allow any more FlowFiles to enter that PG until that FlowFile processes out.

As you can see from the above example, each task PG is only processing a single FlowFile at a time. I built this example so that task 2 always takes longer, so you see that the task 1 PG has output more processed FlowFiles than the task 2 PG, while still making sure that no more than two tasks are ever being executed concurrently.

If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
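Conceptually, "Single FlowFile per Node" concurrency behaves like a one-permit gate on each Process Group. A rough Python threading analogy (this is a model of the behavior, not NiFi code; names are illustrative):

```python
import threading

class SingleFlowFileGroup:
    """Rough analogy of a PG that admits one 'FlowFile' at a time."""
    def __init__(self, name):
        self.name = name
        self._gate = threading.Semaphore(1)  # one FlowFile inside the PG
        self.processed = []

    def run_task(self, flowfile):
        with self._gate:  # blocks until the PG is empty again
            self.processed.append(flowfile)

pg = SingleFlowFileGroup("task-1")
threads = [threading.Thread(target=pg.run_task, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(pg.processed))  # [0, 1, 2] -- all processed, one at a time
```

Two such groups side by side give at most two tasks in flight, which matches the design goal above.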
09-27-2023
07:57 PM
Got it. It's in the Data Provenance dialog box.