Member since: 07-30-2019
Posts: 3379
Kudos Received: 1616
Solutions: 998
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 153 | 10-20-2025 06:29 AM |
| | 293 | 10-10-2025 08:03 AM |
| | 231 | 10-08-2025 10:52 AM |
| | 211 | 10-08-2025 10:36 AM |
| | 288 | 10-03-2025 06:04 AM |
10-04-2016
04:53 PM
1 Kudo
@Ankit Jain Let me make sure I understand your flow completely.

- You have 4 ConsumeKafka processors all reading from the same topic? If this is your intent, you should have a single ConsumeKafka processor with the success relationship drawn off of it 4 times (one to each unique PutHDFS processor). This cuts down on disk I/O, since the consumed data is only written to the NiFi content repository once.
- Then you are trying to write that same data to 4 different HDFS endpoints?

With only 3 partitions on your Kafka topic, you can only have three consumers at a time. With 4 nodes in your cluster, one of the nodes at any given time will not be consuming any data. Optimally, the number of partitions would be equal to, or a multiple of, the number of nodes in your NiFi cluster. (For example, with 4 partitions you would have 4 nodes running the ConsumeKafka processor with 1 concurrent task; with 8 partitions, you would have 4 nodes running the ConsumeKafka processor with 2 concurrent tasks.)

It would be interesting to know more about your custom Kafka processor and how it differs from the "Max Poll Records" property in the existing ConsumeKafka processor.

Redistributing data across your cluster is only necessary when dealing with ingest-type processors that are not cluster friendly, such as GetSFTP, ListSFTP, GetFTP, etc. With ConsumeKafka, the most optimized approach is as I described above.

As for your question about how to know if all files were consumed from the topic: a Kafka topic is typically a living thing, with more and more files written to and removed from it, so I am not sure how NiFi would know when all files are consumed. NiFi will just continue to poll the topic for new files; if there is nothing new, NiFi gets nothing. It is the Kafka server that keeps track of which files were served up to a consumer; NiFi does not keep a listing itself. Data is not passed to the success relationship until it is consumed completely and successfully. NiFi provenance could be used to track particular files or to list all FlowFiles created by a ConsumeKafka processor, but you would need to know how many files were on the topic, and NiFi will not know that.

Matt
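As a side note, you can confirm how many partitions the topic actually has with the standard Kafka CLI describe command. This is just an illustrative sketch; the topic name and ZooKeeper address below are placeholders for your own:

bin/kafka-topics.sh --describe --zookeeper zk-host:2181 --topic my-topic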
10-03-2016
12:18 PM
2 Kudos
@vnandigam There are two parts to successfully accessing the NiFi UI: authentication and authorization. Since you are getting the insufficient-permissions screen, you have successfully authenticated.

First, you should confirm the DN pattern of the user that has successfully authenticated. If you tail the nifi-user.log while you access your NiFi UI, you will see a line similar to the following:

2016-10-03 11:47:15,134 INFO [NiFi Web Server-65795] o.a.n.w.s.NiFiAuthenticationFilter Authentication success for CN=nifiadmin,OU=hortonworks

Examine the DN presented. Does it match exactly what you had in your "Initial Admin Identity" property?

Next, confirm that this user was properly added to the users.xml file:

<user identifier="9d7b4fe2-8e8b-30a5-8e2a-f6a6a18addfa" identity="CN=nifiadmin,OU=hortonworks"/>

The user, if it exists, will have been assigned a UUID (the above UUID is just an example; yours will be different).

Next, verify this user was given the ability to "view the user interface" by examining the authorizations.xml file. Within this file you would expect to see the user's UUID assigned to one or more policies. In order to even see the UI, users must have "R" on the "/flow" policy:

<policy identifier="6a57bf03-2a93-39d0-87dd-e3aa30f0cd4d" resource="/flow" action="R">
<user identifier="9d7b4fe2-8e8b-30a5-8e2a-f6a6a18addfa"/>
</policy>
In order to be able to add users to additional access policies, the user would also need "R" and "W" on the "/policies" policy (you can think of this as the global admin policy):

<policy identifier="9a3a1c92-fa10-3f9d-b2f7-5cd56cd2ca00" resource="/policies" action="R">
<user identifier="9d7b4fe2-8e8b-30a5-8e2a-f6a6a18addfa"/>
</policy>
<policy identifier="1ff611dd-1536-31f5-a610-64e192e4c43c" resource="/policies" action="W">
<user identifier="9d7b4fe2-8e8b-30a5-8e2a-f6a6a18addfa"/>
</policy>
If your user has both of the above, you should be able to access the UI and use the interface to grant additional users access and add additional levels of access for yourself and/or any user you added. The following policies are what give a user the ability to create, modify, and delete users and/or groups:

<policy identifier="dee16f9e-1f09-37ee-806b-e372f1051816" resource="/tenants" action="R">
<user identifier="9d7b4fe2-8e8b-30a5-8e2a-f6a6a18addfa"/>
</policy>
<policy identifier="69839728-eaf3-345d-849f-e2790cf236ab" resource="/tenants" action="W">
<user identifier="9d7b4fe2-8e8b-30a5-8e2a-f6a6a18addfa"/>
</policy>
If you find that your authorizations.xml file was empty (had no policies set in it), it is likely your NiFi had been started prior to you setting the "Initial Admin Identity" property. This property ONLY works the first time NiFi is started; if the authorizations.xml file was already generated, it will not be re-generated or updated on later starts of NiFi. To correct this, you can delete the authorizations.xml file and restart your NiFi. Since the file does not exist on that restart, the "Initial Admin Identity" user will be created. ***Note: if other users already have granted authorizations in this file, those will be lost and will need to be re-created. Only delete the authorizations.xml file if you wish to start over from scratch.

Thanks, Matt
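As a minimal sketch of that reset, assuming a default installation layout (the paths below are placeholders relative to your NiFi home directory), it is safer to back the file up rather than delete it outright:

mv conf/authorizations.xml conf/authorizations.xml.bak
bin/nifi.sh restart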
09-30-2016
02:39 PM
2 Kudos
@Timothy Spann Looks like you do not have enough file handles. The following command will show your current open file limits:

# ulimit -a

The open files limit should be a minimum of 10000, but may need to be even higher depending on the dataflow.

Matt
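If the limit needs to be raised, one common approach is to add entries to /etc/security/limits.conf and start a fresh session. This is just a sketch, assuming NiFi runs as a user named "nifi"; the user name and values are placeholders for your environment:

nifi soft nofile 50000
nifi hard nofile 50000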
09-29-2016
04:18 PM
@Breandán Mac Parland
If you are looking for a way to generate a file with the name "_SUCCESS", you can use the GenerateFlowFile processor to generate a file with random data as its content. You can then use an UpdateAttribute processor to set the filename to "_SUCCESS" by adding a new property with a property name of "filename" and a value of "_SUCCESS".

Matt
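A minimal sketch of that flow (the PutHDFS destination is just an illustrative example of where the marker file might be written):

GenerateFlowFile --(success)--> UpdateAttribute --(success)--> PutHDFS
UpdateAttribute added property: filename = _SUCCESS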
09-27-2016
02:16 PM
2 Kudos
@Parag Garg NiFi can certainly handle dataflows well in excess of 123 processors and well in excess of the number of FlowFiles you have here. Different processors exert different resource (CPU, memory, and disk I/O) strain on your hardware. In addition to processors having an impact on memory, so do FlowFiles themselves. FlowFiles are a combination of the physical content (stored in the NiFi content repository) and FlowFile attributes (metadata associated with the content, stored in heap memory). You can experience heap memory issues if your FlowFiles have very large attribute maps (for example, after extracting large amounts of content into attributes).

The first step is identifying which processor(s) in your flow are memory intensive and producing your OutOfMemoryError. Processors such as SplitText, SplitXML, and MergeContent can use a lot of heap if they are producing a lot of split files from a single file or merging a large number of files into a single file. The reason is that the merging and splitting happen in memory until the resulting FlowFile(s) are committed to the output relationship. There are ways of handling this resource exhaustion via dataflow design: for example, merging a smaller number of files multiple times (using multiple MergeContent processors) to produce that one large file, or splitting files multiple times (using multiple Split processors). Also be mindful of the number of concurrent tasks assigned to these memory-intensive processors.

Running with 4 GB of heap is good, but depending on your dataflow, you may find yourself needing 8 GB or more of heap to satisfy the demand created by your dataflow design.

Thanks, Matt
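As an illustrative sketch of the multi-stage split pattern (the line counts below are arbitrary placeholders), splitting in two stages keeps any single split operation from holding too many FlowFiles in heap at once:

SplitText (Line Split Count = 10000) --(splits)--> SplitText (Line Split Count = 1)

And if more heap is needed, it can be raised in conf/bootstrap.conf (the sizes here are just examples):

java.arg.2=-Xms8g
java.arg.3=-Xmx8g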
09-26-2016
11:35 AM
@Felix Duterloo The Read/Write stats on the face of each processor tell you how much content is being read from or written to NiFi's content repository. They are not intended to tell you how much data is written to the next processor or to some external system; the purpose is to help dataflow managers understand which processors in their dataflows are disk I/O intensive. The "Out" stat tells you how many FlowFiles were routed out of the processor to one or more output relationships.

In the case of a processor like PutHDFS, it is typical to auto-terminate the "success" relationship and loop the "failure" relationship back onto the PutHDFS processor itself. Any FlowFile routed to "success" has confirmed delivery to HDFS. FlowFiles routed to "failure" could not be delivered and should produce a bulletin and log messages explaining why. If the failure relationship is looped back on your PutHDFS, NiFi will try to deliver the file again after the FlowFile penalty has expired.

Matt
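A quick sketch of that common pattern:

PutHDFS
  success --> auto-terminated (delivery to HDFS confirmed)
  failure --> looped back to PutHDFS (retried once the FlowFile penalty expires)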
09-22-2016
11:20 AM
@mayki wogno When asking new questions unrelated to the current thread, please start a new Community Connection question. This benefits the community at large, as others may be searching for answers to the same question.
09-21-2016
05:29 PM
1 Kudo
There is the possibility that the time could differ slightly (by a few milliseconds) between when the two now() functions in that Expression Language statement are evaluated, which could cause the result to be pushed back to 11:58:59. To avoid this, you can simply reduce 43260000 by a few milliseconds (to 43259990) to ensure that does not happen, so 11:59:00 is always returned.
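For example, the adjusted version of the statement from the answer below would read:

${now():minus(${now():mod(86400000)}):minus(43259990):format('MM-dd-yyyy hh:mm:ss')}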
09-21-2016
04:34 PM
2 Kudos
@Sree Venkata You can do this using a combination of NiFi Expression Language (EL) functions:

${now():minus(${now():mod(86400000)}):minus(43260000):format('MM-dd-yyyy hh:mm:ss')}

This EL statement takes now(), subtracts the remainder resulting from dividing now() by 86400000 (the number of milliseconds in 24 hours), then subtracts an additional 43260000 milliseconds (12 hours and 1 minute) from that result, and finally formats the output in the date format you are looking for. I confirmed this EL statement by using it in an UpdateAttribute processor; looking at the attributes on a FlowFile processed by it, the attribute "yesterday" is set to exactly one day earlier than the current time, at 11:59:00.

Thanks, Matt