Member since: 07-30-2019
Posts: 3467
Kudos Received: 1641
Solutions: 1016
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 200 | 05-04-2026 05:20 AM |
| | 450 | 03-23-2026 05:44 AM |
| | 343 | 02-18-2026 09:59 AM |
| | 590 | 01-27-2026 12:46 PM |
| | 1025 | 01-20-2026 05:42 AM |
04-12-2021
06:35 AM
1 Kudo
@ram_g With all 100 FlowFiles committed to the success relationship of your custom processor at the same time, how do we want NiFi to determine their priority order? If you can output some attributes on each FlowFile your custom processor creates, those attribute values could be used to set the processing order downstream. Hope this helps, Matt
04-12-2021
06:16 AM
1 Kudo
@Former Member I have an HDF 3.4.1.1 cluster (based off NiFi 1.11.4) set up with PGs version controlled, and I can change processors from started to stopped to disabled without it triggering a local change. However, HDF 3.4.1.1 ships with NiFi-Registry 0.3, not 0.8. I have another HDF 3.5.2 cluster (based off NiFi 1.12.1) which ships with NiFi-Registry 0.8. In that cluster, changing a processor from started to stopped to disabled does trigger a local change. I see someone filed a Jira about this change in behavior: https://issues.apache.org/jira/browse/NIFI-8160 The tracking of enabled and disabled state in NiFi-Registry was added as part of: https://issues.apache.org/jira/browse/NIFI-6025 Hope this helps, Matt
04-12-2021
05:49 AM
@AnkushKoul Since the 30 seconds since last execution has passed, the processor is available to be scheduled immediately once a thread becomes available. So the second thread would not wait until 60 seconds; this setting is the minimum wait between executions. Other factors come into play that can affect component execution scheduling. NiFi hands out threads to processors from the Max Timer Driven Thread Count resource pool, set via Controller Settings under the global menu in the upper right corner. Naturally you will have more components on your canvas than the size of this resource pool (which should initially be set to only 2-4 times the number of cores on a single node, since the setting applies per node). NiFi hands these available threads out to processors requesting CPU time to execute. Most component threads execute in milliseconds, but some can be more resource intensive and take longer to complete. Before increasing this resource pool, you should monitor CPU impact/usage with all your dataflows running, then make small increments if resources exist. Hope this answers your questions. If so, please take a moment to accept the answer(s) that helped. Matt
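As a rough illustration of the 2-4x sizing guidance above, a small Python sketch (the function name is hypothetical, not a NiFi API):

```python
import os

def timer_driven_thread_pool_range(cores_per_node=None):
    """Suggested starting range for NiFi's Max Timer Driven Thread Count.

    The guidance above is 2-4x the core count of a SINGLE node, because
    the setting applies per node, not cluster-wide. Hypothetical helper.
    """
    cores = cores_per_node or os.cpu_count() or 1
    return (2 * cores, 4 * cores)

print(timer_driven_thread_pool_range(8))  # a node with 8 cores -> (16, 32)
```

From there, monitor CPU usage under full load before nudging the pool upward.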
04-08-2021
08:51 AM
@John_Wise @TimA Let me make sure I understand exactly what change you are making. I have Process Groups (PG) that are version controlled in my NiFi Registry, and I have both NiFi 1.11.4 and NiFi 1.12.1 clusters set up. If I import a flow from Registry and then modify the state (start, stop, disable, enable) of any processor, my PGs do not change to say local changes exist; the state of a processor does not track as a local change. I suspect some other local change is being made in addition to the state change. If you right click on the PG and, under "Version" in the displayed context menu, select "Show local changes", what are the tracked changes being reported? Hope this helps, Matt
04-08-2021
08:41 AM
@AnkushKoul Since you only have 1 concurrent task configured, another thread cannot be started while that concurrent task's thread is in use. So even with a Run Schedule of 0 secs, another task can't start until the thread tied to that concurrent task is released, making another execution possible. At 30 secs, the processor will only be allowed to execute again 30 seconds later, and only if there is an available concurrent task not already in use on the processor. Setting 30 seconds can create an artificial delay in your dataflow when tasks take less than 30 seconds to complete. Note: While the processor is executing a task, you will see a small number displayed in the upper right corner of the processor.
04-08-2021
08:35 AM
1 Kudo
@ram_g @Magudeswaran Guaranteeing order in NiFi can be challenging. As far as the prioritizers on the connection go:

FirstInFirstOutPrioritizer: Given two FlowFiles, the one that reached the connection first will be processed first. This looks at the timestamp recorded when the FlowFile entered this connection. In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles. Typically such processors commit all output FlowFiles to the downstream connection at the same time, which makes using this prioritizer a challenge if that is the case. But processors that produce multiple FlowFiles from a single FlowFile generally also set FlowFile attributes that identify the fragments; take a look at the attributes written by the SplitRecord processor as an example.

OldestFlowFileFirstPrioritizer: Given two FlowFiles, the one that is oldest in the dataflow will be processed first. This is the default scheme used if no prioritizers are selected. It looks at the FlowFile creation timestamp. In your case, are all output FlowFiles created as new?

Now you may want to look at the following prioritizer:

PriorityAttributePrioritizer: Given two FlowFiles, an attribute called "priority" will be extracted, and the one with the lowest priority value will be processed first. Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set. If only one has that attribute, it will go first. Values for the "priority" attribute can be alphanumeric, where "a" will come before "z" and "1" before "9". If "priority" cannot be parsed as a long, Unicode string ordering will be used. For example, "99" and "100" will be ordered so the FlowFile with "99" comes first, but "A-99" and "A-100" will sort so the FlowFile with "A-100" comes first. Assuming your custom processor writes some unique attribute(s) to the FlowFiles it outputs, you may be able to use those attributes to enforce ordering downstream via this prioritizer.

*** Also keep in mind that NiFi connection backpressure thresholds are "soft" limits. If you were to set the backpressure object threshold on the connection outbound from your custom processor to 1, and one execution of your processor produced 6 FlowFiles, they would all get committed to that connection. Only then does backpressure kick in and prevent your custom processor from being scheduled again until the queue drops back below the threshold. This is a good way of making sure only one "batch" of FlowFiles lands in the downstream connection at a time, but it will not help enforce the order of the FlowFiles within that batch. Hope this helps, Matt
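The parse-as-long-then-fall-back-to-string ordering described above can be sketched in Python (a simplified model of the prioritizer's comparison, not the actual NiFi source):

```python
def priority_key(value):
    """Sort key modeling PriorityAttributePrioritizer's described behavior:
    values that parse as a long are compared numerically; anything else
    falls back to plain (Unicode) string ordering. Simplified sketch."""
    try:
        return (0, int(value), "")
    except ValueError:
        return (1, 0, value)

print(sorted(["100", "99"], key=priority_key))      # numeric: ['99', '100']
print(sorted(["A-99", "A-100"], key=priority_key))  # string:  ['A-100', 'A-99']
```

This is why an UpdateAttribute processor setting a numeric "priority" value gives the most predictable ordering.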
04-08-2021
08:02 AM
@AnkushKoul By only having 1 concurrent task configured, you are effectively forcing that task to complete before the next can execute. With your Run Schedule set to "30 sec", NiFi will only schedule this component to execute every 30 seconds. So if task 1 takes only 20 seconds to complete, task 2 would not get started until 10 seconds later. If you set Run Schedule to the default 0 secs, that tells NiFi to schedule this component to execute as often as possible, so as soon as task 1 completes, task 2 will execute. You can think of concurrent tasks as a way to parallelize execution within a single component: instead of having two processors, you have one with 2 concurrent tasks. Each concurrent task gets scheduled independently (in parallel) of the other concurrent task(s), and each works on different FlowFile(s) from the inbound connection(s). Some components do not support multiple concurrent tasks (the component source code limits it to 1). So to me it sounds like you want tasks to kick off as fast as possible, one after another. In that case, leave Run Schedule at 0 secs and concurrent tasks at 1. If you found this answer addressed your question, please take a moment to accept the answer. Hope this helps, Matt
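The timing behavior described above can be modeled with a short sketch (illustrative only; NiFi's scheduler is more involved than this):

```python
def start_times(run_schedule, task_duration, n_tasks):
    """With 1 concurrent task, each new task starts at the LATER of:
    the previous task finishing, or run_schedule elapsing since the
    previous start. Simplified model of the behavior described above."""
    starts = [0.0]
    for _ in range(n_tasks - 1):
        prev = starts[-1]
        starts.append(max(prev + task_duration, prev + run_schedule))
    return starts

print(start_times(30, 20, 3))  # 30 sec schedule, 20 sec tasks -> [0.0, 30.0, 60.0]
print(start_times(0, 20, 3))   # 0 sec schedule, back to back  -> [0.0, 20.0, 40.0]
```

With a 30 sec schedule and 20 sec tasks, each cycle carries a 10 second artificial gap; at 0 secs the gap disappears.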
03-15-2021
04:04 PM
2 Kudos
@sambeth The hash (#) character is reserved as a delimiter to separate the URI of an object from a fragment identifier, and Registry has a number of different fragment identifiers. A fragment identifier represents a part of, fragment of, or sub-function within an object. It follows the "/#/" in the URL and can represent fragments in text documents by line and character range, in graphics by coordinates, or in structured documents using ladders. For example, the "grid-list" of flows displayed when you access the UI. No, you cannot remove the # from the URL. Are you encountering an issue? Hope this helps, Matt
03-03-2021
06:28 AM
1 Kudo
@Pavitran This does present a challenge. Typically ListFile is used to list files from a local file system. That processor is designed to record state (by default based on the last modified timestamp) so that only newer files are listed, but the first run lists all files by default. Also, looking at your example, your latest directory does not correspond to the current day. ListFile does not actually consume the content; it generates a 0 byte FlowFile for each file listed, along with some attributes/metadata about the source file. The FetchFile processor is then used to fetch the actual content. With large listings, this allows you to redistribute those 0 byte FlowFiles across all nodes in your cluster before consuming the content (provided the same local file system is mounted across all nodes; if each node has different files, do not load balance between the processors). So you could make a first run that lists everything and just delete those 0 byte FlowFiles. That would establish state. From that point on, ListFile would only list the newest files created.
Pros: 1. State allows this processor to be unaffected by outages; it will still consume all non previously listed files after an outage.
Cons: 1. The initial run creates potentially a lot of 0 byte FlowFiles to get rid of in order to establish state. 2. With an extended outage, on restart the flow may consume more than just the latest, since it will consume all files with timestamps newer than the timestamp last stored in state.
Other options:
A: ListFile has an optional "Maximum File Age" property which limits the listing to files no older than the set amount of time.
Pros to setting this property: 1. Reduces or eliminates the massive listing on first run.
Cons to setting this property: 1. Under an extended outage that exceeds the configured "Maximum File Age", a file you wanted listed may be skipped.
B: Since FetchFile uses attributes/metadata from the listing to fetch the actual content, you could craft a source FlowFile on your own and send it to the FetchFile processor. For example, use an ExecuteStreamCommand processor to execute a bash script on disk that gets the list of files only from the latest directory. Then use UpdateAttribute to add the other required attributes needed by FetchFile to get the actual content. Then use SplitText to split that listing of files into individual FlowFiles before the FetchFile processor.
Pros: 1. You are in control of what is being listed.
Cons: 1. Depending on how often a new directory is created and how often you run your ExecuteStreamCommand processor, you may end up listing the same source files again, since you will not have a state option with ExecuteStreamCommand. You may be able to handle this via a DetectDuplicate processor in your flow design. 2. If the listed directory has a new file added after a previous listing by ExecuteStreamCommand, the next run will list all previous files again along with the new ones for that directory. Again, this might be handled with DetectDuplicate. Hope this helps give you some ideas, Matt
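For option B, the script invoked by ExecuteStreamCommand could be approximated with a sketch like this (a hypothetical helper, not part of NiFi; "latest" here is judged by modification time, so adjust to however your directories are actually named or created):

```python
import os

def list_latest_directory(base):
    """Return the file paths inside the most recently modified
    subdirectory of base, i.e. one candidate "latest" listing.
    Hypothetical sketch of what the ExecuteStreamCommand script
    in option B might produce, one path per line."""
    subdirs = [os.path.join(base, d) for d in os.listdir(base)
               if os.path.isdir(os.path.join(base, d))]
    latest = max(subdirs, key=os.path.getmtime)
    return [os.path.join(latest, f) for f in sorted(os.listdir(latest))]
```

Its output would then flow through UpdateAttribute (to set the attributes FetchFile needs) and SplitText before fetching.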
02-11-2021
07:57 AM
@adhishankarit When moving on to a new issue, I recommend always starting a new query for better visibility (for example, someone else in the community may have more experience with the new issue than me). As far as your new query goes, your screenshots do not show any stats on the processor to get an idea of what we are talking about here in terms of performance. How many fragments are getting merged? How large is each of these fragments? NiFi nodes are only aware of, and have access to, the FlowFiles on the individual node. So if node "a" is "out" (not sure what that means), any FlowFiles still on node "a" that are part of the same fragment will not yet be transferred to node "b" or "c" to get binned for merge. The bin cannot be merged until all fragments are present on the same node. Since you mention that the bin eventually merges after 10 minutes, that tells me all fragments eventually make it onto the same node. I suggest the first thing to address here is the space issue on your nodes. Also keep in mind that while you have noticed node "a" has always been your elected primary node, there is no guarantee that will always be the case. A new Cluster Coordinator and Primary Node can be elected by ZooKeeper at any time. If you shut down or disconnect the currently elected primary node "a", you should see another node get elected as primary node. Adding node "a" back in will not force ZooKeeper to elect it back as primary node. So don't build your flow around a dependency on any specific node being primary node all the time. Matt
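The all-fragments-on-one-node requirement can be illustrated with a sketch of the defragment check (a simplified model, not MergeContent's actual code; fragment.identifier and fragment.count are the standard attributes written by the split processors):

```python
from collections import defaultdict

def ready_bins(flowfile_attrs):
    """Given the attribute dicts of FlowFiles visible on ONE node,
    return the fragment.identifier values whose bins are complete,
    i.e. every fragment has arrived. A bin missing even one fragment
    (still sitting on another node) cannot be merged yet."""
    bins = defaultdict(list)
    for attrs in flowfile_attrs:
        bins[attrs["fragment.identifier"]].append(attrs)
    return [ident for ident, ffs in bins.items()
            if len(ffs) == int(ffs[0]["fragment.count"])]

on_node_b = [
    {"fragment.identifier": "x", "fragment.count": "2"},
    {"fragment.identifier": "x", "fragment.count": "2"},
    {"fragment.identifier": "y", "fragment.count": "3"},  # 2 fragments elsewhere
]
print(ready_bins(on_node_b))  # only 'x' is complete: ['x']
```

Until the straggler fragments for "y" are load balanced onto the same node, that bin sits and waits, which matches the 10 minute delay you are seeing.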