Member since
07-30-2019
3472
Posts
1642
Kudos Received
1020
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 251 | 06-03-2026 06:06 PM | |
| 529 | 05-06-2026 09:16 AM | |
| 1035 | 05-04-2026 05:20 AM | |
| 585 | 05-01-2026 10:15 AM | |
| 701 | 03-23-2026 05:44 AM |
07-30-2024
01:03 PM
1 Kudo
@kk-nifi Replay is not the proper way to handle failures. Failures should be handled in real-time through dataflow design. The "replay" option is only possible if NiFI still holds the content of the FlowFile you want to replay in its content repository. The replay ability is really built with the intention to be used in dataflow development testing ( Replaying a FlowFile ) Replay also required numerous manual steps making it difficult to automate retry. - First you need to execute a Provenance query. - From list of provenance events select the event(s) you want to replay one by one. - If content is still available you will have option to "replay" that FlowFile. There is also another option to "Replay last event", but again only works if last FlowFile's content still exists in the NiFi node's content repository. In your case, you talk about multiple failed FlowFiles for which this will not work to replay them all. NiFi stores the content in content claims with the NiFi content_repository. A content claim can hold the content for 1 too many FlowFiles. Once all FlowFiles referencing a content claim have reached point of auto-terminate, the claimant count would be zero. At that point the content claim will either be moved to archive or deleted depending on archive configuration. Even if archived, it is only retained for a limited amount of time. Also keep in mind that replay is NOT taking the original FlowFile and replaying it. Replay generates a new FlowFile with all the same FlowFile attributes and same content as the original FlowFile. Dataflow programatic handling is better. On failure configure auto retry as i described or route failure to some other processor(s) (optionally for tagging, updating, etc ) and then route back to failed processor. or better yet configured X number of auto-retry that only routes to "failure" relationships if all retry events end up failing. Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-30-2024
10:57 AM
@Tiger_Name The issue you described matches up very well with https://issues.apache.org/jira/browse/NIFI-10018 which caused by https://issues.apache.org/jira/browse/NIFI-9988. While these jiras point at an issue in Apache NiFi 1.16.1, you are using the first milestone (M1) releases of Apache NiFi 2.0.0. So it quite possible that this issue may have been ported into the early milestone release. I would recommend downloading the latest milestone release of Apache NiFi 2.0.0-Mx, which at time of writing this is M4, to see if startup issue still presents itself. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-29-2024
01:09 PM
@rich197 Providing the exact steps you perform to reproduce might help here. I am not clear in this step: "Next I attempted to upload a flow into a process group". Are you trying to upload a template or a flow definition? A flow definition is a Process group containing one or more components. A template can be just a collection of components (Templates are deprecated and removed as of Apache NiFi 2.x) Thank you, Matt
... View more
07-29-2024
09:13 AM
1 Kudo
@varungupta This is a ~3 year old post with an already accepted answer. You are likely to get more responsive answers if you were to start a new thread. NiFi would have also evolved considerable over the past 3 years. Yes, tracking entities does not rely on timestamps to ensure listing of new FlowFiles and will help you here. NiFi grabbing 1 -2 of 20 is more then just timestamps, I suspect that how the files are being moved into the consumption directory is also impacting you. Tracking Timestamps is easiest and least resource consumption default setup, but does not work for all use cases. Timestamp is based on the last modified timestamp. When listing is performed it lists all Files with last processor state stored timestamp up to most recent file's last modified timestamp. Problem can happen if last modified timestamp is not updated. For example some system writes to directory A on your local machine and after write completes, it moves file to Directory B. With that atomic move the file timestamp is not updated. If the move does not happen fast enough it may get missed in the current listing. it is also possible that a moved file has an older last modified timestamp that another smeller files moved quicker to dir B. Thus resulting a timestamp stored in state that would be newer and thus resulting in that other file being ignored. Tracking entities was added to solution to these types of problems. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-29-2024
07:47 AM
@kk-nifi Is there more to your use case and dataflow design you can share? Where in your dataflow is the failure happening? With a successful operation a FlowFile is routed in majority of the processors to a "success" relationship. Any failure results in a FlowFile being routed to a failure or retry relationship with most processors. While in older versions of NiFi users would typically build a retry dataflow from the failure and/or retry relationships. With the latest versions of NiFi "retry" has been systematically built directly in to the Processor. The existing capability still exists but this give you even more ability in how you want to handle retry. Example: When you select retry on a relationship like "failure" or "retry" (never select retry on success), you are given the option to specify a retry attempts, a back off policy, and max back off period. Rather then a FlowFile being routed to the "failure" relationship when "retry" is selected, the FlowFile remains on the incoming queue to be tried again (in above example, 10 attempts will be made before the FlowFile is finally routed to the "retry" relationship.). The Retry Back Off Policy controls how you want to handle these retries: - "Penalize" applies a penalty to the FlowFile on the inbound connection. NiFi ignores penalized FlowFiles and continues to execute on other non penalized FlowFiles until the penalty expires. - "Yield" triggers the processor to yield for a duration of time and then retry the current FlowFile. This method ensure order of processing as no other FlowFile will be processed until this one is either successfully retried or all retry attempts have been exhausted and FlowFile has finally been routed to the "failure" relationship. The Retry Max Back Off Period controls the maximum time the FlowFile will either be penalized or max time processor will yield between retry attempts. Penalty and yield initial time is controlled by the "Yield Duration" and "Penalty Duration" configured in the processor's "Settings" tab. The duration is repeatedly doubled with each retry until the max backoff period is reached. -------------- Now when it comes to NiFi in a "distributed approach", I assume you mean when you have setup a multi-node NiFi cluster? In a multi-node NiFi cluster, each node loads its own copy of the dataflow and processes FlowFiles that are on that same node. Node 1 is completely unaware of the specific related to any FlowFile that exists on Nodes 2, 3, 4, etc.. So you need to account for this NiFi architecture in your dataflow designs. If there is a required order of execute for some batch of FlowFiles, you'll want to keep that batch on the same NiFi node and make sure you are configuring proper "Prioritizers" on all the connections between processor components. There is very little detail in your failure handling. Why are you cloning? What is the purpose of the added UpdateAttribute processor? Hopefully the newer "retry" options available on all relationships will help you with your use case. Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-29-2024
07:13 AM
@Fredi I am not completely clear on your question here. Unless you have a dataflow built and running that has implemented some counter, then nothing is going to show in the NiFi Counters UI. But once you have components running like UpdateCounter, the counter UI would get populated: NiFi counters do not persist through a NiFi restart. After a restart the Counters UI will be blank again until a counter is generated by component that writes to a counter. Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-26-2024
01:25 PM
1 Kudo
@yagoaparecidoti If you are using ldap-provider for authentication, you should preferably be using your ldap users and groups for authorization using the ldap-user-group-provider rather then needing to manage those user identities and group identities manually in NiFi via the file-user-user-group-provider. When new users are added or removed in ldap, new groups created or removed in ldap, or new group membership are added in LDAP, this all automatically resync in NiFi. And if you do all your Authorization through ldap groups, this management becomes automatic with little effort on your side except when needing to setup an all new group authorization. Fetching a token for your ldap user who is authorized to view and modify user and groups: curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/access/token' \
-H 'Content-Type: application/x-www-form-urlencoded; charset=UTF-8' \
--data-raw 'username=<username>&password=<password>' \
--insecure This will return a Bearer Token that is only valid for the expiration period configured in your ldap-provider (default: 12 hours). Replace $TOKEN with this response token string you got from above command in the rest of the examples. Example command to add a new group: curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/user-groups' \
-H 'Authorization: Bearer $TOKEN' \
-H 'Content-Type: application/json' \
--data-raw '{"revision":{"clientId":"f06dc8a3-0190-1000-f61c-a511bdac0cf1","version":0},"disconnectedNodeAcknowledged":false,"component":{"identity":"newgroup2","users":[]}}' \
--insecure In the response you will get the UUID assigned to this new group. Example: f07dcb3f-0190-1000-0000-00003f3139fe Example command to add new user: curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/users' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer $TOKEN \
--data-raw '{"revision":{"clientId":"f06dc8a3-0190-1000-f61c-a511bdac0cf1","version":0},"disconnectedNodeAcknowledged":false,"component":{"identity":"newuser3"}}' \
--insecure In the response you will get the UUID of the new user. Example: f089d520-0190-1000-ffff-ffffd5c79edc Before you can add a user to the group, you need to get a list all user currently part of group. curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/user-groups' \
-H 'Authorization: Bearer $TOKEN' \
--insecure You can parse the response json by group name or the group uuid. With the json for the group you will find the current users and their assigned uuids You'll need all those current user uuids and the uuid of the user(s) you want to add to the group. curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/user-groups/f07dcb3f-0190-1000-0000-00003f3139fe' \
-X 'PUT' \
-H 'Authorization: Bearer $TOKEN' \
-H 'Content-Type: application/json' \
--data-raw '{"revision":{"clientId":"f06dc8a3-0190-1000-f61c-a511bdac0cf1","version":4},"disconnectedNodeAcknowledged":false,"component":{"id":"f07dcb3f-0190-1000-0000-00003f3139fe","identity":"newgroup2","configurable":true,"users":[{"id":"f083ef9d-0190-1000-ffff-ffffa9456011"},{"id":"f089d520-0190-1000-ffff-ffffd5c79edc"},{"id":"350a48fc-018f-1000-0000-000018f120ca"}],"accessPolicies":[]}}' \
--insecure You should be able to add users at same time as you create a new group. But that is not very common. It is more common to add new users and associate them with already existing groups, hence the example provided above. You can see in the new group example i have that the payload will accept user ids. Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-26-2024
09:37 AM
@yagoaparecidoti If you find you are adding that many users that the process is cumbersome, perhaps you should setup your NiFi to sync your users and groups from ldap/ad instead. If you authorize the ldap groups, and user belonging or added to the ldap group would automatically then get the group authorizations in NiFi. Reference: LdapUserGroupProvider If still want to manage your users and groups manually in NiFi, here are my suggestions: The easiest way is to create a service user certificate (clientAuth PrivateKey) that is trusted by your NiFi and then authorized to view and modify on Access users/user groups. An TLS client certificate can be configured with as long of an expiration as you want. Since the authentication is handled in the mutual TLS exchange there are not token (extra steps) need using this method. MutualTLS authentication is always enabled with secure NiFi even if other methods are also enabled (MutualTLS is always checked first as secured NiFI will always WANT a client certificate in the handshake. NiFi move on to next authentication method only if no client certificate is provided). If you another method of authenticating your user who have this access like the "ldap-provider" you would need to fetch an authentication token all the time and then inlcude that authentication token in the rest-api request for each addition and modification to user and groups. Tokens expire. The NiFI rest-api docs can help you with the rest-api endpoints. I find it easier to open developer tools in my browser. Perform the action as I would normally do via the NiFi UI and right click on the request in the developer tools and select copy as curl. I can then see the exact rest-api call that was bing made along with and data that needs to go with that request. There will be a bunch of unnecessary headers you will not need to include. If you choose to go the route of using an authorized ldap user for your automation (not recommended), you'll also need to capture the rest-api call in dev tools for login to capture how to get and store the necessary token. Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-26-2024
08:52 AM
1 Kudo
@NagendraKumar This is an expensive resource wise use case for NiFi. Your goal here seems simple, but really isn't because how NiFi is designed to be used to process lost of data in a concurrent fashion. The PutFile processor does not support an append option just like similar other processor don't. Reason being concurrency. Let's consider the typical deployment of NiFi is a muli-node cluster. Each node loads its own copy of the dataflows and executes against only the FlowFiles queued in that specific node with not awareness of what queued data may exists on other nodes. Now lets look at your use case and how typical data consumption would happen in a multi-node cluster. The data may be available to all nodes locally as a mounted disk or only available on one node (not sure of your setup here). - You want to consume a file (that may or may not have been consumed earlier in the day with same filename?) and append any new data for same filename to an existing file if it already exists in the target directory? - OR your source directory does not have a consistent file name each day and you just want to consume any file from source directory regardless of filename and append to a file with the current day as its filename? The strategy is a little different for each of these use cases. You would typically have a ListFile processor (configured to execute on primary node only) that list new files in the source directory (generates a zero byte FlowFile with various metadata/attributes about that file). This would then feed into a FetchFile processor that retrieves the content for that File and adds it to the listed FlowFile. This is a common setup for a multi-node cluster where source is reachable form all nodes. This setup allows you distribute those zero byte FlowFiles listed by only the primary across all your nodes so each node can fetch content for unique FlowFile (spread resource usage across all nodes). Even if you are using a single instance of NiFi, it is better to design flows with a multi-node cluster in mind should you ever need to scale out to a cluster later. The challenge here you really can't have two nodes or even set higher concurrency locally on putFile because two threads could not be appending to same file at same time. This is why append is not an option. Now as far as designing a dataflow that that would work on a single NiFi instance, this might be possible through some creative design. My design is much like the one provided by @SAMSAL.. I just try to take into account some controls over concurrency to avoid multiple concurrent transactions possible resulting in some lost data and design a dataflow that handles when things go as planned and when they do not. You start with : 1. ListFile processor configured to consume from source directory. 2. Add new process group. Configure this process group with "Single FlowFile Per Node" in the "Process Group FlowFile Concurrency" property. 3. Enter the process group where you will have a blank canvas. Add an input port. Add an Update Attribute processor. Connect the input port to this Update Attribute processor. 4. In this Update Attribute processor we are going to create a custom property with name "fragment.identifier" with a value of "${UUID()}" (this create a unique ID for the fragment identifier). Add a second dynamic property with name "fragment.count" and value of "2". 5. We will now add two more Update Attribute processors. We will drag a connection from the First UpdateAttribute processor to twice (once to each of these newly added Update Attribute processors. 6. Go back to parent process group and connect you ListFile to the process group input port. Flow should look like this at this point in time ( I numbered UpdateAttribute processors to avoid confusion moving forward): and inside the child process group you should have: Navigate back into the child processor group to continue building this dataflow. Since NiFi does not support append into an existing target file, the goal here is to fetch both the new content from source directory (UpdateAttribute 2) and and the existing file from target directory (UpdateAttribute 3). 7. Configure UpdateAttribute2 with one new custom property with name "fragment.index" and value "2" since we want new content added after original content. 8. Configure UpdateAttribute 3 with three new dynamic properties. One with name "absolute.path" and value set to absolute path of target . Set other dynamic property with name "fragment.index" and value of "1" since we want this content before new content. Create a third dynamic property with name "filename" with a value of "${now():format('ddMMyyyy')}.txt". 9. Add a FetchFile processor and connect success from UpdateAttribute2 to it. Don't change the default configured properties (I named mine "FetchFile - fetch new data" 10 Add another FetchFile processor and connect success from UpdateAttribute3 to it. In only this FetchFile edit the "File to Fetch" property with value "${absolute.path}/${target.filename}" so that this processor fetch content for existing daily fie from target directory. (I named this FetchFile "FetchFile - fetch existing data") 11. Add a funnel. Connect "success" from "File to Fetch" to it. Connect both "success" and "not.found from "FetchFile - fetch existing data" to same funnel. (not.found needs to be route to funnel to handle use case where new ingested file is first for the day so target directory does not yet have that days file). 12. Add a Merge Content processor (configured to use "Merge Strategy" set to "Defragment" and "Attribute Strategy" set to "Keep All Unique Attributes") 13. Add another UpdateAttribute processor. Add a dynamic property with name "filename" and value set to "target.filename". This is necessary to make sure we maintain writing out same file date we have been working with since ingestion. Connect the "merged" relationship from MergeContent to this UpdateAttribute. If you were to dynamically set the target filename in putFile, you run the risk that a file may be ingested on day 27 but crest in to day 28 before the putfile. 14. Add your PutFile processor and connect Success From above updateAttribute to it. Configure your PutFile with the target directory path and replace strategy to overwrite exiting file unless you had FetchFile delete it earlier in yoru flow. The Entire flow inside the child process group should look something like this: NOTE: You'll see in above flow some failure, permission denied, and a single not.found relationships you need to deal with in unexpected conditions that may result in FlowFile routing to one of these. Would not expect under normal execution to have any FlowFiles route to these. The concurrency rules on the child process group will make sure this child process group flow completes before allowing another FlowFile to enter for processing. So you can see how complicated this use case is for NiFi. I do not know how often your ListFile will be polling for new source files. I do not know how large you expect your target file to grow. So if you are trying to use NiFi like a logger that is constantly appending to the file you can see how expensive this flow would get CPU and disk I/O as it needs to constantly ingest the latest target file to append to each time. if your source file is some file that is being appended to constantly through out the day, maybe configure your NiFi ListFile to run only once an hour. Then you limit your source and target files fetched to only 24 times per day. As the day goes on and thee files get larger, there will be more disk I/O impact. Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-26-2024
06:22 AM
1 Kudo
@NagendraKumar This is not something I have messed with much. The GrokReader is what would be commonly used to parse unstructured data. Your data looks similar to Cisco syslog structure. While the GrokReader has built in pattern file, you may find yourself needing to define a custom pattern file for your specific data. You might find this other community post helpful here: https://community.cloudera.com/t5/Support-Questions/ExtractGrok-processor-Writing-Regex-to-parse-Cis... Hopefully you can use the pattern file example provided through the github post form that other community thread to help create a custom pattern file that works for your specific data: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns Hope you find this information helps you with your use case journey. Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more