Member since
07-30-2019
3398
Posts
1621
Kudos Received
1001
Solutions
10-24-2018
02:43 PM
I'm starting to trial NiFi as a data ingestion engine. I would like to ingest the following data types:

1) collectd (UDP): I don't think NiFi has a collectd parser, so I will need to direct these raw UDP streams to locally running Telegraf and Logstash instances for parsing.
2) Syslog (UDP): I would like to experiment with routing raw, unprocessed syslog UDP packets to destinations, as well as filtering and parsing the syslog data using the NiFi syslog modules.
3) Netflow (UDP): I would like to take a heavy raw Netflow stream and test how well NiFi performs when forwarding only a subset of the Netflow data, selected by a list of protocol types I'm interested in (matched against one of the Netflow data key values).

What I haven't been able to work out from the documentation is how to redirect raw UDP packet flows (listening and then forwarding to two destinations) without having to process the data in the UDP packets.
10-12-2018
12:01 PM
@David Sargrad

NiFi is designed to prevent data loss. This means NiFi must do something with a FlowFile when the processing of that FlowFile fails somewhere within a dataflow.

When it comes to ingest-type processors like GetHTTP, a FlowFile is only generated upon success. As such, no FlowFile is created during a failure, so there is nothing that would need to be routed to a failure relationship.

On its next scheduled run, the GetHTTP processor will simply try to execute just as it did on the previous run. If successful, a FlowFile will be produced and routed to the outbound success relationship connection.

Thank you,
Matt

If you found this answer addressed your question, please take a moment to log in and click the "ACCEPT" link.
10-12-2018
01:03 PM
Thank you. I like your answer very much. I do think the referenced example was not focused on a zip of a zip (just a simple zip of a directory tree), yet I think your answer is correct. The "path" attribute does the job. I'll try this, and thanks.
10-15-2018
01:37 PM
@Matt Clarke Hi Matt. One question I'd like to get your perspective on. Assuming that I manage independent flows within a single NiFi cluster, is it your experience that I can use the NiFi Registry to properly version and manage the independent flows that are processed within that cluster? If I cannot manage each independent flow as a versioned flow, that could drive me to use multiple NiFi clusters (and to assign each cluster one or several flows). I'm concerned about the overall complexity of the processing that is assigned to a single cluster.
10-01-2018
05:52 PM
@yazeed salem

Your NiFi Expression Language statement looks good. I even tested it based on your example and it routed my FlowFiles correctly.

Make sure that each of the FlowFiles being processed has the required FlowFile attributes set on it. You can stop your RouteOnAttribute processor and allow a few files to queue in the connection feeding it. Then right click on the connection and select "List queue". You can then click on the "details" icon to the far left of any FlowFile to verify that it has the correct attributes set on it.

Thank you,
Matt

If you found this answer addressed your question, please take a moment to log in and click the "ACCEPT" link.
09-18-2018
12:51 PM
1 Kudo
@Wojtek

I believe you are misunderstanding how the UpdateAttribute processor functions.

Each new property you add expects a property name (this becomes the name of the attribute being created or updated) and a value. The value can be a string or a NiFi Expression Language (EL) statement.

In your screenshot above you have created EL statements. For example:

Property = bytes
Value = ${bytes}

What the above will actually do is this: the EL statement "${bytes}" tells NiFi to try to locate a NiFi attribute named "bytes" and return its assigned value. (At no time does the UpdateAttribute processor read the content of the FlowFile.) That returned value will then be used to change the existing value assigned to the FlowFile attribute "bytes", or to create a new FlowFile attribute named "bytes" and assign the value to it.

NiFi searches for NiFi attributes in the following hierarchy:

1. NiFi checks all existing attributes already assigned to the FlowFile being processed.
2. NiFi checks all in-scope process group variables.
3. NiFi checks the NiFi variable registry file.
4. NiFi checks the NiFi JVM properties.
5. NiFi checks the NiFi user's system environment variables.

Since the attribute "bytes" does not exist in any of these places, you are ending up with no value or empty string values being set on all these new properties.

Since you are trying to extract values from the content and assign those to FlowFile attributes, you will want to use a different processor, perhaps ExtractText instead.

Thank you,
Matt

If you found this answer addressed your question, please take a moment to log in and click the "ACCEPT" link.
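The lookup order described above can be illustrated with a small Python sketch. This is not NiFi code: the function name `resolve_attribute`, the dictionaries standing in for each scope, and the sample content string are all assumptions made for illustration. It only shows why `${bytes}` comes back empty when no scope defines "bytes", and how pulling the value out of the content (roughly what a regex-based processor such as ExtractText is for) differs.

```python
import re


def resolve_attribute(name, scopes):
    """Return the first value found for `name`, searching the scopes in order."""
    for scope in scopes:
        if name in scope:
            return scope[name]
    return ""  # nothing defines the name, so the result is an empty string


# Hypothetical stand-ins for the five places listed above, in search order.
flowfile_attributes = {"filename": "access.log"}
process_group_variables = {}
variable_registry_file = {}
jvm_properties = {}
environment_variables = {}
scopes = [
    flowfile_attributes,
    process_group_variables,
    variable_registry_file,
    jvm_properties,
    environment_variables,
]

# Property = bytes, Value = ${bytes}: no scope defines "bytes", so it is empty.
resolved = resolve_attribute("bytes", scopes)

# Reading the value from the content instead (hypothetical content and regex).
content = "GET /index.html status=200 bytes=5120"
match = re.search(r"bytes=(\d+)", content)
extracted = match.group(1) if match else ""
```

Here `resolved` is an empty string, while `extracted` holds the number taken from the content, which is the behavior the original question was after.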
09-12-2018
12:50 PM
4 Kudos
# Max Timer Driven Thread Count and Max Event Driven Thread Count:

Out of the box, NiFi sets the Max Timer Driven Thread Count relatively low to support operating on the simplest of hardware. This default setting can limit the performance of a very large, high-volume dataflow that must perform a lot of concurrent processing. General guidance for setting this value is 2 to 4 times the number of cores available to the hardware on which the NiFi service is running. With a NiFi cluster where each server has different hardware (not recommended), this would be set to the highest value possible based on the server with the fewest cores.

NOTE: Remember that all configurations you apply within the NiFi UI are applied to every node in a NiFi cluster. None of the settings apply as a total to the cluster itself.

NOTE: The cluster UI can be used to see how the total active threads are being used per node.

Closely monitoring system CPU usage over time on each of your cluster nodes will help you identify regular or routine spikes in usage. This information will help you determine whether you can increase the “Maximum Timer Driven Thread Count” setting even higher. Arbitrarily setting this value higher can lead to threads spending excessive time in CPU wait and not really doing any work. This can show up as long task times reported on processors.

*** The Event Driven scheduling strategy is considered experimental, and we do not recommend using it at all. Users should only configure their NiFi processors to use one of the Timer Driven scheduling strategies (Timer Driven or CRON Driven).

# Assigning Concurrent Tasks to processor components:

Concurrent task settings on processors should always start at the default of 1 and only be incremented slowly as needed. Assigning too many concurrent tasks to every processor can have an effect on other dataflows/processors.

Because of how the above works, it may appear to a user that they get better performance out of a processor by simply setting a high number of concurrent tasks. What they are really doing is stacking up more requests in that large queue so the processor gets more opportunities to grab one of the available threads from the resource pool. What often happens is that users with a processor running only 1 concurrent task are affected (simply because of the size of the request queue), so those users increase their concurrent tasks too. Before you know it, the request queue is so large that no one is benefiting from assigning additional concurrent tasks.

In addition, you may have processors that inherently have long-running tasks. Assigning these processors lots of concurrent tasks can mean a substantial chunk of the thread pool is in use for an extended amount of time. This then limits the number of threads available from the pool to work through the remaining tasks in the queue.
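As a rough illustration of the 2 to 4 times guidance above, the following Python sketch computes a starting range for the Max Timer Driven Thread Count. The function name and the core counts are hypothetical. Since the setting is applied per node, the sketch sizes it from the node with the fewest cores, as suggested for clusters with mixed hardware.

```python
def timer_driven_thread_range(cores_per_node):
    """Return (low, high) as 2x and 4x the core count of the smallest node."""
    fewest_cores = min(cores_per_node)
    return 2 * fewest_cores, 4 * fewest_cores


# Hypothetical three-node cluster where one node has fewer cores.
low, high = timer_driven_thread_range([16, 16, 8])
print(f"Start between {low} and {high} threads per node")
```

Treat the result as a starting point only; the CPU monitoring described above should drive any increase beyond it.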
09-12-2018
12:30 PM
6 Kudos
# Processor Run Duration:

Some processors support configuring a run duration. This setting tells a processor to keep using the same task to work on as many FlowFiles (or batches of FlowFiles) from an incoming queue as it can. This is ideal for processors where the individual tasks complete very quickly and the volume of FlowFiles is large.

In the above example, the exact same feed of FlowFiles was passed to both of these processors, which are configured to perform the same attribute update. Both processed the same number of FlowFiles in the past 5 minutes; however, the processor configured with a run duration consumed less overall CPU time to do so.

Not all processors support setting a run duration. The nature of the processor function, the methods being used, and/or the client library used may not support this capability. You will not be able to set a run duration on such processors.

How this works:

The processor has a thread assigned to its task. The processor grabs the highest priority FlowFile (or batch of FlowFiles) from the “active queue” of the incoming connection. If processing of the FlowFile(s) does not exceed the configured run duration, another FlowFile (or FlowFile batch) is pulled from the active queue. This continues, all under that same thread, until the run duration has been reached or the “active queue” is empty. At that time the session is completed and all outbound FlowFiles are committed at once to the appropriate relationship.

Since no FlowFiles are committed until the entire run completes, some latency is introduced on the FlowFiles. Your configured run duration dictates the minimum latency that will occur.

If the execution of the processor against a FlowFile takes longer than the configured run duration, there is no added benefit to adjusting this configuration.

What this means for heap usage:

Since the processor is only working on incoming FlowFiles in the “active queue”, there is no added heap pressure on the input side (FlowFiles in the “active queue” are already in heap space). The FlowFiles being generated (if any, depending on processor function) are all held in heap until the final commit. This may introduce some additional heap pressure versus not using a run duration, since all the new FlowFiles being generated will be held in heap until they are committed to an output relationship at the end of the run duration.
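The "How this works" loop above can be sketched in Python. This is a simplified model, not NiFi's implementation: `run_task`, `FakeClock`, and the 10 ms per-FlowFile cost are assumptions chosen so the example is deterministic. It shows one task pulling FlowFiles until the run duration is reached or the queue is empty, and only then handing back everything for a single commit.

```python
from collections import deque


class FakeClock:
    """Manually advanced clock (milliseconds) so the example is repeatable."""

    def __init__(self):
        self.now_ms = 0

    def __call__(self):
        return self.now_ms

    def advance(self, ms):
        self.now_ms += ms


def run_task(active_queue, process, run_duration_ms, clock):
    """Process FlowFiles under one task; return them all for a single commit."""
    start = clock()
    session = []  # generated FlowFiles are held here until the final commit
    while active_queue:
        session.append(process(active_queue.popleft()))
        if clock() - start >= run_duration_ms:
            break  # run duration reached, stop pulling from the queue
    return session


clock = FakeClock()


def update_attribute(flowfile):
    clock.advance(10)  # assume each FlowFile takes 10 ms to process
    return {**flowfile, "updated": "true"}


queue = deque({"id": i} for i in range(5))
committed = run_task(queue, update_attribute, run_duration_ms=25, clock=clock)
```

With a 25 ms run duration and 10 ms per FlowFile, the task handles three FlowFiles before committing and leaves two in the queue for the next task. With a run duration of 0 the same loop handles exactly one FlowFile per task.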
09-14-2018
01:47 PM
@sri chaturvedi Yes, potentially, if there are enough inbound FlowFiles to trigger the processor to run 4 times concurrently.