Member since
07-30-2019
3391
Posts
1618
Kudos Received
1000
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 278 | 11-05-2025 11:01 AM |
| | 163 | 11-05-2025 08:01 AM |
| | 500 | 10-20-2025 06:29 AM |
| | 640 | 10-10-2025 08:03 AM |
| | 405 | 10-08-2025 10:52 AM |
06-15-2017
01:07 PM
@Prakash Ravi
Nodes in a NiFi cluster have no knowledge of the other nodes in the cluster. Each node simply sends health and status heartbeat messages to the currently elected cluster coordinator. As such, each node runs its own copy of the flow.xml.gz file and works on its own set of FlowFiles.
So if you have 9 NiFi nodes, each node will be running its own copy of the ConsumeKafka processor. With 1 concurrent task set on the processor, each node will establish one consumer connection to the Kafka topic, so you would have 9 consumers for 10 partitions. In order to have a consumer for every partition, you will need to configure 2 concurrent tasks. This will give you 18 consumers for 10 partitions. Kafka will assign the partitions within this pool of 18 consumers. Ideally you would see 1 active consumer on 8 of your nodes and 2 on one. The data coming into your NiFi cluster will not be evenly balanced because of the imbalance between the number of consumers and the number of partitions.
As far as your Kafka broker rebalance goes: Kafka will trigger a rebalance if a consumer disconnects and another consumer connects. Things that can cause a consumer to disconnect include:
1. Shutting down one or more of your NiFi nodes.
2. A connection timeout between a consumer and a Kafka broker.
- Triggered by network issues between a NiFi node and a Kafka broker.
- Triggered by setting the ConsumeKafka run schedule longer than the configured timeout, for example a 60 second run schedule and a 30 second timeout.
- Triggered by backpressure being applied on the connection leading off the ConsumeKafka processor, causing ConsumeKafka to not run until backpressure is gone. *** This trigger was fixed in NiFi 1.2, but I don't know what version you are running.
If you feel I have addressed your original question, please mark this answer as accepted to close out this thread.
Thank you,
Matt
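
The consumer arithmetic in this answer can be checked with a minimal sketch. It only models the counting (nodes times concurrent tasks versus partitions) and an idealized even spread; the function names are hypothetical and nothing here talks to NiFi or Kafka.

```python
import math
from typing import List


def min_concurrent_tasks(partitions: int, nodes: int) -> int:
    """Smallest per-node concurrent task count that yields at least one consumer per partition."""
    return math.ceil(partitions / nodes)


def ideal_active_consumers_per_node(partitions: int, nodes: int) -> List[int]:
    """Best-case number of partition-owning consumers on each node (assumes an even spread)."""
    base, extra = divmod(partitions, nodes)
    return [base + 1 if i < extra else base for i in range(nodes)]


nodes, partitions = 9, 10
tasks = min_concurrent_tasks(partitions, nodes)
print(tasks)                                               # 2
print(nodes * tasks)                                       # 18 consumers in total
print(ideal_active_consumers_per_node(partitions, nodes))  # [2, 1, 1, 1, 1, 1, 1, 1, 1]
```

The actual assignment is decided by Kafka, so the real spread across nodes may differ from this best case.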
06-15-2017
12:17 PM
@Johny Travolta I don't understand how not having a shared token necessarily means you do not have a cluster. You will get better traction from the community if you move this to a new question. I am not a NiFi developer myself, so I cannot comment on the complexity of implementing a shared LDAP token response across all nodes in a NiFi cluster. But I am sure that if you open a new question on this topic, you will get a response from someone who can answer it for you. Thanks, Matt
06-14-2017
01:59 PM
1 Kudo
@Thierry Vernhet
With number 3, I am assuming that every file has a unique filename from which to determine whether the same filename has been listed more than once. If that is not the case, then you would need to use DetectDuplicate after fetching the actual data (less desirable, since you will have wasted the resources to potentially fetch the same file twice before deleting the duplicate).
Let's assume every file has a unique filename. If so, the DetectDuplicate flow would look like this:
with the DetectDuplicate processor configured as follows:
You will also need to add two controller services to your NiFi:
- DistributedMapCacheServer
- DistributedMapCacheClientService
The value associated with the "filename" attribute on the FlowFile is checked against the entries in the DistributedMapCacheServer. If the filename does not exist there, it is added. If it already exists, the FlowFile is routed to the duplicate relationship.
In scenario 2, where filenames may be reused, we need to detect whether the content after the fetch is a duplicate or not. In this case the flow may look like this:
After fetching the content of a FlowFile, the HashContent processor is used to create a hash of the content and write it to a FlowFile attribute (default is hash.value). The DetectDuplicate processor is then configured to look for FlowFiles with the same hash.value to determine whether they are duplicates. FlowFiles whose content hash already exists in the DistributedMapCacheServer are routed to duplicate, where you can delete them if you like.
If you found this answer addressed your original question, please mark it as accepted.
Thanks,
Matt
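
The two duplicate checks described above (by filename, and by content hash) can be sketched as follows. This is an illustration of the logic only, not NiFi code: `SeenCache` is a hypothetical in-memory stand-in for the distributed map cache, and SHA-256 is an arbitrary choice of hash for the example.

```python
import hashlib


class SeenCache:
    """Hypothetical stand-in for the shared map cache: remembers every key it is shown."""

    def __init__(self):
        self._keys = set()

    def is_duplicate(self, key: str) -> bool:
        """Return True if the key was seen before; otherwise record it and return False."""
        if key in self._keys:
            return True
        self._keys.add(key)
        return False


def route_by_filename(cache: SeenCache, filename: str) -> str:
    """Scenario 1: filenames are unique, so the filename itself is the cache key."""
    return "duplicate" if cache.is_duplicate(filename) else "non-duplicate"


def route_by_content(cache: SeenCache, content: bytes) -> str:
    """Scenario 2: filenames may be reused, so a hash of the fetched content is the key."""
    digest = hashlib.sha256(content).hexdigest()
    return "duplicate" if cache.is_duplicate(digest) else "non-duplicate"


names = SeenCache()
print(route_by_filename(names, "myfile.txt"))  # non-duplicate
print(route_by_filename(names, "myfile.txt"))  # duplicate

hashes = SeenCache()
print(route_by_content(hashes, b"same bytes"))  # non-duplicate
print(route_by_content(hashes, b"same bytes"))  # duplicate
```

The filename check runs before any data is fetched, which is why it is the cheaper option when filenames really are unique.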
06-14-2017
12:39 PM
1 Kudo
@Narasimma varman Try configuring the "Database Driver Jar Url" property with the absolute path to your "postgresql-42.1.1.jre7.jar" file, for example:
c:/post/postgresql-42.1.1.jre7.jar
/post/postgresql-42.1.1.jre7.jar
Also check the nifi-app.log for a full stack trace following the above ERROR; it may give more detail on why the file can't be loaded. Thanks, Matt
06-14-2017
12:30 PM
3 Kudos
@Thierry Vernhet
The ListFile processor will list all non-hidden files it sees in the target directory. It then records the latest timestamp of the batch of files it listed in state management. This timestamp is what is used to determine which new files to list in the next run. If a file is still being written when it is first listed, its timestamp will have changed by the next run, so the same file will be listed again.
A few suggestions, in preferred order:
1. Change how files are being written to this directory.
- The ListFile processor will ignore any hidden files. So a file being written as ".myfile.txt" will be ignored until the filename has changed to just "myfile.txt".
2. Change the "Minimum File Age" setting on the processor to a value high enough to allow the source system to complete file writes to this directory.
3. Add a DetectDuplicate processor after your ListFile processor to detect duplicate listed files and remove them from your dataflow before the FetchFile processor.
Thanks,
Matt
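
A simplified sketch of the listing rule described above shows why a file that is still being written gets listed twice, and how a minimum file age avoids it. This models only the behavior as the answer describes it, not the actual ListFile implementation; the function name, the integer timestamps, and the file name are assumptions made for illustration.

```python
def list_new_files(files, last_listed_ts, now, min_file_age=0):
    """Return (listed names, new stored timestamp).

    files maps a filename to its last-modified time in seconds.
    Hidden files (leading dot) are skipped, as are files modified at or
    before the stored timestamp and files younger than min_file_age.
    """
    listed = sorted(
        name
        for name, mtime in files.items()
        if not name.startswith(".")
        and mtime > last_listed_ts
        and now - mtime >= min_file_age
    )
    new_ts = max((files[name] for name in listed), default=last_listed_ts)
    return listed, new_ts


# A writer starts data.csv at t=100 and finishes it at t=105.
files = {"data.csv": 100}
first, ts = list_new_files(files, last_listed_ts=0, now=101)
files["data.csv"] = 105
second, ts = list_new_files(files, last_listed_ts=ts, now=106)
print(first, second)  # ['data.csv'] ['data.csv']  (listed twice)

# With a 10 second minimum file age the file is listed once, after the write settles.
files = {"data.csv": 100}
early, ts = list_new_files(files, 0, now=101, min_file_age=10)
files["data.csv"] = 105
late, ts = list_new_files(files, ts, now=120, min_file_age=10)
print(early, late)  # [] ['data.csv']
```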
06-14-2017
12:09 PM
@estefania rabadan There is no processor configuration option to control which attributes a processor writes to the FlowFiles it processes. However, you can use the UpdateAttribute processor to remove attributes from FlowFiles afterwards. Thanks, Matt
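
In UpdateAttribute, removal is typically configured through the "Delete Attributes Expression" property, which takes a regular expression over attribute names; check the processor documentation for your NiFi version. The sketch below only illustrates the idea of dropping attributes by name pattern. The attribute names are hypothetical, and whole-name matching is an assumption.

```python
import re


def delete_attributes(attributes: dict, pattern: str) -> dict:
    """Return a copy of attributes without any name that fully matches pattern."""
    regex = re.compile(pattern)
    return {name: value for name, value in attributes.items() if not regex.fullmatch(name)}


# Hypothetical FlowFile attributes, for illustration only.
attrs = {"filename": "a.txt", "kafka.topic": "events", "kafka.partition": "3"}
print(delete_attributes(attrs, r"kafka\..*"))  # {'filename': 'a.txt'}
```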
06-13-2017
09:25 PM
4 Kudos
@Prakash Ravi You have 9 NiFi nodes all running a ConsumeKafka processor configured with 3 concurrent tasks. That totals 27 consumers. Does the Kafka topic you are consuming from have 27 partitions? There can only be one consumer per partition on a topic. If you have more consumers than partitions, some of those consumers will never get any data. This likely explains the load distribution you are seeing. Whenever a new consumer is added or an existing consumer is removed, a rebalance is triggered. You will achieve your best performance when the number of partitions equals the number of consumers. Thanks, Matt
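
To see how many consumers would sit idle for a given topic, here is a minimal counting sketch. The function name is hypothetical and the partition counts in the usage lines are example values, since the actual partition count is not stated in this answer.

```python
def consumer_summary(nodes: int, concurrent_tasks: int, partitions: int) -> dict:
    """Count total, active and idle consumers when each partition has one consumer."""
    consumers = nodes * concurrent_tasks
    active = min(consumers, partitions)
    return {"consumers": consumers, "active": active, "idle": consumers - active}


print(consumer_summary(9, 3, 27))  # {'consumers': 27, 'active': 27, 'idle': 0}
print(consumer_summary(9, 3, 10))  # {'consumers': 27, 'active': 10, 'idle': 17}
```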
06-13-2017
05:42 PM
@Anoop Shet Sorry for the late response, but I don't get pinged unless you tag me in your response. The ListSFTP processor retains state on the files that have been listed. My guess is that the stored state is preventing your new filter from returning anything. Try clearing the state and see if it then lists the files based on your new filter, or add a new ListSFTP processor that uses the different file filter. To clear the state, right click on the processor and select "View state". In the state UI for this processor you will see a link to "Clear state". If you found my answer addressed your original question, please mark it as accepted to close out this thread. Thanks, Matt
06-13-2017
04:52 PM
1 Kudo
@Mahmoud Shash HDF 2.1.3 is a bad release. You are running into the exact Controller Service UI bug that resulted in HDF 2.1.3 being pulled and replaced with HDF 2.1.4. You can upgrade your HDF 2.1.3 to HDF 2.1.4 to fix this issue. Then you will be able to enable, disable, configure, and delete the HiveConnectionPool controller service. Matt
06-13-2017
04:36 PM
@Narasimma varman
Sorry for the late response, but I don't get pinged unless you add a comment to my response or tag me in your new answer. The dynamic properties expect the "value" to be a valid NiFi Expression Language (EL) statement. Otherwise it is treated as a literal value. So I expect what you are seeing is either that exact string being passed in the nested header or some kind of session rollback. Also, I am not sure how you are pulling data using a "POST" method. Shouldn't you be using "GET"? Thanks,
Matt