Member since
07-30-2019
3469
Posts
1641
Kudos Received
1018
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 158 | 05-06-2026 09:16 AM |
|  | 247 | 05-04-2026 05:20 AM |
|  | 237 | 05-01-2026 10:15 AM |
|  | 468 | 03-23-2026 05:44 AM |
|  | 352 | 02-18-2026 09:59 AM |
10-20-2021
11:10 AM
@RB764 Your EvaluateJsonPath processor configuration is good. This processor evaluates the JSONPath expressions against the content of the inbound FlowFile, and with "Destination" set to "flowfile-attribute" it creates a new attribute for each dynamic property added to the processor, using the value that results from the JSONPath.

Your issue here is that your inbound FlowFile has no content for the EvaluateJsonPath processor to run the JSONPath against. In your screenshot of the GenerateFlowFile processor, I see you have added a new dynamic property "value" with a value of {"Country":"Austria","Capital":"Vienna"}. Dynamic properties become FlowFile attributes on the produced FlowFile, not content. If you want to put specific content in the FlowFile via the GenerateFlowFile processor, you need to use the "Custom Text" property to do so.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post. Thank you, Matt
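As a rough sketch of the distinction, here is what EvaluateJsonPath would extract once the JSON is actually in the FlowFile *content* (the attribute names `country`/`capital` below are made up for illustration; a real flow would use dynamic property names of your choosing):

```python
import json

# The JSON as it would appear in the FlowFile content if set via
# GenerateFlowFile's "Custom Text" property (not a dynamic property).
content = '{"Country":"Austria","Capital":"Vienna"}'

# EvaluateJsonPath with expressions $.Country and $.Capital would
# produce FlowFile attributes roughly equivalent to:
doc = json.loads(content)
attributes = {
    "country": doc["Country"],   # $.Country
    "capital": doc["Capital"],   # $.Capital
}
print(attributes)  # {'country': 'Austria', 'capital': 'Vienna'}
```

If the JSON only exists as a dynamic property on GenerateFlowFile, the content is empty and there is nothing for the JSONPath to evaluate.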
10-20-2021
10:53 AM
@Apoo Not sure if this is the best solution, but you could use a combination of EvaluateJsonPath and ReplaceText to convert your sample source into your sample output.

EvaluateJsonPath processor: add a new dynamic property (any property name will do) with the value $.data[*]. Based on your example, that would result in this output: [{"timestamp_start":0,"timestamp_stop":0}]

Then use ReplaceText to trim off the leading "[" and trailing "]": Search Value = (^\[)|(\]$)

That leaves you with your desired output: {"timestamp_start":0,"timestamp_stop":0}

Thank you, Matt
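The trim that ReplaceText performs with that Search Value (and an empty Replacement Value) can be checked with an ordinary regex, e.g.:

```python
import re

# Same substitution ReplaceText would apply with
# Search Value = (^\[)|(\]$) and an empty Replacement Value.
extracted = '[{"timestamp_start":0,"timestamp_stop":0}]'
trimmed = re.sub(r'(^\[)|(\]$)', '', extracted)
print(trimmed)  # {"timestamp_start":0,"timestamp_stop":0}
```

The anchors `^` and `$` ensure only the outermost brackets are removed, so any brackets inside the JSON are left alone.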
10-19-2021
02:04 PM
@AA24 The easiest way to accomplish this is to use the PutDistributedMapCache processor in one flow to write the attribute values you want to share to a cache server, and in your other flow use the FetchDistributedMapCache processor to retrieve those cached attributes and add them to the FlowFiles that need them.

Another option is to use the MergeContent processor. In flow one, where it looks like you are extracting your session_id and job_id, you would use the ModifyBytes processor to zero out the content, leaving you with a FlowFile that only has attributes, and then use MergeContent to combine this FlowFile with the FlowFile in your second flow. In the MergeContent processor you would configure "Attribute Strategy" to use "Keep All Unique Attributes".

Thank you, Matt
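The cache pattern can be sketched in plain Python (a dict stands in for the DistributedMapCache server; the cache key and attribute values below are made up for illustration):

```python
# A dict standing in for the DistributedMapCache server.
cache = {}

# Flow 1: PutDistributedMapCache writes the attributes under a chosen key.
def put_cache(key, session_id, job_id):
    cache[key] = {"session_id": session_id, "job_id": job_id}

# Flow 2: FetchDistributedMapCache reads them back and merges them
# into the other FlowFile's attribute map.
def fetch_cache(key, flowfile_attributes):
    flowfile_attributes.update(cache.get(key, {}))
    return flowfile_attributes

put_cache("job-context", session_id="abc123", job_id="42")
attrs = fetch_cache("job-context", {"filename": "data.csv"})
print(attrs)
```

The key is whatever correlation identifier both flows can agree on; if the two flows run concurrently you also need to handle the fetch happening before the put.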
10-19-2021
01:55 PM
1 Kudo
@TRSS_Cloudera It is not clear to me how you have designed your dataflow to remove all files from the source SFTP server except the newest file. Assuming state was not an issue (since you said your flow works if you manually clear state), how is your flow built?

There is a GetSFTP processor that does not maintain state. So you could have your flow use ListSFTP and FetchSFTP to always get the newest "history" file and record that latest "history" file's last modified timestamp in something like a DistributedMapCache server. Then have your GetSFTP run once a day using the "Cron driven" scheduling strategy to get all files (Delete Original = false) in that directory (which would also pick up the latest history file), fetch the currently stored last modified time from the map cache, and via RouteOnAttribute route any FlowFiles whose last modified timestamp is older than the stored timestamp to a processor that removes them from the source SFTP server.

While the above would work in an ideal world, you would run into issues when an interruption in the running dataflow causes multiple new files to get listed by the ListSFTP processor, because you would not know which one ends up having its last modified timestamp stored in the DistributedMapCache. But in such a case the worst outcome is a couple of files left lingering until the next run results in just one history file being listed, at which point things go back to expected behavior.

Otherwise, there are script-based processors you could use to build your own scripted handling here. To be honest, it seems like wasted I/O to have NiFi consume these files just to auto-terminate them, when you could use an ExecuteStreamCommand processor to invoke a script that connects to your SFTP server and simply removes what you do not want, without pulling anything across the network or writing file content to NiFi that you don't need.

Hopefully this gives you some options to think about.
Thank you, Matt
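The keep-newest decision that flow makes can be sketched as follows (the filenames and epoch timestamps are invented for illustration):

```python
# Decide which files to delete: everything whose last-modified
# timestamp is older than the stored "newest history file" timestamp.
stored_last_modified = 1_700_000_300  # value held in the DistributedMapCache

listed_files = {
    "history_1.log": 1_700_000_100,
    "history_2.log": 1_700_000_200,
    "history_3.log": 1_700_000_300,  # the newest file, which is kept
}

to_delete = [name for name, mtime in listed_files.items()
             if mtime < stored_last_modified]
print(sorted(to_delete))  # ['history_1.log', 'history_2.log']
```

This is exactly the comparison a RouteOnAttribute expression would make against the cached timestamp before sending FlowFiles to the delete path.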
10-19-2021
01:18 PM
@AA24 NiFi was designed as an always-on type of dataflow engine. As such, the NiFi processor components support the "Timer Driven" and "Cron Driven" scheduling strategy types. That being said, the ability to tell a processor to "Run Once" does exist within NiFi. You can do this manually from within the UI by right-clicking on the processor component and selecting "Run once" from the pop-up context menu.

The next thing to keep in mind is that anything you can do via the UI, you can also do via a curl command. So it is possible to build a dataflow that triggers the "run once" API call against the processor you want to fetch from the appropriate DB. You cannot execute "run once" against a process group, nor would I recommend doing so. You want to trigger only the processor responsible for ingesting your data and leave all the other processors running all the time, so they process whatever data they have queued at any time.

First you need to create your trigger flow: you could have a GetFile consume the trigger file and use maybe a RouteOnContent processor to send the FlowFile to either an InvokeHTTP configured to invoke run-once on your Oracle-configured processor, or an InvokeHTTP configured to invoke run-once on your MySQL-configured processor. Using your browser's developer tools is an easy way to capture the REST API calls that are made when you manually perform the action via the UI.

Thank you, Matt
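As a sketch, the run-once call captured from the UI in recent NiFi versions looks roughly like a PUT to /nifi-api/processors/&lt;id&gt;/run-status with a RUN_ONCE state. The host, processor id, client id, and revision version below are placeholders; capture the real values with your browser's developer tools, as your NiFi version may differ:

```python
import json

def run_once_request(host, processor_id, client_id, version):
    """Build the URL and JSON body for a NiFi run-once REST call."""
    url = "https://{}/nifi-api/processors/{}/run-status".format(host, processor_id)
    body = {
        "revision": {"clientId": client_id, "version": version},
        "state": "RUN_ONCE",
    }
    return url, json.dumps(body)

url, body = run_once_request("nifi.example.com:8443",
                             "0174a1b2-0163-1000-a1b2-c3d4e5f60718",
                             "my-client", 3)
print(url)
print(body)
```

An InvokeHTTP processor configured with this URL, the PUT method, and this body (plus your authentication token) would stand in for the manual UI action.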
10-19-2021
12:04 PM
@DayDream The ExecuteStreamCommand processor executes a system-level command, not something native within NiFi, so its impact on CPU is completely dependent on what the called command is doing. You mention that the ExecuteStreamCommand is just executing a cp command and that the issue happens when you are dealing with a large file. The first thing I would look into is the disk I/O of the source and destination directory locations the file is being copied from and to.

You also mention that PutFile is writing out a large FlowFile to disk. This means the processor is reading FlowFile content from the NiFi content_repository and then writing it to some target folder location. I would once again look at the disk I/O of both locations while this is happening. The CPU usage may be high simply because these threads run a long time waiting on disk I/O.

NiFi uses CPU for its core-level functions, and then you configure an additional thread pool that is used by the NiFi components you add to the canvas. This resource pool is configured via NiFi UI --> Global Menu (upper right corner of UI) --> Controller Settings:

The "Event Driven" thread pool is experimental and deprecated and is used by processors configured with the event driven scheduling strategy. Stay away from this scheduling strategy.

The "Timer Driven" thread pool is used by controller services, reporting tasks, processors, etc. Processors use it when configured with the "Timer Driven" or "Cron driven" scheduling strategies. This pool is what the NiFi controller hands out to all processors requesting time to execute. Setting this value arbitrarily high will simply lead to many NiFi components getting threads to execute but then spending excessive time in CPU wait, as the time on the limited cores is time-sliced across all active threads.

The general rule of thumb is to set the pool to 2 to 4 times the number of available cores on a single NiFi host/node. So for your 8-core server, you would want this between 16 and 32. This does not mean you can't set it higher, but you should only do so in small increments while monitoring CPU usage over an extended period of time. If you have 5 nodes, this setting is per node, so you would have a thread pool of 16 - 32 on each NiFi host/node.

Another thing you may want to start looking at is the GC stats for your JVM. Is GC (young and old) running very often? Is it taking a long time to run? All GC is a stop-the-world event: the JVM is simply paused while it runs, which can also affect how long a thread appears to be "running". You can get some interesting details about your running NiFi using the built-in NiFi diagnostics tool: <path to NiFi>/bin/nifi.sh diagnostics --verbose <path/filename where output should be written>

For a NiFi node to remain connected, it must successfully send a heartbeat to the elected cluster coordinator at least 1 out of every 8 scheduled heartbeat intervals. Say the heartbeat interval is configured in the nifi.properties file as 5 secs; then the elected cluster coordinator must successfully process at least 1 heartbeat every 40 secs, or that node gets disconnected for lack of heartbeat. The node initiates a reconnection once a heartbeat is received after having been disconnected for that reason. Configuring a larger heartbeat interval helps avoid this disconnect/reconnect cycle by allowing more time before a heartbeat is considered lost, which gives the node more headroom if it is going through a long GC pause or the CPU is so saturated it can't get a thread to create a heartbeat.

I also recommend reading through this community article: https://community.cloudera.com/t5/Community-Articles/HDF-CFM-NIFI-Best-practices-for-setting-up-a-high/ta-p/244999

Thank you, Matt
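The two rules of thumb above (thread pool sizing and heartbeat timeout) are simple arithmetic, sketched here for an 8-core node with a 5-second heartbeat interval:

```python
def timer_driven_pool_range(cores):
    """Rule of thumb: size the Timer Driven pool at 2x to 4x the cores
    available on a single NiFi host/node."""
    return 2 * cores, 4 * cores

def heartbeat_timeout(interval_secs, missed_allowed=8):
    """A node is disconnected after failing to get a heartbeat through
    for 8 consecutive scheduled intervals."""
    return interval_secs * missed_allowed

print(timer_driven_pool_range(8))   # (16, 32)
print(heartbeat_timeout(5))         # 40
```

Remember the pool setting is per node, so a 5-node cluster at 8 cores each has a 16-32 thread pool on every node, not shared across the cluster.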
10-19-2021
11:32 AM
@vikrant_kumar24 The ExecuteScript processor has been around for over 6 years as part of Apache NiFi. It has had many improvements and bug fixes over those years, just like many other well-used components. I'd be reluctant to call it "experimental" any longer, regardless of what the embedded Apache NiFi docs say. The only thing to note here is that the ExecuteScript processor does not really execute a "Python" script engine. It executes "Jython" instead, which is a Java implementation of Python. Jython is not 100% compatible with Python, so you must test your script thoroughly. Thanks, Matt
10-08-2021
11:06 AM
@Ankit13 How do you know no more files will be put there after the NiFi flow processing starts? To me it sounds like the PutFile should execute at the default 0 secs (as fast as it can run), and you should instead control this dataflow at the beginning, where you consume the data.

For example: in a 24-hour window, data is written to the source directory between 00:00:00 and 16:00:00, and you want to write that data to the target directory starting at 17:00. So you set up a cron schedule on a ListFile processor to list the files at 17:00, and then have a FetchFile and PutFile running all the time so they immediately consume all the content for the listed files and write it to the target directory. Your ListFile then does not execute again until the same time the next day, or whatever your cron is. This way the files are all listed at the same time, and the PutFile can execute for as long as needed to write all those files to the target directory.

Hope this helps, Matt
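NiFi's "Cron driven" strategy uses Quartz cron syntax, so a daily 17:00 run would be something like `0 0 17 * * ?` (seconds, minutes, hours, day-of-month, month, day-of-week); treat that expression as a sketch and verify it against your NiFi version. The next-fire-time logic such a schedule implies can be illustrated as:

```python
from datetime import datetime, timedelta

def next_daily_run(now, hour=17):
    """Next occurrence of <hour>:00, the way a daily Quartz cron such as
    '0 0 17 * * ?' would fire."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate

print(next_daily_run(datetime(2021, 10, 8, 11, 6)))   # 2021-10-08 17:00:00
print(next_daily_run(datetime(2021, 10, 8, 18, 0)))   # 2021-10-09 17:00:00
```

The key point stands either way: only the ListFile needs the schedule; FetchFile and PutFile stay on the default run schedule and drain whatever the listing produced.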
10-07-2021
02:40 PM
@CodeLa @SAMSAL I want to point out that tracking timestamps will not always guarantee NiFi will consume all files from the input directory; it depends on how they are being placed in that directory. The ListFile processor looks at the last modified timestamp on each file. It lists all files newer than the last recorded timestamp stored in the NiFi state manager from the previous processor execution. On the first run there is no state, so everything currently present is listed. Now consider the scenarios below, which can prevent the above from listing all files.

Scenario 1: the mechanism writing files to that input directory does not update the last modified timestamp once it is done writing. Say file 1 starts being written at 12:00:01.000 and file 2 starts being written at 12:00:01.300. File 2 completes first, is consumed by ListFile, and the stored state is updated to 12:00:01.300. Now file 1 completes, but it is never consumed by ListFile, since its last modified timestamp is older than file 2's. If you are in such a scenario, ListFile offers a different "Listing Strategy" called "Tracking Entities", which also tracks filenames in a cache service, allowing it to list files that have an older timestamp.

Scenario 2: ListFile may list the same file more than once. Say you tell ListFile to list files from directory /nifi/myfiles/. The mechanism writing these files does update the last modified timestamp as the file is being written, but does not use a ".<filename>" (dot rename) approach (where the file is initially hidden until the write completes and is then renamed and made unhidden; the default ListFile config ignores hidden files). So when ListFile runs, it sees the file with a newer last modified timestamp and lists it. Then on the next execution it sees the same file again, because its last modified timestamp was updated while the file was still being written. If you are in such a scenario, make use of the "Minimum File Age" property. This property tells ListFile to ignore any file whose last modified timestamp, compared to the current time, is not at least the configured amount of time old (meaning the timestamp has not changed for that amount of time). That configured time is arbitrary: whatever length is needed for you to be confident the file write was complete.

Something else to consider applies if both of the following are true: 1. You are using a multi-node NiFi cluster. 2. The configured directory you are listing from is mounted on every node. Since every node in a NiFi cluster executes the same dataflow, you want to avoid every node listing the same files. In this scenario you would change the "Execution" configuration from "All nodes" to "Primary node" on the ListFile and change "Input Directory Location" from "local" to "remote". Then set the "Load Balance Strategy" to "Round Robin" on the connection between ListFile and FetchFile. NOTE: never set the Execution on any processor that has an inbound connection to "Primary node"; only processors with no inbound connection should be considered for this execution configuration.

I know this is a lot to digest, but it is very important to be aware of to ensure success.

Thank you, Matt
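The combined "newer than stored state, but stable for at least Minimum File Age" filter can be sketched like this (filenames and epoch timestamps are invented; real ListFile state is managed by NiFi, not your code):

```python
import time

def eligible_for_listing(files, last_state, min_age_secs, now=None):
    """Mimic ListFile: pick files newer than the stored state whose
    last-modified timestamp has been stable for min_age_secs."""
    now = now if now is not None else time.time()
    return [name for name, mtime in files.items()
            if mtime > last_state and (now - mtime) >= min_age_secs]

files = {
    "done.csv":    1_000_000_000,  # last modified 60s ago: write finished
    "writing.csv": 1_000_000_055,  # modified 5s ago: still being written
}
listed = eligible_for_listing(files, last_state=999_999_000,
                              min_age_secs=30, now=1_000_000_060)
print(listed)  # ['done.csv']
```

The half-written file is skipped this run and picked up on a later execution once its timestamp stops changing, which is exactly what Minimum File Age buys you.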
10-07-2021
02:06 PM
@Ronman I don't know anything about the cetic/helm-nifi image. I assume you are talking about this: https://github.com/cetic/helm-nifi

I started parsing through what is in that GitHub repo, and the authorizers.xml it builds looks poorly done. Can you share what is in the authorizers.xml file on your NiFi host? The image looks to create numerous providers that are not actually used, from what I can tell. It looks like it creates:
1. file-user-group-provider
2. ldap-user-group-provider (only if LDAP is enabled)
3. composite-configurable-user-group-provider (only if LDAP is enabled)
4. file-access-policy-provider (always points at the file-user-group-provider, which means 2 and 3 would never get used even if they were created)
5. managed-authorizer (points at the file-access-policy-provider)
6. file-provider (only if LDAP is enabled. This is a legacy provider and I am not sure why anyone would still use it. It can reference any of the above user-group providers.)

So seeing what is actually written to that file might be helpful here. Also, on startup the authorizers.xml is responsible for seeding some initial policies for the admin user in the users.xml and authorizations.xml files, including the initial set of policies for the root PG. This will not happen if, upon first launch of NiFi, there was no flow.xml.gz yet, and thus no flow.xml.gz containing the root PG UUID. So you may want to rename your existing authorizations.xml file and restart your NiFi so that a new one is generated, since you have a flow.xml.gz now, and see if that gives you the policies you need to start editing the canvas. But even if the above works, I still think you have an issue within your authorizers.xml configuration.

Thank you, Matt
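For comparison, a minimal file-based provider chain in authorizers.xml looks roughly like the sketch below. This is not taken from the cetic chart; the file paths and the admin identity are placeholders you must replace with your own, and you should check the property names against the admin guide for your NiFi version:

```xml
<authorizers>
    <userGroupProvider>
        <identifier>file-user-group-provider</identifier>
        <class>org.apache.nifi.authorization.FileUserGroupProvider</class>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial User Identity 1">CN=admin, OU=NIFI</property>
    </userGroupProvider>
    <accessPolicyProvider>
        <identifier>file-access-policy-provider</identifier>
        <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
        <property name="User Group Provider">file-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Initial Admin Identity">CN=admin, OU=NIFI</property>
    </accessPolicyProvider>
    <authorizer>
        <identifier>managed-authorizer</identifier>
        <class>org.apache.nifi.authorization.StandardManagedAuthorizer</class>
        <property name="Access Policy Provider">file-access-policy-provider</property>
    </authorizer>
</authorizers>
```

The chain matters: the managed-authorizer references the access policy provider, which references exactly one user group provider; any providers not reachable from the managed-authorizer are dead weight.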