Member since
07-30-2019
3406
Posts
1622
Kudos Received
1008
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 169 | 12-17-2025 05:55 AM | |
| 230 | 12-15-2025 01:29 PM | |
| 168 | 12-15-2025 06:50 AM | |
| 275 | 12-05-2025 08:25 AM | |
| 454 | 12-03-2025 10:21 AM |
05-08-2020
12:11 PM
@Griggsy I'd strongly encourage you to start a new question rather then asking for help on an existing question with an already accepted solution. You'll get better traction and visibility that way. Matt
... View more
05-06-2020
01:55 PM
@SamCloud Not really how the processor is designed to work. It order for a FlowFile to get outed to the duplicate path the distributed map cache must already have a matching entry in it. All i can think for you to try is setting up a second distributedMapCache server where you use detect Duplicate a second time. Those FlowFile routed down the duplicate connection the first time are used to populate the second cache server. Then all your FlowFiles routed to non duplicate path originally are checked against that second distributedMapCacheServer. Only thing here is you hav a bit of a race condition since you need to make sure the FlowFiles on the duplicate path are processed before those on the non-duplicate path. For that you might want to use maybe the wait and notify processors. Wait on non duplicate path and the notify on after the detectDuplicate on the duplicate path. You can see where this gets very complicated. You may need to develop something custom here. Matt
... View more
05-05-2020
12:07 PM
1 Kudo
@ppbirajdar My next thought would be to just drop the cloned FlowFile right before the MergeContent since you intent is not to reassemble the exact same original file. Your intent, if i understand correctly, is to just create one file that contains 1 FlowFile with each unique fragment index. If that is the case, perhaps all you need is a detectDuplicate in front of your mergeContent that uses a combination of Fragment.identifier and fragment.index for determining if there is a duplicate FlowFile to drop. You would simply set the "Cache Entry Identifier" to "${fragment.identifier}-${fragment.index}". Then only the first occurrence of a FlowFile with unique fragment.index+fragment.identifier will make it in to the connection queue for your MergeContent processor. Then the Defragment merge strategy will work successfully giving you what I believe you are looking for here. Hope this helps with your use case, Matt
... View more
04-30-2020
01:20 PM
1 Kudo
@vivek12 Your last statement is confusing to me. Are you saying only the invokeHTTP processor is stopped after a NiFi restart? This implies that someone stopped it or the flow.xml.gz is not getting updated with that processors last known state. I'd inspect what is written in your flow.xml.gz on each node to make sure they all show the state as running. The last known state of a processor is not something that is checked as node join a cluster. Nodes joining are checked to make sure their flow.xml matches the elected cluster flow and if it does, the node can join. That node will then be told to start those components which are running in the cluster. When you restart the entire cluster each node presents its flow. First node has Flow x and gets 1 vote, next node flow is checked and it matches exactly, that flow gets 2 votes. Next node comes in and his flow is not exactly the same, so it gets 1 vote. The flow with the most votes becomes the cluster flow. If you have an even number of nodes (for example 4 nodes) and 2 node's flows get a vote and other 2 node's get a different vote. since you have 2 vs 2, NiFi will end up picking one at random. My concern here is that some node(s) have last known state as stopped while another is running. So sometimes with a complete restart of your cluster you end up starting the flow with the stopped state on this processor. The other possibility is this invokeHTTP processor is failing validation on some node on startup and resulting in processor being stopped. Have you tried copying the flow.xml.gz from one node to all the other nodes? Hope this helps, Matt
... View more
04-30-2020
12:59 PM
@SamCloud The intent of the detect duplicate is to allow the first FlowFile through and then route all other FlowFiles with same cache entry identifier to duplicate. So when a FlowFile reaches this processor it checks the Distributed Map Cache for the Cache Entry identifier, if it does not exist it is added to the cache and the FlowFile is routed to non-duplicate. If it is found, then FlowFile is routed to duplicate. Based on the description you provided, it is working as intended/designed. Hope this helps, Matt
... View more
04-30-2020
12:06 PM
@ppbirajdar If you have multiple FlowFiles with the same fragement.index and fragment.identifier, this indicates that at some point in your dataflow this FlowFIle was likely cloned (Same FlowFile routed to multiple relationships) or split (two FlowFiles created from one). For example you dragged the success relationship twice. Each of those FlowFiles will have however different UUID assigned to them. You could use NiFi Provenance to look at the lineage of these FlowFIles to see at what point in you flow the clone was created. In your case a CSV record that gets split into two FlowFIles (one with invalid records and another with some valid records) is going to be your issue since you route them to same destination processor. You didn't share your MergeContent processor configuration, but keep in mind the following: 1. A processor executes based on its configured run Schedule. The default run schedule ins always 0 secs which means execute as fast as it can. This means a processor will execute and once it completes that execution, it will execute again. (If at execution there is no work to do, it will yield for a period of time before next execution is allowed to avoid excessive CPU usage when no work exists). 2. At time of Execution the MergeContent will look at what new FlowFiles since last execution and allocate those FlowFiles to bins. If at end of that execution a bin meets the minimums configured, the bin will be merged (Mins don't matter if using Defragment merge strategy). 3. When using Defragment, the expectation is that all FlowFiles with same Fragment.identifier also have same Fragment.count and that only one FlowFile exists in a bin for each fragment.index value. It would be impossible to reassemble to original FlowFile since you have multiple FlowFiles now with the same fragment.identifier and fragment.index values. The real question is whether you are actually trying to reassemble the exact original FlowFile or simply create a single FlowFile for which to trigger downstream processing? If the latter is the case, don't use Defragment merge strategy. Use Bin-Packing Algorithm and set fragment.identifier as the "Correlation Attribute Name". Since you really have no idea how many fragments there will be at this point, set your mins number of entires to a value large enough it would never be reached for a single fragment.identifier and then set the "max bin age" to a value high enough that you fee comfortable all fragments would have made it to the MergeContent. Max.bin.age will force a bin to be merged at x amount of time after first FlowFile was allocated to that bin even if mins were not met. Also make sure you have configured n+1 number of bins, where "n" is the number of expected unique fragment.identifiers you expect to have at any one being binned by the MergeContent processor. Hope this helps, Matt
... View more
04-30-2020
08:28 AM
1 Kudo
@Logann NiFi does not offer local user creation for authentication. There is no way to create local users and assign them passwords for the purpose of user authentication. User Authentication require either: 1. User certificates (always requested by NiFi during TLS handshake) 2. Spnego auth (Spnego auth challenge sent to browser if spnego properties configured in nifi.properties. This request is only sent if 1 did not result in client certificate in response from client) 3. Configured login provider (uses login-provider configured in login-identity-providers.xml and referenced in nifi.properties file. Only used if both 1 and 2 did not provide client/user authentication already). 4. NiFi will also support other OpenID Connect supported authentication providers. Hope this helps, Matt
... View more
04-30-2020
05:29 AM
2 Kudos
@abhinav_joshi This is something that only affects the Apache NiFi releases only. The removal of some nars is documented in the Apache NiFi migration guidance wiki here: https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance "Starting in 1.10, the following nars were removed from the default convenience binary. These include kite-nar, kafka-0-8-nar, flume-nar, media-nar, druid-controller-service-api-nar, druid-nar, other-graph-services-nar. You can still get them from the various artifact repositories and use them in your flows but we cannot bundle them due to space limitations by default." For Apache NiFi users, I recommend that users create a custom NiFi lib directory by adding the following property to their nifi.properties file: nifi.nar.library.directory.<unique-string>=/<path to>/custom-lib1
for exmaple:
nifi.nar.library.directory.NiFi-1-10-nars=/nars/nifi-1-10-lib You can add as many custom lib directories as you want. Then place any nars that are noted as removed via the migration guidance documentation into one of these custom lib paths before upgrading. Then as part of your upgrade process add the above property to your nifi.properties file. Then later if additional nar bundles are deprecated, you can create another custom lib dir or just add those nars to your existing custom lib directory before upgrading. This allows you to avoid the downtime you encountered. The Cloudera releases of HDF and CFM include are built with all the nars by default currently as they are not affected by space limitations. Hope this helps, Matt
... View more
04-21-2020
07:59 AM
@Rohitravi The None of NiFi's processors will release any FlowFiles to a downstream connection until the end of the thread operation. This is to protect users from dataloss and in some cases data duplication in the result of a failure. In the case of a SplitText processor you have configured to split on every 10 lines. The processor will stream the content of the first 10 lines in to a content claim in the content_repository and create a new FlowFile record pointing at that claim. The next 10 lines may or may not go into that same content claim and another FlowFile record is created. above process continues until all splits have been created. Then The processor releases all FlowFile created to the downstream connection at the same time. NiFi does not guarantee FlowFile processing order. You can adding the FirstInFirstOutPrioritizer to the downstream connections to help with ordering some. Hope this helps, Matt
... View more
04-15-2020
07:54 AM
2 Kudos
@memad If your GetFile processor is consuming files before they have finished writing there are a few changes that may help: 1. How are files being written in to the directory? The default "File Filter" will ignore files that start with a ".". If it is possible to change how files are being written to the directory, that will solve your issue through a file filter. For example.... writing new files to directory as ".<filename>" and upon successful write does a rename to remove the dot (this is how ssh works). But you can of course setup any file filter that works for you, 2. Assuming the process that is writing files to the directory is always updating the timestamp on the file, you can use the "Minimum File Age" property to prevent the GetFile from consuming a file until the last modified timestamp in the file has not updated for the configured amount of time. This works in most cases, except when there may be long pauses in the write process that exceeds the configured Min File Age time. Hope this helps, Matt
... View more