Member since
07-30-2019
2901
Posts
1438
Kudos Received
843
Solutions
11-08-2022
11:43 AM
1 Kudo
@Bridewin There are two things you may want to try:
1. The GetFile processor was deprecated in favor of the newer ListFile --> FetchFile processors. I'd recommend switching to these processors to see whether you make the same observations.
2. I'd suggest enabling debug logging for the GetFile processor class to see what additional logging may show. To do this, modify the logback.xml file in NiFi's conf directory by adding the line below where you see similar lines already:
<logger name="org.apache.nifi.processors.standard.GetFile" level="DEBUG"/>
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.
Thank you,
Matt
11-08-2022
11:34 AM
@Jagapriyan I suspect an issue with last modified timestamps: the missed files have an older last modified timestamp than what was already consumed from the target directory, and the sub-directory structure compounds this. My recommendation is to switch to the listing strategy "Tracking Entities" instead. Tracking Entities keeps track of filenames and timestamps, so even a file with an older timestamp will get consumed if its filename is not in the tracked entities list stored in the distributed cache. Let me know if making this change resolves your issue.
Thank you,
Matt
11-02-2022
10:28 AM
@Bridewin To add some context around your cron schedule: NiFi uses Quartz cron, in case you were not already aware. Your current Quartz cron "0 05 8 1/1 * ? *" means that the processor will be scheduled to execute at 8:05am starting on day 1 of every month and on every subsequent day after day 1 in each month.
The issue with this cron arises when you start your GetFile on any day other than the 1st prior to 8:05am. Let's say you start NiFi on November 3rd. On startup NiFi loads your flow and starts all your component processors. In this configuration your GetFile will not get scheduled until December 1st and will then continue to execute every day thereafter. The same would happen if you stop and start the processor without a NiFi restart, or if the NiFi JVM restarts.
I am not clear on why you decided to add 1/1; perhaps this is how you intended it to be scheduled? To truly have it scheduled at 8:05am every day, starting the very day the processor is started (whether via user action or NiFi JVM restart), you would want a cron like "0 5 8 * * ? *".
For more info on Quartz cron, review this link: https://productresources.collibra.com/docs/collibra/latest/Content/Cron/co_quartz-cron-syntax.htm
Thank you,
Matt
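To compare the two Quartz expressions quoted in the post above, here is a minimal Python sketch that labels each field of an expression. It assumes the usual Quartz field order (seconds, minutes, hours, day of month, month, day of week, optional year). The helper name `label_quartz_fields` is hypothetical, and the sketch only splits the string; it does not evaluate when a trigger would fire.

```python
# Field names in the order Quartz expects them; the 7th (year) is optional.
QUARTZ_FIELDS = [
    "seconds", "minutes", "hours",
    "day_of_month", "month", "day_of_week", "year",
]


def label_quartz_fields(expression):
    """Split a Quartz cron expression and map each part to its field name."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    return dict(zip(QUARTZ_FIELDS, parts))


# The two expressions from the post.
current = label_quartz_fields("0 05 8 1/1 * ? *")
suggested = label_quartz_fields("0 5 8 * * ? *")

for name in QUARTZ_FIELDS:
    print(f"{name:13} current={current[name]:4} suggested={suggested[name]}")
```

Printed side by side, the only field whose meaning differs is day of month ("1/1" versus "*"); "05" and "5" in the minutes field denote the same minute.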
11-01-2022
12:36 PM
@Jagapriyan Since you are using the listing strategy "Tracking Timestamps", the configuration property "Entity Tracking Time Window" is not used. The "Tracking Timestamps" strategy is very dependent on the timestamps of the target files. Typically, when files are not being picked up, it is because the timestamps on those files are equal to or less than the last recorded timestamp in the ListSFTP processor's state. This can happen when files in the SFTP server target folders do not have their last modified timestamp updated (for example, when a file is moved from another directory into an SFTP server directory; a copy would update the timestamp since the file is being written again).
- Does your target SFTP path have multiple sub-directories which are being searched? Is Search Recursively set to "true"?
- Are there symlink directories in use?
- Have you looked at the timestamp recorded in state for your SFTP server directories? Do your missed files have older timestamps?
- How many files on average are being written to the target SFTP server between 12am and 1am each day?
I also see you have a minimum file age of 5 minutes. This means the last modified timestamp must be at least 5 minutes older than the execution time of your processor for the file to be eligible for consumption. You stated that your files are placed on the SFTP server between 12am and 1am each day, and you scheduled your ListSFTP processor using a cron schedule at 10 minutes and 1 second past every hour between 2am and 2pm. Why not just have your ListSFTP processor run all the time? Is this because timestamps are not being updated consistently?
If you switch to the listing strategy "Tracking Entities" instead, do you still see the issue? Tracking Entities works when there are issues with timestamps and was developed for that reason.
Thank you,
Matt
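The difference between the two listing strategies described above can be illustrated with a simplified Python model. This is only a sketch of the behaviour as the post describes it, not NiFi's actual implementation; the class and function names and the sample files are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RemoteFile:
    name: str
    modified: int  # last modified time, in epoch seconds


def list_by_timestamp(files, last_listed):
    """Timestamp tracking: only files newer than the recorded timestamp qualify."""
    return [f for f in files if f.modified > last_listed]


def list_by_entities(files, tracked_names):
    """Entity tracking (simplified): any file not yet in the tracked set qualifies."""
    return [f for f in files if f.name not in tracked_names]


def old_enough(f, now, min_age_seconds):
    """Minimum file age: the file must not have been modified too recently."""
    return now - f.modified >= min_age_seconds


# State after a previous listing: a.csv was consumed at timestamp 1000.
last_listed = 1_000
tracked = {"a.csv"}

files = [
    RemoteFile("a.csv", 1_000),      # already consumed
    RemoteFile("moved.csv", 900),    # moved in later, keeps its old timestamp
    RemoteFile("new.csv", 1_100),    # freshly written
]

print([f.name for f in list_by_timestamp(files, last_listed)])
print([f.name for f in list_by_entities(files, tracked)])
```

In this model the moved file with the older timestamp is skipped by the timestamp-based listing but still picked up by the entity-based one, which matches the situation the post suspects.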
11-01-2022
12:02 PM
@Bridewin Are all your environments using a NAS storage location from which GetFile is pulling files? Have you monitored the health and connectivity of your NAS? Since your GetFile is scheduled to execute only once a day, if your NAS or network is having issues, it will simply return nothing for that day's execution. Since you are configured to remove the file you are consuming, have you tried changing your cron to run multiple times within the 8am hour to see if the file gets picked up by any one of those executions? If occasional network issues are impacting your NAS, this may resolve your issue with consuming the file.
Thank you,
Matt
10-28-2022
01:06 PM
@D5ha Not all processors write to the content repository, nor is the content of a FlowFile ever modified in the content repository after it is created. Once a FlowFile is created in NiFi, it exists as is until terminated. A NiFi FlowFile consists of two parts: FlowFile attributes (metadata about the FlowFile, which includes details about the location of the FlowFile's content in the content_repository) and the FlowFile content itself. When a downstream processor modifies the content of a FlowFile, what really happens is that new content is written to a new content claim in the content_repository; the original content remains unchanged.
From what you shared, you appear to have just one content_repository. Within that single content_repository, NiFi creates a bunch of sub-directories. NiFi does this for better indexing and seeking, because of the massive number of content claims a user's dataflow(s) may hold.
What is also very important to understand is that a content claim in the content_repository can hold the content for one or more FlowFiles. It is not always one content claim per FlowFile's content. It is also very possible to have multiple queued FlowFiles pointing to the exact same content claim and offset (the exact same content). This happens when your dataflow clones a FlowFile (for example, by routing the same outbound relationship from a processor multiple times). So you should never manually delete claims from any content repository, as you may delete content for multiple FlowFiles.
That being said, you can use data provenance to locate the content_repository (Container), subdirectory (Section), content claim filename (Identifier), the byte at which the content begins in that claim (Offset), and the number of bytes from the offset to the end of the content in the claim (Size).
Right click on a processor and select "view data provenance" from the displayed context menu. This will list all FlowFiles processed by this processor for which provenance still holds index data. Click the Show Lineage icon (it looks like 3 connected circles) to the far right of a FlowFile. You can right click on "clone" and "join" events to find/expand any parent FlowFiles in the lineage (the event dot created for the processor on which you selected data provenance will be colored red in the lineage graph). Each white circle is a different FlowFile. Clicking on a white circle will highlight the dataflow path for that FlowFile. Right clicking on an event like "create" and selecting "view details" will tell you everything that is known about that FlowFile (this includes a tab about the "content").
- Container corresponds to the following property in the nifi.properties file: nifi.content.repository.directory.default=
- Section corresponds to the subdirectory within the above content repository path.
- Identifier is the content claim filename.
- Offset is the byte at which the content for this FlowFile begins within that Identifier.
- Size is the number of bytes from the offset to the end of that FlowFile's content in the Identifier.
I also created an article on how to index the Content Identifier. Indexing a field allows you to locate a content claim and then search for it in your data provenance to find all FlowFile(s) that pointed at it. You can then view the details of all those FlowFile(s) to see the full content claim details as above: https://community.cloudera.com/t5/Community-Articles/How-to-determine-which-FlowFiles-are-associated-to-the-same/ta-p/249185
Thank you,
Matt
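As a rough illustration of how the Container, Section, Identifier, Offset and Size values relate to bytes on disk, here is a minimal read-only Python sketch. It assumes the claim file sits at container/section/identifier as described above; the function name, the demo directory and the sample values are hypothetical, and the demo builds its own throwaway claim file rather than touching a real repository.

```python
import tempfile
from pathlib import Path


def read_flowfile_content(container_dir, section, identifier, offset, size):
    """Read one FlowFile's bytes out of a content claim, without modifying it."""
    claim_path = Path(container_dir) / section / identifier
    with open(claim_path, "rb") as claim:
        claim.seek(offset)       # Offset: where this FlowFile's content begins
        return claim.read(size)  # Size: how many bytes belong to this FlowFile


# Demo with a throwaway "repository" holding two FlowFiles in one claim.
first = b"first-flowfile"
second = b"second-flowfile"

with tempfile.TemporaryDirectory() as container:
    section_dir = Path(container) / "12"          # hypothetical section
    section_dir.mkdir()
    (section_dir / "demo-claim").write_bytes(first + second)

    recovered = read_flowfile_content(
        container, "12", "demo-claim", offset=len(first), size=len(second)
    )

print(recovered)
```

Because a single claim can hold the content of several FlowFiles, the sketch only ever opens the file for reading, in line with the warning above against deleting or altering claims by hand.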
10-25-2022
07:29 AM
@PepeClaro While NiFi supports parallel thread execution, there is no way to guarantee that two threads execute at the exact same time. One NiFi component processor is unaware of what another NiFi component processor is doing or when it is executing. Processors that have an inbound connection use a FlowFile queued on that connection as the trigger to start execution.
Step 1 is to identify which NiFi component processors can be used to execute your 3 processes: https://nifi.apache.org/docs.html
I have no idea from your description what your 3 processes do, so I can't make any recommendations on what you can/should use.
Step 2 is deciding how to interconnect these NiFi processor components and preserve the data needed for downstream dataflow processing in your third process. When a processor executes, the response/return from the execution can result in modification of an existing NiFi FlowFile's content, creation of new FlowFile content, creation of an entirely new FlowFile, creation of new FlowFile attributes (key/value pairs), modification of FlowFile attributes, or none of the above, depending on the NiFi component processor being used. Since you mention that the first 2 processes get info that is needed by process 3, you would need to take that into consideration for process 3. Where is that info going to end up (FlowFile content or FlowFile attributes)? How large is the info returned (does it make sense to put it into an attribute)? Does the returned info need to be modified in any way before process 3?
In your flow as described, you have two Process Groups (PGs), which perform your process 1 and process 2 respectively. Each will execute independently of the other, so execution at the exact same time cannot be guaranteed. Cron scheduling of a processor can give a better chance of same-time execution, but it is still not a guarantee, since it only schedules when to request an available thread from the NiFi Max Timer Driven Thread pool. If all threads are in use at the time of the request, the processor will execute as soon as a thread becomes available.
Out of these two PGs you will have two FlowFiles that your third process depends on. There is no way to tell a NiFi processor component to pull attributes or content from two different source FlowFiles. So before process 3 you need to combine any needed attributes and/or content from the two original FlowFiles into one FlowFile that process 3 can use. It is hard to make a recommendation here since I don't know any details about your 3 processes, what the FlowFiles produced by process 1 and 2 contain in terms of content and attributes, or what content and/or attributes from process 1 and 2 are needed by process 3.
I suggested that you may be able to use the "defragment" merge strategy of the MergeContent processor to combine the FlowFiles from process 1 and process 2, but there is not enough detail to say for sure, or to say whether other modifications would be needed before MergeContent. To "defragment" (combine the process 1 fragment with the process 2 fragment), the FlowFiles produced by both process 1 and process 2 would need to have the following FlowFile attributes present and set correctly on each:
Name | Description |
---|---|
fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together. |
fragment.index | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates the order in which the fragments should be assembled. This attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all FlowFiles that have the same value for the "fragment.identifier" attribute) integer between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the "fragment.identifier" attribute and the same value for the "fragment.index" attribute, the first FlowFile processed will be accepted and subsequent FlowFiles will not be accepted into the Bin. |
fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle. |
segment.original.filename | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged FlowFile. |
fragment.identifier, fragment.count, and segment.original.filename need to have the same values on both FlowFiles. fragment.index would be unique. The result would be one output FlowFile with the FlowFile content of both the original process 1 and process 2 FlowFiles, which process 3 could then use. Or, if process 1 and 2 produce FlowFiles that carry only the FlowFile attributes you need and no content, you could set "Keep All Unique Attributes" as the attribute strategy so that the one merged FlowFile has all unique attributes from both source FlowFiles for process 3 to use.
Thank you,
Matt
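The attribute rules in the table above can be modelled with a short Python sketch. This is a simplified illustration of the Defragment behaviour as the documentation text describes it, not MergeContent's real code; FlowFiles are represented as plain dicts, and the function name and sample values are hypothetical.

```python
from collections import defaultdict


def defragment(flowfiles):
    """Bin FlowFiles by fragment.identifier and merge each complete bundle."""
    bins = defaultdict(dict)   # fragment.identifier -> {fragment.index: flowfile}
    merged = []
    for ff in flowfiles:
        attrs = ff["attributes"]
        bundle_id = attrs["fragment.identifier"]
        index = int(attrs["fragment.index"])
        bundle = bins[bundle_id]
        if index in bundle:
            continue           # same identifier and index: the first one wins
        bundle[index] = ff
        if len(bundle) == int(attrs["fragment.count"]):
            ordered = [bundle[i] for i in sorted(bundle)]  # assemble by index
            merged.append({
                "attributes": {"filename": attrs["segment.original.filename"]},
                "content": b"".join(f["content"] for f in ordered),
            })
            del bins[bundle_id]
    return merged


def fragment(index, content):
    """Build a sample FlowFile carrying the four required attributes."""
    return {
        "attributes": {
            "fragment.identifier": "run-42",       # same on both FlowFiles
            "fragment.count": "2",                 # same on both FlowFiles
            "fragment.index": str(index),          # unique per FlowFile
            "segment.original.filename": "combined.txt",
        },
        "content": content,
    }


# Process 2 happens to finish first; the merge still orders by fragment.index.
result = defragment([fragment(1, b"from-process-2\n"), fragment(0, b"from-process-1\n")])
print(result)
```

If only one of the two fragments ever arrives, the bundle never reaches its expected count and nothing is emitted in this model, which is why a missing fragment would keep process 3 from being triggered.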
10-24-2022
09:49 AM
@dubrovski Rather than using the ExecuteStreamCommand processor to execute curl, have you tried using the InvokeHTTP processor instead for your PUT operation?
Thank you,
Matt
10-24-2022
09:35 AM
@PepeClaro Your description is vague, which makes it difficult to provide suggestions for incorporating these processes into a dataflow design.
- What are these three "processes"?
- How are those processes being executed? What processors are in use for these 3 processes?
- Are there any dependencies between these processes other than order of execution? For example, is output from processes 1 and/or 2 needed by process 3?
- Do processes 1 and 2 need to be executed in parallel?
- Is your NiFi a multi-node cluster?
- What are the triggers for these processes? Does each process require a NiFi FlowFile to trigger it? What kicks off this entire dataflow?
The more detail, the better.
You may be able to set a fragment identifier, fragment count (2), and fragment index (1 or 2) on the FlowFiles from the first two processes and then merge those fragments into one FlowFile that can trigger the third process. If either fragment is missing, the merge will not happen and thus the third process will not be triggered.
If processes 1 and 2 do not need to run in parallel, then use a single dataflow, process 1 --> process 2 --> process 3, where a failure anywhere along the dataflow prevents execution of the next process.
Thank you,
Matt
10-24-2022
09:13 AM
@D5ha It is useful to know more about your environment, including the full NiFi and Java versions. Since it is reporting issues while loading the flow:
java.lang.Exception: Unable to load flow due to: java.util.zip.ZipException: invalid stored block lengths
at org.apache.nifi.web.server.JettyServer.start
I would lean towards some issue with, or corruption of, the flow.xml.gz and/or flow.json.gz on this node. Since all nodes run the exact same copy of these files, I'd copy them from a good node to the node that is failing to start. Depending on your NiFi version, you may not have a flow.json.gz file (this format was introduced in the most recent versions).
Thank you,
Matt
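Before copying files between nodes, it can help to confirm that the compressed flow file really is unreadable. The following Python sketch, which uses only the standard library, decompresses a .gz file end to end and reports whether that succeeds. The function name is hypothetical, and the path you pass in (for example a flow.xml.gz under NiFi's conf directory) is an assumption about your installation.

```python
import gzip
import zlib


def gzip_is_readable(path, chunk_size=1024 * 1024):
    """Return True if the whole gzip file decompresses without error."""
    try:
        with gzip.open(path, "rb") as fh:
            # Read to the end so that damage anywhere in the stream is detected.
            while fh.read(chunk_size):
                pass
        return True
    except FileNotFoundError:
        raise  # a missing file is a different problem than a corrupt one
    except (OSError, EOFError, zlib.error):
        # Not a gzip file, truncated file, or corrupt compressed data.
        return False
```

Running `gzip_is_readable(...)` against the same file on a healthy node and on the failing node would show whether the two copies differ in integrity. A True result only means the archive decompresses; it does not validate the flow definition inside it.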