05-03-2021
07:13 AM
@Arash

A FlowFile consists of two parts:
- FlowFile content: Content resides on disk in the content repository, not in heap memory. Some components may need to load content into memory to perform their function.
- FlowFile attributes/metadata: FlowFiles actively queued in a connection have their attributes/metadata held in heap memory.

Swapping is the only mechanism that can move FlowFile attributes/metadata out of heap and onto disk. It is important to remember that MiNiFi will only start swapping FlowFiles to disk once the number of FlowFiles queued in a connection reaches the configured swap threshold (default 20,000). Swap files are created in batches of 10,000. So in a smoothly running flow there should be very little, if any, swapping of FlowFile attributes/metadata happening. It should only happen during data bursts.

To keep heap usage down, limit the backpressure object threshold on your connections. The default is 10,000, which means a connection would normally never accumulate enough FlowFiles to trigger a swap file anyway. Keep in mind that backpressure is a soft limit: if a source processor is allowed to execute because the downstream connection is not yet applying backpressure, and that execution creates 30,000 FlowFiles, then all 30,000 are placed on the downstream connection, which would result in swap files being created.

When you build the dataflow in NiFi that you will run on your MiNiFi agent, be mindful of the above and look at the embedded documentation for the components you will use in that dataflow. The embedded docs include a resource considerations section for each component that has known impacts on heap memory or CPU. Commonly used processors that merge or split FlowFiles can have an impact on heap memory if not configured wisely.

Hope this helps remove some concern and provides useful insight.

If you found this helpful, please take a moment to log in and click accept on this solution.

Matt
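To make the swap arithmetic above concrete, here is a minimal sketch. It assumes that only FlowFiles queued beyond the swap threshold are eligible for swapping and that a swap file is written only for each full batch of 10,000. The class and method names are hypothetical and this is not the MiNiFi/NiFi implementation.

```java
class SwapSketch {
    // Values taken from the answer above.
    static final int SWAP_THRESHOLD = 20_000; // default swap threshold per connection
    static final int SWAP_BATCH = 10_000;     // FlowFiles per swap file

    // Assumption: only full batches beyond the threshold produce a swap file.
    static int swapFiles(int queuedFlowFiles) {
        int excess = Math.max(0, queuedFlowFiles - SWAP_THRESHOLD);
        return excess / SWAP_BATCH;
    }

    public static void main(String[] args) {
        // Queue held at the default backpressure threshold: nothing to swap.
        System.out.println("10,000 queued -> " + swapFiles(10_000) + " swap file(s)");
        // Burst of 30,000 from a single execution, as in the example above.
        System.out.println("30,000 queued -> " + swapFiles(30_000) + " swap file(s)");
    }
}
```

Under these assumptions, a queue that stays at or below the default backpressure threshold never produces a swap file, while the 30,000 FlowFile burst produces one.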
05-03-2021
06:09 AM
1 Kudo
@hkh

The appender you shared is not valid. You have configured your appender's rolling policy to use SizeAndTimeBasedRollingPolicy. However, your file naming pattern only supports a time-based pattern:
${org.apache.nifi.bootstrap.config.log.dir}/nifi-app_%d.log

This leaves you with two options:

Option 1:
- Keep using the "SizeAndTimeBasedRollingPolicy", but change your file naming pattern.
- Use a pattern like "${org.apache.nifi.bootstrap.config.log.dir}/nifi-app_%d{yyyy-MM-dd}.%i.log". The "{yyyy-MM-dd}" is optional, but allows you to specify the date format.
- With the above pattern logback will retain 1 day of log history, but may or may not have more than 1 log in a given day depending on the volume of logging that is occurring. If a daily log reaches the configured "maxFileSize", the log will roll. This allows you to keep your logs at manageable sizes. When the log rolls it gets an incrementing number applied per this new file naming pattern. For example:
nifi-app_2021-04-28.1.log
nifi-app_2021-04-28.2.log
- While this can still result in an unbounded number of incremental log files created in a single day, you can control overall disk usage by adding another property within the "rollingPolicy" section that will start purging incremental rolled logs once the total space consumed by these rolled logs exceeds the set value. Add this line below your "<maxHistory>1</maxHistory>" line: <totalSizeCap>3GB</totalSizeCap>
Note: This will only remove rolled/archived logs and will not remove the active log.

Option 2:
- Change the rolling policy you are using to "ch.qos.logback.core.rolling.TimeBasedRollingPolicy".
- With this policy you can keep the file name pattern you already have: "${org.apache.nifi.bootstrap.config.log.dir}/nifi-app_%d.log"
- You will need to comment out or remove the following line, since it is only valid for size-based rolling policies: <maxFileSize>1GB</maxFileSize>
- The downside of this setup is that a single daily log file can grow unbounded.

As far as your other question goes: the logback.xml is the only configuration file in NiFi that you can edit and have the changes take effect without needing a NiFi restart.

Some caveats: NiFi did not write logback, and it certainly has its quirks. For example:
- If you enter a bad configuration, it may simply stop logging.
- The way maxHistory works in logback prevents the cleanup from looking at older rolled logs outside the maxHistory window, so you will need to clean up the older rolled logs manually at first and not expect logback to do that cleanup for you after editing the logback.xml.

Since you are making big changes to the file naming pattern and potentially the rolling policy, I'd encourage you to restart NiFi anyway so it cleanly starts writing to the new log format on startup.

Hope you found this helpful. If so, take a moment to log in and click accept on this solution.

Matt
04-16-2021
06:06 AM
@Vickey

The "File Filter" property of the UnpackContent processor takes a Java regular expression and can be used when unpacking tar or zip files.

In your UnpackContent processor, set the "Packaging Format" to either "ZIP" or "TAR" based on the package format used by your source file. Then set a Java regular expression such as the one below to extract only files within that package where the filename ends with the .csv, .txt, or .xml extension:

.*\.(txt|xml|csv)

Hope this helps,
Matt
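As a quick way to check the expression before putting it in the processor, here is a small standalone sketch using java.util.regex. It assumes the filter must match the entire entry name (including any directory prefix inside the archive). The class name, method name, and sample file names are hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;

class FileFilterSketch {
    // Same expression as in the answer above (backslash escaped for a Java string literal).
    static final Pattern FILE_FILTER = Pattern.compile(".*\\.(txt|xml|csv)");

    // Assumption: the whole entry name has to match, not just a substring of it.
    static boolean accepted(String entryName) {
        return FILE_FILTER.matcher(entryName).matches();
    }

    public static void main(String[] args) {
        // Hypothetical archive entries used only for illustration.
        List<String> entries = List.of("data/report.csv", "notes.txt", "image.png", "export.csv.bak");
        for (String entry : entries) {
            System.out.println(entry + " -> " + accepted(entry));
        }
    }
}
```

Because the expression is anchored at the extension, an entry such as export.csv.bak is rejected even though it contains ".csv".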
04-12-2021
06:35 AM
1 Kudo
@ram_g

With all 100 FlowFiles committed to the success relationship of your custom processor at the same time, how would you want NiFi to determine their priority order? If your custom processor can write some attributes to each FlowFile it creates, those attribute values could be used to set the processing order downstream.

Hope this helps,
Matt
04-12-2021
06:16 AM
1 Kudo
@Former Member

I have an HDF 3.4.1.1 cluster (based on NiFi 1.11.4) set up with version-controlled PGs, and I can change processors from started to stopped to disabled without it triggering a local change. However, HDF 3.4.1.1 ships with NiFi-Registry 0.3 and not 0.8.

I have another HDF 3.5.2 cluster (based on NiFi 1.12.1) that ships with NiFi-Registry 0.8. In that cluster, when I change a processor from started to stopped to disabled, it does trigger a local change.

I see someone filed a Jira about this change in behavior: https://issues.apache.org/jira/browse/NIFI-8160

The tracking of Enabled and Disabled state in NiFi-Registry was added as part of: https://issues.apache.org/jira/browse/NIFI-6025

Hope this helps,
Matt
04-12-2021
05:49 AM
@AnkushKoul

Since the 30 seconds since the last execution have passed, the processor is available to be scheduled immediately once a thread becomes available. So the second thread would not wait until 60 seconds. This setting is the minimum wait between executions.

Other factors come into play that can affect component execution scheduling. NiFi hands out threads to processors from the Max Timer Driven Thread Count resource pool, set via Controller Settings under the global menu in the upper right corner. Naturally you will have more components on your canvas than the size of this resource pool (which should be set initially to only 2-4 times the number of cores you have on a single node, since the setting applies per node). NiFi hands these available threads out to processors requesting CPU time to execute. Most component threads execute in the range of milliseconds, but some can be more resource intensive and take longer to complete.

Before increasing this resource pool, you should monitor the CPU impact/usage with all your dataflows running. Then make small increments if resources exist.

Hope this answers your questions. If so, please take a moment to accept the answer(s) that helped.

Matt
04-08-2021
08:51 AM
@John_Wise @TimA

Let me make sure I understand exactly what change you are making. I have Process Groups (PGs) that are version controlled in my NiFi Registry. I have both NiFi 1.11.4 and NiFi 1.12.1 clusters set up. If I import a flow from the registry and then modify the state (start, stop, disable, enable) of any processor, my PGs do not change to say local changes exist. The state of a processor is not tracked as a local change.

I suspect some other local change is being made in addition to the state change. If you right click on the PG and select "Show local changes" under "Version" in the displayed context menu, what tracked changes are being reported?

Hope this helps,
Matt
04-08-2021
08:41 AM
@AnkushKoul

Since you only have 1 concurrent task configured, another thread cannot be started while that concurrent task's thread is in use. So even with a run schedule of 0 secs, another task can't start until the thread tied to that concurrent task is released, making it possible for another execution to happen. At 30 secs, the processor will only be allowed to execute again 30 secs later if there is a concurrent task available that is not already in use on the processor. Setting 30 seconds can create an artificial delay in your dataflow when tasks take less than 30 seconds to complete.

Note: While the processor is executing a task you will see a small number displayed in the upper right corner of the processor.
04-08-2021
08:35 AM
1 Kudo
@ram_g @Magudeswaran

Guaranteeing order in NiFi can be challenging. As far as the prioritizers on the connection go:

FirstInFirstOutPrioritizer: Given two FlowFiles, the one that reached the connection first will be processed first.
This looks at the timestamp recorded for the FlowFile when it entered this connection. In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles. Typically with such processors all output FlowFiles are committed to the downstream connection at the same time, which makes using this prioritizer a challenge. But generally processors that produce multiple FlowFiles from a single FlowFile also set FlowFile attributes that identify the fragments. Take a look at the attributes written by the SplitRecord processor as an example.

OldestFlowFileFirstPrioritizer: Given two FlowFiles, the one that is oldest in the dataflow will be processed first. 'This is the default scheme that is used if no prioritizers are selected'.
This looks at the FlowFile creation timestamp. In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles. Are all output FlowFiles created as new?

Now you may want to look at the following prioritizer:

PriorityAttributePrioritizer: Given two FlowFiles, an attribute called "priority" will be extracted. The one that has the lowest priority value will be processed first.
- Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set.
- If only one FlowFile has that attribute, it will go first.
- Values for the "priority" attribute can be alphanumeric, where "a" will come before "z" and "1" before "9".
- If the "priority" attribute cannot be parsed as a long, unicode string ordering will be used. For example: "99" and "100" will be ordered so the FlowFile with "99" comes first, but "A-99" and "A-100" will sort so the FlowFile with "A-100" comes first.

Assuming your custom processor writes some unique attribute(s) to the FlowFiles it outputs, you may be able to use those attributes to enforce ordering downstream via the above prioritizer.

*** Also keep in mind that backpressure thresholds on NiFi connections are "soft" limits. If you were to set the backpressure object threshold on the connection outbound from your custom processor to 1, and on execution your processor produced 6 FlowFiles, they would all get committed to that connection. Only then does backpressure kick in and prevent your custom processor from being scheduled again until the queue drops below the backpressure threshold. This is a good way of making sure only one "batch" of FlowFiles lands in the downstream connection at a time, but it will not help enforce the order of the FlowFiles in that batch.

Hope this helps,
Matt
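The "priority" ordering rules described above can be illustrated with a small comparator sketch. This is not NiFi's source code; it only compares attribute values as plain strings, and it assumes that a value that parses as a long sorts ahead of one that does not when the two are mixed. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PriorityOrderSketch {
    // Returns the value as a Long, or null when it cannot be parsed as a long.
    static Long parseOrNull(String value) {
        try {
            return Long.valueOf(value.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Hypothetical comparator over "priority" attribute values.
    static final Comparator<String> BY_PRIORITY = (a, b) -> {
        Long la = parseOrNull(a);
        Long lb = parseOrNull(b);
        if (la != null && lb != null) {
            return Long.compare(la, lb); // both numeric: lowest value first
        }
        if (la != null) {
            return -1;                   // assumption: numeric sorts before non-numeric
        }
        if (lb != null) {
            return 1;
        }
        return a.compareTo(b);           // neither numeric: string ordering
    };

    static List<String> sorted(List<String> priorities) {
        List<String> copy = new ArrayList<>(priorities);
        copy.sort(BY_PRIORITY);
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(sorted(List.of("100", "99")));     // numeric comparison
        System.out.println(sorted(List.of("A-99", "A-100"))); // string comparison
    }
}
```

The second case is why zero-padding (for example "A-099" and "A-100") is worth considering when the priority values your custom processor writes are not purely numeric.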
04-08-2021
08:02 AM
@AnkushKoul

By only having 1 concurrent task configured, you are effectively forcing that task to complete before the next can execute.

With your Run Schedule set to "30 sec", NiFi will only schedule this component to execute every 30 seconds. So if task 1 takes only 20 seconds to complete, task 2 would not get started until 10 seconds later. If you set Run Schedule to the default of 0 secs, that tells NiFi to schedule this component to execute as often as possible. So as soon as task 1 completes, task 2 will execute.

You can think of concurrent tasks as a way to parallelize execution within a single component. So instead of having two processors, you have one with 2 concurrent tasks. Each task gets scheduled independently of (in parallel with) the other concurrent task(s). Each concurrent task will work on different FlowFile(s) from the inbound connection(s). Some components do not support multiple concurrent tasks (the component source code limits them to 1).

So to me it sounds like you want tasks to kick off as fast as possible, one after another. In that case, leave Run Schedule at 0 secs and concurrent tasks at 1.

If you found this answer addressed your question, please take a moment to accept the answer.

Hope this helps,
Matt
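The timing described above can be written out as simple arithmetic. The sketch below only models the behavior as described in this answer for a processor with 1 concurrent task. It is not NiFi's scheduler, it ignores thread pool contention, and the class and method names are hypothetical.

```java
class RunScheduleSketch {
    // Earliest start (in seconds) of the next task with 1 concurrent task:
    // not before the run schedule has elapsed since the previous start,
    // and not before the previous task has finished.
    static long nextStartSeconds(long previousStart, long taskDuration, long runSchedule) {
        long previousEnd = previousStart + taskDuration;
        long scheduledStart = previousStart + runSchedule;
        return Math.max(previousEnd, scheduledStart);
    }

    public static void main(String[] args) {
        // Task takes 20 s with a 30 s run schedule: the next task waits 10 s after completion.
        System.out.println("20 s task, 30 s schedule -> next start at " + nextStartSeconds(0, 20, 30) + " s");
        // Same task with the default 0 s run schedule: the next task starts right after completion.
        System.out.println("20 s task, 0 s schedule -> next start at " + nextStartSeconds(0, 20, 0) + " s");
        // Task runs longer than the run schedule: the next task starts once the first finishes.
        System.out.println("45 s task, 30 s schedule -> next start at " + nextStartSeconds(0, 45, 30) + " s");
    }
}
```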