Member since 07-30-2019
3406 Posts
1622 Kudos Received
1008 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 148 | 12-17-2025 05:55 AM |
| | 209 | 12-15-2025 01:29 PM |
| | 143 | 12-15-2025 06:50 AM |
| | 261 | 12-05-2025 08:25 AM |
| | 423 | 12-03-2025 10:21 AM |
11-14-2019
01:52 PM
1 Kudo
@littlesea374 The Wait processor requires a release signal that is typically created using the Notify processor, so that really will not help here. Perhaps you could try setting a penalty on each FlowFile. Penalized FlowFiles are not processed by the follow-on processor until the penalty duration has ended. This can be done using the ExecuteScript processor after ListSFTP. You then set the length of "Penalty Duration" via the Settings tab. Set the penalty high enough to ensure the file writes have completed. Of course, this does introduce some latency.

What this will not help with is ListSFTP still listing the same files multiple times. As data is written, the last-modified timestamp on the source file updates, which means it will get listed again as if it were a new file. But the delay here allows the full data to be written, and then perhaps you can use a DetectDuplicate processor to remove duplicates based on filename before you actually fetch the content. Just some thoughts here, but that Jira is probably the best path.

Matt
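For reference, the ExecuteScript step mentioned above could look roughly like the following. This is only a minimal sketch, assuming the Script Engine is set to python (Jython) and relying on the `session` and `REL_SUCCESS` variables that ExecuteScript makes available to the script. The helper name `penalize_one` is made up for illustration, and this is not necessarily the script from the original reply.

```python
def penalize_one(session, success):
    """Penalize the next queued FlowFile and route it onward."""
    flow_file = session.get()
    if flow_file is None:
        # Nothing was queued when the processor was triggered.
        return None
    # penalize() returns the updated FlowFile reference; the downstream
    # processor will not pick it up until the Penalty Duration has passed.
    flow_file = session.penalize(flow_file)
    session.transfer(flow_file, success)
    return flow_file


# Inside ExecuteScript, `session` and `REL_SUCCESS` are provided by NiFi.
# Outside NiFi those names do not exist, so nothing is executed here.
try:
    penalize_one(session, REL_SUCCESS)
except NameError:
    pass
```

The penalty length itself is not set in the script; it comes from the "Penalty Duration" configured on the ExecuteScript processor.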
11-14-2019
10:11 AM
@littlesea374 The ListSFTP processor's only mechanism for ignoring files in the listing directory is to skip those marked as hidden (names that start with . on Linux-based systems). The dot-rename approach to file transfer is pretty common with SFTP. Now, if the files are being streamed into the SFTP server by another process that does not use some form of dot rename or filename change, you would need the new feature added to ListSFTP as part of https://issues.apache.org/jira/browse/NIFI-5977. This new feature is part of Apache NiFi 1.10, which adds a couple of new configuration properties to the ListSFTP processor. Minimum File Age is what you would need to use: only files whose last update time is older than this configured value would be listed.

Hope this helps,
Matt
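To make the Minimum File Age idea concrete, here is a rough local-filesystem sketch of the same filtering logic. It is not NiFi's implementation; the function name `list_ready_files` and its parameters are made up for illustration, and it also skips dot files to mirror the hidden-file behavior described above.

```python
import os
import time


def list_ready_files(directory, minimum_age_seconds, now=None):
    """Return names of visible files last modified at least minimum_age_seconds ago."""
    now = time.time() if now is None else now
    ready = []
    for entry in os.scandir(directory):
        # Skip directories and hidden (dot-prefixed) files.
        if not entry.is_file() or entry.name.startswith("."):
            continue
        # A file still being written keeps getting a fresh modification time,
        # so it stays below the age threshold until the writer is done.
        if now - entry.stat().st_mtime >= minimum_age_seconds:
            ready.append(entry.name)
    return sorted(ready)
```

A file that is still being appended to never becomes "old enough", so it is only listed once the writes have stopped for the configured period.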
11-12-2019
12:13 PM
1 Kudo
@stevenmatison The NiFi that comes with HDF is never exactly the same as the Apache NiFi version. It is simply based on an Apache NiFi release with some modifications and, in many cases, additional fixes that are not part of the Apache release. There is currently no plan for an HDF release based on Apache NiFi 1.10.0. That may change. The current plan is for Apache NiFi 1.10 to show up in the Cloudera Data Platform - Data Center (CDP-DC) release.

Thank you,
Matt
11-12-2019
10:45 AM
1 Kudo
@Cl0ck The basics:
1. NiFi must be secured before any form of authentication and authorization can be utilized.
2. You cannot have a mix of secured and unsecured nodes in a single NiFi cluster. Unsecured NiFi nodes would not be able to communicate with the secured NiFi nodes.
3. Securing NiFi requires that you provide each NiFi node with a keystore file that contains a single PrivateKeyEntry that includes, at a minimum, the following:
--- ExtendedKeyUsage supporting both clientAuth and serverAuth
--- SubjectAlternativeName(s) that match the exact hostname of the NiFi node that keystore is being installed on.
--- The keystore and key passwords need to be the same.
4. Securing NiFi also requires a truststore file which includes all the trust authorities. These TrustedCertEntries can be the public certs of the root and intermediate Certificate Authorities (CAs) or the public cert for any self-signed certs you may have created.

Note: The NiFi CA makes it easy to set up a CA and sign certificates for your nodes; however, it is not a full-featured CA and is not recommended for production use.

Securing NiFi requires setting the following properties in the nifi.properties file:
nifi.security.keyPasswd=
nifi.security.keystore=/<path to>/keystore.jks
nifi.security.keystorePasswd=
nifi.security.keystoreType=JKS
nifi.security.truststore=/<path to>/truststore.jks
nifi.security.truststorePasswd=
nifi.security.truststoreType=JKS
nifi.web.https.host=<hostname>
nifi.web.https.network.interface.default=
nifi.web.https.port=<secure port>
nifi.cluster.protocol.is.secure=True
nifi.security.user.authorizer=
(optional) nifi.security.user.login.identity.provider=

Once NiFi is configured to be secure, you need to consider how your users will authenticate. By default, once secured, NiFi will require that all users authenticate via client/user TLS certificates. However, NiFi offers a variety of additional authentication methods that can be configured as additions to the TLS authentication:
1. TLS authentication (always attempted first)
2. SPNEGO (configured in nifi.properties and attempted second if configured)
3. Login provider (configured in login-identity-providers.xml. Options include LDAP, Kerberos, Knox, and OpenID Connect. You can only configure one, and it is attempted third if neither of the first two resulted in a client authentication)

You mentioned that you want to use LDAP, so you would need to configure the ldap-provider in login-identity-providers.xml and set the property "nifi.security.user.login.identity.provider=ldap-provider" in the nifi.properties file.

After authentication comes authorization. This is what the now-authenticated user is allowed to do/access within your NiFi. Authorization configuration is done via the authorizers.xml file. It is easiest to read this file from the bottom up. At the bottom you should have an "authorizer":
<authorizer>
<identifier>managed-authorizer</identifier>
<class>org.apache.nifi.authorization.StandardManagedAuthorizer</class>
<property name="Access Policy Provider">file-access-policy-provider</property>
</authorizer>

Above is the managed-authorizer, which you would then reference in the nifi.properties file using the property "nifi.security.user.authorizer=managed-authorizer". You can see this authorizer calls a "file-access-policy-provider", which you must find above this entry in the authorizers.xml:
<accessPolicyProvider>
<identifier>file-access-policy-provider</identifier>
<class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
<property name="User Group Provider">composite-user-group-provider</property>
<property name="Node Group"></property>
<property name="Initial Admin Identity"><username of ldap user who will act as the initial admin user></property>
<property name="Authorizations File">/<path to>/authorizations.xml</property>
<property name="Node Identity 2"><dn of node 1 in cluster></property>
<property name="Node Identity 3"><dn of node 2 in cluster></property>
<property name="Node Identity 4"><dn of node N in cluster></property>
</accessPolicyProvider>

The above is responsible for setting up some initial necessary authorizations for your initial admin user and the NiFi nodes in your cluster; it is this file-access-policy-provider that builds the authorizations.xml file. You can see it calls another provider, "composite-user-group-provider", which you will find further up in the authorizers.xml.

Note: The authorizations.xml file is only created if it does not already exist. If it already exists, later changes to the authorizers.xml file will not modify it. The expectation is that all new authorizations are made via the NiFi UI by your initial admin user.

The next three providers control where the NiFi authorizer learns about the users to which authorizations will be granted.
<userGroupProvider>
<identifier>composite-user-group-provider</identifier>
<class>org.apache.nifi.authorization.CompositeConfigurableUserGroupProvider</class>
<property name="User Group Provider 1">ldap-user-group-provider</property>
<property name="Configurable User Group Provider">file-user-group-provider</property>
</userGroupProvider>
<userGroupProvider>
<identifier>file-user-group-provider</identifier>
<class>org.apache.nifi.authorization.FileUserGroupProvider</class>
<property name="Users File">/<path to>/users.xml</property>
<property name="Initial User Identity 2"><dn of NiFi node 1></property>
<property name="Initial User Identity 3"><dn of NiFi node 2></property>
<property name="Initial User Identity 4"><dn of NiFi node 3></property>
</userGroupProvider>
<userGroupProvider>
<identifier>ldap-user-group-provider</identifier>
<class>org.apache.nifi.ldap.tenants.LdapUserGroupProvider</class>
<property name="Authentication Strategy">SIMPLE</property>
<property name="Manager DN"><dn of ldap manager></property>
<property name="Manager Password"><ldap manager password></property>
<property name="TLS - Keystore"></property>
<property name="TLS - Keystore Password"></property>
<property name="TLS - Keystore Type"></property>
<property name="TLS - Truststore"></property>
<property name="TLS - Truststore Password"></property>
<property name="TLS - Truststore Type"></property>
<property name="TLS - Client Auth"></property>
<property name="TLS - Protocol"></property>
<property name="TLS - Shutdown Gracefully"></property>
<property name="Referral Strategy">FOLLOW</property>
<property name="Connect Timeout">10 secs</property>
<property name="Read Timeout">10 secs</property>
<property name="Url">ldap://<hostname>:<port></property>
<property name="Page Size">500</property>
<property name="Sync Interval">30 mins</property>
<property name="User Search Base"></property>
<property name="User Object Class"></property>
<property name="User Search Scope">SUBTREE</property>
<property name="User Search Filter"></property>
<property name="User Identity Attribute"></property>
<property name="User Group Name Attribute"></property>
<property name="User Group Name Attribute - Referenced Group Attribute"></property>
<property name="Group Search Base"></property>
<property name="Group Object Class"></property>
<property name="Group Search Scope">SUBTREE</property>
<property name="Group Search Filter"></property>
<property name="Group Name Attribute"></property>
<property name="Group Member Attribute"></property>
<property name="Group Member Attribute - Referenced User Attribute"></property>
</userGroupProvider>

Of course, you will need to fill in all the required LDAP properties to sync your users and groups from your LDAP. The file-user-group-provider is used for creating any local users. You will see above that it creates your NiFi nodes as local users in a users.xml file, since it is very unlikely these NiFi servers will exist in LDAP.

Note: The users.xml is only generated once. If it already exists, it will not be updated with any future changes made to the authorizers.xml file. The expectation is that additional local users are added manually by your initial admin via the NiFi UI.

Note: Always use search filters to limit the number of users and groups returned to only those that will be accessing your NiFi. NiFi holds all these returned users/groups in heap memory, so you want to avoid syncing the entire LDAP.

Other things to keep in mind:
1. NiFi is case sensitive. User "John Smith" is not the same as user "john smith".
2. The user string that results from successful authentication must match EXACTLY with the user string returned by the ldap-user-group-provider or file-user-group-provider.

NiFi identity mapping patterns can be used to trim/modify the strings returned by both the authentication provider and authorization providers so they match. Examples:
nifi.security.group.mapping.pattern.anygroup=(.*?)
nifi.security.group.mapping.value.anygroup=$1
nifi.security.group.mapping.transform.anygroup=LOWER
nifi.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?)$
nifi.security.identity.mapping.value.dn=$1
nifi.security.identity.mapping.transform.dn=LOWER
nifi.security.identity.mapping.pattern.kerb=^(.*?)@(.*?)$
nifi.security.identity.mapping.value.kerb=$1
nifi.security.identity.mapping.transform.kerb=LOWER

Hope this helps you get started,
Matt
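To see what the identity mapping examples above do to a user string, here is a small Python sketch that mimics them. It is an illustration only, not NiFi code. It assumes (my understanding of the behavior) that the first pattern matching the whole identity is applied, that `$1` refers to the first capture group, and that the transform is applied last. The function name `map_identity` and the sample identities are made up.

```python
import re

# (pattern, value, transform) copied from the "dn" and "kerb" examples above.
IDENTITY_MAPPINGS = [
    (r"^CN=(.*?), OU=(.*?)$", "$1", "LOWER"),
    (r"^(.*?)@(.*?)$", "$1", "LOWER"),
]


def map_identity(identity, mappings=IDENTITY_MAPPINGS):
    """Apply the first mapping whose pattern matches the whole identity."""
    for pattern, value, transform in mappings:
        match = re.fullmatch(pattern, identity)
        if match is None:
            continue
        # Replace $1, $2, ... in the value with the captured groups.
        mapped = re.sub(
            r"\$(\d+)", lambda ref: match.group(int(ref.group(1))), value
        )
        if transform == "LOWER":
            mapped = mapped.lower()
        elif transform == "UPPER":
            mapped = mapped.upper()
        return mapped
    # No pattern matched: the identity is used unchanged.
    return identity


print(map_identity("CN=John Smith, OU=NiFi"))  # john smith
print(map_identity("JSmith@EXAMPLE.COM"))      # jsmith
```

The point is that both the certificate DN and the Kerberos principal end up as a short lowercase string, which then has to match the string returned by your user group providers.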
11-12-2019
09:35 AM
@vamcits You can use the EvaluateJsonPath processor to extract the 0 from the JSON you shared. You can name the custom property whatever you like (I just used C0). This will result in a "C0" attribute being generated on the FlowFile with a value of "[0]". If you don't want the square brackets around the number, you can remove them using an UpdateAttribute processor. Then you can perform your routing with the RouteOnAttribute processor.

Hope this helps,
Matt
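The bracket clean-up step is just a character replacement. As a rough illustration in Python (not the actual UpdateAttribute configuration, and with a made-up function name), it amounts to:

```python
import re


def strip_brackets(value):
    """Remove square brackets from an extracted attribute value."""
    return re.sub(r"[\[\]]", "", value)


print(strip_brackets("[0]"))  # 0
```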
11-12-2019
07:33 AM
@Raghu_daredev Just to be clear: are the timestamps you want to compare part of the JSON content of each FlowFile before the MergeContent, or are you trying to compare the timestamp NiFi assigns to each FlowFile when it is created in NiFi?

If they are in the JSON content, you could use the EvaluateJsonPath processor to extract the timestamps into NiFi attributes and then compare those individual timestamps in a series of NiFi EL statements in a single UpdateAttribute processor (assuming the exact same number of merged FlowFiles in each new merged FlowFile).

If it is based on the timestamp created by NiFi when they became FlowFiles, this becomes more challenging. While it is possible to persist these individual timestamps to a unique attribute on each FlowFile, allowing them to persist as those unique attributes in the new merged FlowFile, the uniqueness is going to make it impossible to reference them in NiFi EL statements. You may need to add the timestamp value to the source JSON before the merge so that it can be extracted numerically via EvaluateJsonPath after the merge.

Matt
11-12-2019
05:36 AM
@Raghu_daredev Your question states "contents of a FlowFile". Each FlowFile only references one piece of content. Are you saying the content of your FlowFile contains two different timestamps and you would like to calculate the time difference between them? If so, you would need to extract those two timestamps from the content into unique FlowFile attributes on that FlowFile. You could then use an UpdateAttribute processor to create another new FlowFile attribute that contains the time difference, using the NiFi Expression Language (EL) to convert the timestamps to numbers and then subtract one from the other. Without an example of your timestamps, it would be difficult to provide an example of a NiFi EL statement to accomplish this task.

Matt
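The convert-then-subtract logic looks like this when sketched in Python rather than NiFi EL. The timestamp format and the sample values are pure assumptions, since no example timestamps were shared, and the function name is made up.

```python
from datetime import datetime, timedelta


def timestamp_diff_millis(start, end, fmt="%Y-%m-%d %H:%M:%S.%f"):
    """Parse two timestamp strings and return end - start in milliseconds."""
    delta = datetime.strptime(end, fmt) - datetime.strptime(start, fmt)
    # Integer division by one millisecond avoids float rounding.
    return delta // timedelta(milliseconds=1)


print(timestamp_diff_millis("2019-11-12 05:30:00.000",
                            "2019-11-12 05:36:01.500"))  # 361500
```

In the flow itself, the same two steps (parse each attribute into a number using its format, then subtract) would be expressed in EL inside UpdateAttribute.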
11-08-2019
12:14 PM
@ChampagneM12 Please start a new thread for your new issue. We try to keep one question to a thread to avoid confusion and to make it easier for users who may also have the same question to find it. Ping me in your new thread and I would be happy to help as much as I can.

Matt
11-08-2019
04:40 AM
1 Kudo
@ChampagneM12 When you install a NiFi cluster, you start with a blank canvas, so there is no data ingestion at first. The user must construct dataflow(s) to meet their individual use cases, as I am sure you know. How data ingestion behaves through an outage depends on your implementation.

Let's assume you are ingesting from Kafka into NiFi, since you mentioned you use Kafka. You would likely start that dataflow with a ConsumeKafka processor. Let's also assume you have a 3-node NiFi cluster and the Kafka topic you are consuming from has 12 partitions. Since all nodes in your cluster will be executing the ConsumeKafka processor, each will be a consumer of that topic. With a single concurrent task (the default) configured on ConsumeKafka, each of those 3 nodes' ConsumeKafka processors will be assigned 4 partitions. If you were to set Concurrent Tasks to 4, you would then have a total of 12 consumers (one for each Kafka partition). Now let's assume one of your NiFi nodes goes down. Kafka will see a drop in the number of consumers from 12 to 8 and rebalance. Consumption will continue, with some of those consumers now being assigned multiple partitions, until the down NiFi node comes back online.

That is just one scenario. In the case of a NiFi Listen-type processor (example: ListenTCP), a TCP socket listener is started on each node in the NiFi cluster on the same port. In this case it would be the client or some external mechanism that would need to handle failover to a different node in the event a NiFi node goes down. This is typically handled with an external load balancer, which distributes data to all the NiFi nodes or switches to a different node when a node goes down.

In the use case of something like ListSFTP, the processor would be configured to run on "primary node" only. ZooKeeper is responsible for electing a primary node and a cluster coordinator in a NiFi cluster. NiFi processor components like ListSFTP are designed for primary-node-only execution and store state about the data listed in cluster state (within ZooKeeper). If the currently elected primary node goes down, another node in the NiFi cluster is elected the new primary node, and the processors configured for primary node only are started on that new node. The last state recorded in ZooKeeper by the previous primary node is pulled to the processor on the new primary node, and it picks up listing from there. Again, you have redundancy.

The only place in NiFi where you can have data delay is when a NiFi node goes down while it still has active data in its connection queue(s). Other nodes will not have access to the data on the down node to take over work on it. It will remain in that node's flowfile and content repositories until the node has been restored and can continue processing those queued FlowFiles. So it is important to protect those two NiFi repositories using RAID-configured drives. You can minimize the impact in such cases through good flow design and the use of back pressure to limit the number of FlowFiles that can queue on a NiFi node.

Also keep in mind that while the flowfile and content repositories are tightly coupled to the flow.xml.gz, these items are not tightly coupled to a specific piece of hardware. You can stand up an entirely new node for your cluster and move the flow.xml.gz, content repository, and flowfile repository onto that node before starting it, and that new node will continue processing the queued FlowFiles.

Hope this helps,
Matt
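The consumer arithmetic in the Kafka scenario above can be checked with a few lines of Python. This only counts partitions per consumer under an even spread; which consumer actually gets which partition is decided by Kafka's assignor, so treat it as a back-of-the-envelope sketch with a made-up function name.

```python
def partitions_per_consumer(partitions, nodes, concurrent_tasks):
    """Return how many partitions each consumer holds under an even spread."""
    consumers = nodes * concurrent_tasks
    if consumers <= 0:
        raise ValueError("at least one running consumer is required")
    base, extra = divmod(partitions, consumers)
    # The first `extra` consumers carry one additional partition.
    return [base + 1 if i < extra else base for i in range(consumers)]


print(partitions_per_consumer(12, 3, 1))  # [4, 4, 4]
print(partitions_per_consumer(12, 2, 4))  # [2, 2, 2, 2, 1, 1, 1, 1]
```

The second call is the "one node down" case: 8 remaining consumers cover all 12 partitions, with four of them handling two partitions each.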
11-07-2019
08:33 AM
1 Kudo
@ChampagneM12 Running multiple NiFi nodes within the same NiFi cluster on the same system is not recommended, but it can be done. This is possible by editing the nifi.properties file for each NiFi node so that it binds to its own unique ports for the following settings:
nifi.remote.input.socket.port=
nifi.web.http(s).port=
nifi.cluster.node.protocol.port=
nifi.cluster.load.balance.port=

On startup NiFi will bind to these ports, and multiple nodes on the same server cannot bind to the same port. Also keep in mind that multiple NiFi instances can NOT share resources like any of the repositories (database, flowfile, content, or provenance), local state directories, etc., so make sure those are all set to unique paths per node in the NiFi configuration files (nifi.properties, state-management.xml, authorizers.xml).

This will allow you to have multiple nodes loaded on the same server in the same NiFi cluster. You will, however, potentially run into issues when you start building your dataflows. Each instance will run its own copy of the dataflows you construct, so any processor or controller service you add that sets up a listener will not work, as only one node in your cluster will successfully bind to the configured port (there is no workaround for this). So total success here is going to depend in part on what kind of dataflows you will be building.

Hope this helps,
Matt
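If you want to sanity-check the port settings before starting the co-located nodes, something like the following Python sketch could help. The helper name and the node dictionaries are made up for illustration. The load-balance key is written as nifi.cluster.load.balance.port, which is my understanding of the property name, so adjust the list to match your own nifi.properties.

```python
from collections import defaultdict

# Port-related keys to compare across nodes (adjust to your nifi.properties).
PORT_KEYS = (
    "nifi.remote.input.socket.port",
    "nifi.web.http.port",
    "nifi.web.https.port",
    "nifi.cluster.node.protocol.port",
    "nifi.cluster.load.balance.port",
)


def find_port_conflicts(nodes):
    """Map each port claimed more than once to the (node, key) pairs using it.

    `nodes` maps a node label to a dict of its nifi.properties values.
    """
    claims = defaultdict(list)
    for node_name, props in nodes.items():
        for key in PORT_KEYS:
            port = props.get(key, "").strip()
            if port:  # blank means the setting is unused on that node
                claims[port].append((node_name, key))
    return {port: users for port, users in claims.items() if len(users) > 1}


nodes = {
    "node1": {"nifi.web.https.port": "8443",
              "nifi.cluster.node.protocol.port": "11443"},
    "node2": {"nifi.web.https.port": "8443",
              "nifi.cluster.node.protocol.port": "11444"},
}
print(find_port_conflicts(nodes))
```

With the sample values above, both nodes claim 8443 for HTTPS, so that port is reported as a conflict while the protocol ports pass. The same kind of check would apply to the repository and state directory paths.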