Member since
07-30-2019
3406
Posts
1622
Kudos Received
1008
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 134 | 12-17-2025 05:55 AM | |
| 195 | 12-15-2025 01:29 PM | |
| 133 | 12-15-2025 06:50 AM | |
| 260 | 12-05-2025 08:25 AM | |
| 418 | 12-03-2025 10:21 AM |
05-07-2019
01:03 PM
@BVS REDDY NiFi does not produce the alert message you are showing above. Where did you see this alert displayed? All NiFi's logging is controlled by the logback.xml file found in NiFi's conf directory. There are three default logs that NiFi creates and writes log entries. nifi-bootstrap.log <-- Should be relatively small by default and log info related to NiFi bootstrap process. nifi-user.log <-- Only used when NiFi has been secured. This log contains logging about user/client authentication and authorizations requests. nifi-app.log . <-- These logs can become very large depending on a couple factors. The number of components (processors, controller services, reporting tasks, etc.) added to NiFi canvas. The volume of FlowFiles passing through components. By default the log level is set to "INFO" in older NiFi releases which results in above mention potentially large output. The newest version of added a new logger to the logback to set processor specific logging to "WARN" log level. NiFi I would suggest editing your default NiFi logback.xml as follows: Within the "appender" for the nifi-app.log add a ".gz" to the ned of the "fileNamePattern" line so that rolled logs are compressed to reduce disk usage. Check for existence of this logger "<logger name="org.apache.nifi.processors" level="WARN"/>" If it exists, make sure it is still set to "WARN" If it does not exist, add it. Tail your nifi-app.log file (tail -F <path to NiFi logs>/logs/nifi-app.log) and see if your NiFi flows are producing a lot of WARN or ERROR level log outputs. If so, address those ERRORs and WARNs to reduce amount of logging that is occurring. Thank you, Matt If you found this answer addressed your question, please take a moment to login in and click the "ACCEPT" link.
... View more
05-06-2019
07:08 PM
@3nomis The Wait and Notify processor have nothing to do with the merging of FlowFiles. The Wait processor prevents FlowFiles from passing through it until a release signal for the FlowFile is found in the configured cache service. The Wait processor would be configured to read some attribute's value from an incoming FlowFile. That value should be unique and shared by only the corresponding FlowFile that will go to the Notify processor. The wait processor then check the configured cache service for the existence of a cache key that matches that value. The Notify processor would be configured to read some attribute's value from an incoming FlowFile. That value should be unique to that FlowFile and to the corresponding FlowFile currently waiting at the Wait processor. The Notify processor will create the cache key entry if it does not exist in the configured cache service. JSON content similar to the following will be generated and assign to the that cache key: {"counts":{"counter":1},"attributes":{},"releasableCount":0} If another FlowFile will same cache key value comes to the Notify processor, the "counter" is incremented. When the Wait processor finally sees that the cache key for a waiting FlowFile contains this JSON, it will decrement the count value by the configured signal count and move resulting number from counter to "releasableCount". If releasableCount equates to 0, the cache key is deleted; otherwise, the decremented value is set to the positive new decremented value. Additional FlowFiles looking at this same cache key will be able to pass through Wait until counter and releasableCount are both zero. Remember that Notify will increment these counts by one for each FlowFile that "notifies" using same cache key value. The design intent here is to hold processing on some source file until some side processing is complete. In your use case I do not believe this is what you are trying to do. You are just looking for a way to merge all your splits back in to the same original zipped FlowFile post flattening the json in each split. In this case, you may want to add and updateAttribute processor between your two split based processors. Following the split of a FlowFile each produced split FlowFiles has these three FlowFile Attributes assigned: fragment.identifier All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute fragment.index A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile fragment.count The number of split FlowFiles generated from the parent FlowFile Using the UpdateAttribute Processor, you can create a new set of attributes to save these values so they do not get lost when you split again. for example: fragment.identifier.first.split = ${fragment.identifier}
fragment.index.first.split = ${fragment.index}
fragment.count.first.split = ${fragment.count} Then pass the first set of splits to next split processor. Each produced FlowFile will get new fragment values but retain the above first.split properties. Then perform your processing to flatten each of your resulting FlowFile's json content. Then we are on to the Merge process where we merge twice. First merge based on the fragment.identifier, fragement.index, and fragment.count assigned by second split. Following the merge use an UpdateAttribute processor to move the *.first.spit fragment values back to original the corresponding fragment.identifier, fragement.index, and fragment.count. Now you can merge these FlowFiles back to original FlowFile. Thank you, Matt If you found this answer addressed your question, please take a moment to login in and click the "ACCEPT" link.
... View more
05-03-2019
04:35 PM
1 Kudo
@c i *** Community Forum Tip: Try to avoid starting a new answer in response to an existing answer. Instead use comments to respond to existing answers. There is no guaranteed order to different answer which can make it hard following a discussion. *** Community Forum Tip: If you want user to be notified about your response, try using the <@name> tag on your response. 1. I believe this approach is more robust than using Max Bin Age/max latency, what do you think? -- I agree this is more robust. Also the fact that by adding the UpdateAttribute between your MergeContent and the two ReplaceText processors you have only one inbound connection to your MergeContent processor. On each execution of the MergeContent processor it will bin FlowFiles from only one inbound connection. It then round robins all inbound connection. Having only on inbound connection will improve efficiency. Without the updateAttribute processor I would have suggested using a "Funnel" to reduce the two connections to just one. 2. would you suggest then to set the "Execution" of MergeContent to Primary node instead of All nodes? --> NO processor that has inbound connections to it should ever be configured for "Primary node" only execution. The elected "Primary node" in a NiFi cluster can change at any time which could lead to FlowFiles queued in connections to processors that are no longer scheduled to run on the previously elected primary node. I do see however that your GenerateFlowFile processors are not configured for "Primary node" execution. This means every time they are scheduled to run every node in your NiFi cluster is going to run that processor and produce FlowFile(s). So if intent is to produce only 1 FlowFile per execution, you would want those generateFlowFiles processors running on primary node only. In other scenarios where your Source data comes in to NiFi via other means that may be distributed across all your NiFi nodes, you can use the load-balanced capability available on connections to redistribute those FlowFiles in a variety of ways. One of the methods is "Single node" which is great when you need to get all queued FlowFiles moved over to a single node's mergeContent processor. Thank you, Matt If you found the assistance I provided via this answer addressed your question, please take a moment to login in and click the "ACCEPT" link below my answer.
... View more
05-03-2019
04:14 PM
1 Kudo
@Ramesh Reddy When you build a dataflow on your NiFi UI's canvas and then save it as a template via "Create Template", all selected components (including all current configuration minus any passwords) will be saved into a template. The template is a just an xml snippet of that current configuration of those components which is kept in the NiFi's flow.xml.gz file. From the the "Templates" UI found under NiFi's global menu (upper right corner of NiFi UI) you can select any template and download that xml snippet to disk. Those snippets can then be uploaded to another NiFi for use there. The actual components on the canvas are in no way linked directly to any created template. So when you make changes to the components on the canvas, those changes are not pushed to any template you already created. You can create a new template anytime you want after making changes. I believe what you really want to be using is the NiFi-Registry service. This is a separate install from NiFi. NiFi can be configured to use the NiFi-Registry service to "Version Control" dataflows you build within the NiFi UI's canvas. Once a NiFi registry has been configured for your NiFi to use, you will have the option to right click on a NiFi Process Group (PG) and select "Start version control" from the version found in the presented context menu that is displayed. Once a PG is placed under version control, any changes made within the PG will trigger the PG to display an icon in upper left corner: informing you that local changes have been made. Right clicking on the PG again will allow you to commit those local changes as a new "version" of that dataflow in the NiFi-Registry. Other NiFi installs can be configured to use that same NiFi-Registry which allows those other NiFi instances to load a version controlled PG on to the canvas on that NiFi as well. A version controlled PG where the local flow matches teh current version in teh NiFi_Registry will display this icon in upper left corner of the PG: If a new version of a version controlled PG is pushed to the NiFi-Registry, any other NiFi instances using that version controlled flow will be notified that a newer version is available in the NiFi-Registry. User can then upgrade their PG to teh newer version. Thank you, Matt
... View more
05-03-2019
12:18 PM
1 Kudo
@c i I am not clear from your description what your flow is doing. - What does the original FlowFile content look like before these replaceText processors? - How are the replaceText processors configured? The description you have of how the mergeContent processor works is not accurate. The MergeContent processor will merge a bin when any ONE of the following as occurred: 1. A given bin meets the configured minimum configured values for "number of entries" and "group size" (Bin-Packing Algorithm Merge Strategy) 2. A given bin contains all fragments of a fragmented FlowFile (Defragment Merge Strategy) 3. A bin has reached the configured "max bin age". Bin age starts when first FlowFile is allocated to a bin. The max bing age property exists to keep bins from staying around forever when the conditions outlined in 1 and 2 above are never met. 4. Processor runs out of bins. If FlowFiles have been allocated to each available bin already and a new FlowFile does not meet criteria to be addd to one of these existing bins, the oldest bin will be merged in order to free a bin to use for this new FlowFile. (This typically occurs when using the Correlation Attribute Name property and the source FlowFiles have more unique attribute values than bins allocated). The MergeContent processor will not always wait the configured max bin age before merging. Also keep in mind that with a NiFi cluster, each NiFi node is running the MergeContent processor independently of the other nodes and can only merge FlowFiles on the same node. So while an inbound connection queue to the MergeContent processor may show 2 queued FlowFiles, each of those FlowFiles may exists on different nodes and thus would not be merged together in to one FlowFile. Consider the "Max bin age" as the max latency you are willing to allow in your flow. So that a bin is merged even if it does not meet min criteria in the allotted time. So even if you set this to 30 minutes, a bin that meets the min criteria in 2 mins will get merged at 2 minutes. More details about your overall dataflow is probably needed to offer alternative suggestions here. Thank you, Matt
... View more
05-02-2019
03:25 PM
1 Kudo
@Nick Stantzos You can ignore the unexpected coloring rendered by the NiFi Expression language editor window. The java regex replace should work just using "# DOG CAT BIRD" or "\# DOG CAT BIRD". Thank you, Matt
... View more
04-23-2019
03:39 PM
@Sebastian Carroll There are other things that also exist in heap memory space within the NiFi JVM: Component Status History: NiFi will store status history data points for all processors on the NiFi canvas (including those that are stopped). You can see this stored status history by right clicking on a component and selecting "view status history". Each component has numerous stats for which these data points are retained. All these component status points are stored in heap memory. The number of points per each stat that is held in heap is controlled in the nifi.properties file: nifi.components.status.repository.buffer.size --> Specifies the buffer size for the Component Status Repository. The default value is 1440. nifi.components.status.snapshot.frequency --> This value indicates how often to present a snapshot of the components' status history. The default value is 1 min. so on every restart of NiFi these stats are gone since they are in heap only. Then over the course of default 24 hours (1440 minutes in 24 hours) the heap usage grows. You can reduce the heap usage by this status history by adjusting the above properties. (take snapshots less frequently, perhaps every 5 minutes. Reduce number of data points retained from 1440 to 380 or lower.) Templates: All uploaded templates (whether they are instantiated to canvas or not) are held in heap memory. you can reduce heap memory usage by deleting uploaded templates you will no longer be instantiating to canvas. Dataflow: Your entire flow is held in heap memory. The more components you have on the canvas the large heap footprint. Queued FlowFiles: Even if no processors are running, the FlowFile attributes for FlowFiles loaded in to each connection between processors are held in heap memory. (There is a swap threshold configurable in nifi.properties file which triggers a connection to start swapping to disk if number fo queued FlowFiles exceeds the configured swap threshold) Thank you, Matt
... View more
04-12-2019
03:24 PM
@Raj Negi The Process Group variables are not dynamic. Anytime the variable is changed it requires a restart of any component that references it. That is because those components read in that variable value when they are started in a lot of cases. - Your first dataflow is responsible executing an InvokeHTTP processor to retrieve the token needed for all the different dataflows within the same Process Group. You may consider adding a PutDistributedMapCache processor in that flow to write that retrieved token out to a cache server. In each of your other dataflows you could have a FetchDistributedMapCache processor that pulls that token from the cache server so it is is added to every FlowFile. - Thank you, Matt - If you found this answer addressed your question, please take a moment to login in and click the "ACCEPT" link.
... View more
04-11-2019
10:18 AM
1 Kudo
@Abhinav Joshi I suspect that you still have multiple versions of the updateAttribute loaded in your new NiFi 1.9.0 install. My guess here is that the updateAttribute-1.4.nar was added to your NIFi 1.8 and NiFi 1.9 as an additional nar manually. Check your nifi.properties file in NiFi 1.9.0 for the every occurrence of "nifi.nar.library.directory". You may find 1 or more. These will be the locations from which your NiFi is loading its nars. Then look in these directories for "nifi-update-attribute-nar" to see if you find multiple versions in use. I am guessing that in your NiFi 1.9.0 you will find a 1.9.0 version of this nar and a 1.4.0 version. - So what happens when the flow.xml.gz file you are using from NiFi 1.8.0 is loaded, you have multiple versions of the updateAttribute processor (1.4 and 1.8). The 1.4 versions load just fine because the 1.4.0 nar still exists; however, there is no 1.8.0 version of the nar. Normally NiFi would auto select the available option, but since there are two options available (1.4 and 1.9 now), NiFi cannot auto select because two options exist and ghost implementation of the 1.8 versions are created requiring the user to manually replace them with the desired version. - So you have two options: 1. Remove the 1.4 nar so only 1.9 version of the nar exists 2. Manually edit the flow.xml.gz file replacing the version number on the 1.8 updateAttribute processors with the new 1.9 version number. - If you did not find multiple nars, then perhaps you are new 1.9 install is using the same old work directory as the original 1.8. NiFi unpacks the nars in the work directory on startup. IN this scenario simply delete everything in the work directory before starting NiFi so it is rebuild form the nars/jars found in the above mention library directories. - Thanks, Matt
... View more
04-10-2019
10:10 PM
@Julian Iglesias - You can accomplish above using a function chain in your NiFi Expression Language (EL) statement. Rather then using the getDelimitedField() EL function: ${log_source:getDelimitedField(3,'\')} - You can successfully do this using the substringAfter() function twice to strip away what is before "dir2" and the substringBefore() function to strip away everything after "dir2": ${log_source:substringAfter('\\'):substringAfter('\\'):substringBefore('\\')} - Thank you, Matt - If you found this answer addressed your question, please take a moment to login in and click the "ACCEPT" link.
... View more