Member since: 07-30-2019
Posts: 3001
Kudos Received: 1484
Solutions: 868

My Accepted Solutions
Views | Posted
---|---
190 | 07-19-2024 12:57 PM
126 | 07-17-2024 10:20 AM
245 | 07-01-2024 09:49 AM
264 | 06-14-2024 07:36 AM
238 | 06-14-2024 07:26 AM
07-26-2024
01:25 PM
@yagoaparecidoti If you are using the ldap-provider for authentication, you should preferably use your LDAP users and groups for authorization via the ldap-user-group-provider, rather than managing those user and group identities manually in NiFi via the file-user-group-provider. When users are added or removed in LDAP, groups are created or removed in LDAP, or group memberships change in LDAP, all of it automatically re-syncs in NiFi. And if you do all your authorization through LDAP groups, this management becomes automatic with little effort on your side, except when setting up authorization for a brand-new group.

Fetching a token for your LDAP user who is authorized to view and modify users and groups:

curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/access/token' \
-H 'Content-Type: application/x-www-form-urlencoded; charset=UTF-8' \
--data-raw 'username=<username>&password=<password>' \
--insecure

This will return a bearer token that is only valid for the expiration period configured in your ldap-provider (default: 12 hours). In the rest of the examples, replace $TOKEN with the token string returned by the command above.

Example command to add a new group:

curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/user-groups' \
-H 'Authorization: Bearer $TOKEN' \
-H 'Content-Type: application/json' \
--data-raw '{"revision":{"clientId":"f06dc8a3-0190-1000-f61c-a511bdac0cf1","version":0},"disconnectedNodeAcknowledged":false,"component":{"identity":"newgroup2","users":[]}}' \
--insecure

In the response you will get the UUID assigned to this new group, for example: f07dcb3f-0190-1000-0000-00003f3139fe

Example command to add a new user:

curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/users' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer $TOKEN' \
--data-raw '{"revision":{"clientId":"f06dc8a3-0190-1000-f61c-a511bdac0cf1","version":0},"disconnectedNodeAcknowledged":false,"component":{"identity":"newuser3"}}' \
--insecure

In the response you will get the UUID of the new user, for example: f089d520-0190-1000-ffff-ffffd5c79edc

Before you can add a user to a group, you need to get the list of all users currently in that group:

curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/user-groups' \
-H 'Authorization: Bearer $TOKEN' \
--insecure

You can parse the response JSON by group name or group UUID. Within the JSON for the group you will find the current users and their assigned UUIDs. Since the PUT below replaces the group's full membership, you'll need all of those current user UUIDs plus the UUID of the user(s) you want to add to the group (a jq sketch for collecting them is at the end of this post).

curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/user-groups/f07dcb3f-0190-1000-0000-00003f3139fe' \
-X 'PUT' \
-H 'Authorization: Bearer $TOKEN' \
-H 'Content-Type: application/json' \
--data-raw '{"revision":{"clientId":"f06dc8a3-0190-1000-f61c-a511bdac0cf1","version":4},"disconnectedNodeAcknowledged":false,"component":{"id":"f07dcb3f-0190-1000-0000-00003f3139fe","identity":"newgroup2","configurable":true,"users":[{"id":"f083ef9d-0190-1000-ffff-ffffa9456011"},{"id":"f089d520-0190-1000-ffff-ffffd5c79edc"},{"id":"350a48fc-018f-1000-0000-000018f120ca"}],"accessPolicies":[]}}' \
--insecure

You should be able to add users at the same time as you create a new group, but that is not very common. It is more common to add new users and associate them with already existing groups, hence the example provided above. You can see in the new-group example that the payload will accept user ids.
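As promised, a sketch of collecting the current member UUIDs with jq (assuming jq is installed; the group name "newgroup2" comes from the example above, so adjust the select() filter for your group):

curl -s 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/user-groups' \
-H 'Authorization: Bearer $TOKEN' \
--insecure \
| jq '.userGroups[] | select(.component.identity == "newgroup2") | [.component.users[].id]'

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt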
07-26-2024
09:37 AM
@yagoaparecidoti If you find you are adding so many users that the process is cumbersome, perhaps you should set up your NiFi to sync its users and groups from LDAP/AD instead. If you authorize the LDAP groups, any user belonging to (or later added to) an LDAP group automatically gets that group's authorizations in NiFi. Reference: LdapUserGroupProvider

If you still want to manage your users and groups manually in NiFi, here are my suggestions. The easiest way is to create a service user certificate (clientAuth PrivateKey) that is trusted by your NiFi and then authorized to view and modify on "Access users/user groups". A TLS client certificate can be configured with as long an expiration as you want, and since authentication is handled in the mutual TLS exchange, no token (extra step) is needed with this method. Mutual TLS authentication is always enabled on a secured NiFi even if other methods are also enabled (mutual TLS is always checked first, as a secured NiFi will always WANT a client certificate in the handshake; NiFi moves on to the next authentication method only if no client certificate is provided). If you instead authenticate the user who has this access through another method, like the ldap-provider, you would need to fetch an authentication token every time and then include that token in the rest-api request for each addition and modification to users and groups. Tokens expire.

The NiFi rest-api docs can help you with the rest-api endpoints. I find it easier to open developer tools in my browser, perform the action as I normally would via the NiFi UI, then right-click on the request in developer tools and select "copy as cURL". I can then see the exact rest-api call that was being made, along with any data that needs to go with that request. (There will be a bunch of unnecessary headers you will not need to include.) If you choose to go the route of using an authorized LDAP user for your automation (not recommended), you'll also need to capture the login rest-api call in dev tools to see how to get and store the necessary token.
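To illustrate the certificate route, a minimal sketch of a rest-api call authenticated purely by mutual TLS (the PEM certificate/key paths are placeholders, and the certificate's DN must map to an authorized NiFi user):

curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/tenants/users' \
--cert /path/to/service-user-cert.pem \
--key /path/to/service-user-key.pem \
--insecure

No Authorization header or token is needed; NiFi derives the user identity from the client certificate presented in the TLS handshake.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt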
07-26-2024
08:52 AM
@NagendraKumar This is a resource-expensive use case for NiFi. Your goal here seems simple, but really isn't, because of how NiFi is designed: to process lots of data in a concurrent fashion. The PutFile processor does not support an append option, just as similar other processors don't. The reason is concurrency. Consider that the typical deployment of NiFi is a multi-node cluster. Each node loads its own copy of the dataflows and executes against only the FlowFiles queued on that specific node, with no awareness of what queued data may exist on other nodes.

Now let's look at your use case and how typical data consumption would happen in a multi-node cluster. The data may be available to all nodes locally as a mounted disk, or only available on one node (not sure of your setup here).
- Do you want to consume a file (that may or may not have been consumed earlier in the day with the same filename?) and append any new data for the same filename to an existing file if it already exists in the target directory?
- OR does your source directory not have a consistent filename each day, so you just want to consume any file from the source directory regardless of filename and append it to a file with the current day as its filename?

The strategy is a little different for each of these use cases. You would typically have a ListFile processor (configured to execute on the primary node only) that lists new files in the source directory (generating a zero-byte FlowFile with various metadata/attributes about each file). This would then feed a FetchFile processor that retrieves the content for that file and adds it to the listed FlowFile. This is a common setup for a multi-node cluster where the source is reachable from all nodes: it lets you distribute the zero-byte FlowFiles listed by the primary node across all your nodes, so each node fetches content for unique FlowFiles (spreading resource usage across all nodes). Even if you are using a single instance of NiFi, it is better to design flows with a multi-node cluster in mind should you ever need to scale out to a cluster later. The challenge is that you really can't have two nodes, or even higher concurrency locally on PutFile, because two threads cannot append to the same file at the same time. This is why append is not an option.

Now, as far as designing a dataflow that would work on a single NiFi instance, this might be possible through some creative design. My design is much like the one provided by @SAMSAL; I just try to take into account some controls over concurrency, to avoid multiple concurrent transactions possibly resulting in some lost data, and to design a dataflow that handles both when things go as planned and when they do not. You start with:
1. Add a ListFile processor configured to consume from the source directory.
2. Add a new process group. Configure this process group with "Single FlowFile Per Node" in the "Process Group FlowFile Concurrency" property.
3. Enter the process group, where you will have a blank canvas. Add an input port and an UpdateAttribute processor, and connect the input port to this UpdateAttribute processor.
4. In this UpdateAttribute processor, create a custom property named "fragment.identifier" with a value of "${UUID()}" (this creates a unique ID for the fragment identifier). Add a second dynamic property named "fragment.count" with a value of "2".
5. Add two more UpdateAttribute processors.
Drag a connection from the first UpdateAttribute processor twice, once to each of these newly added UpdateAttribute processors.
6. Go back to the parent process group and connect your ListFile to the process group's input port. At this point the parent flow is simply ListFile feeding the process group (I numbered the UpdateAttribute processors to avoid confusion moving forward), and inside the child process group the input port feeds UpdateAttribute 1, which feeds both UpdateAttribute 2 and UpdateAttribute 3. Navigate back into the child process group to continue building this dataflow. Since NiFi does not support appending into an existing target file, the goal is to fetch both the new content from the source directory (UpdateAttribute 2 path) and the existing file from the target directory (UpdateAttribute 3 path).
7. Configure UpdateAttribute 2 with one new custom property named "fragment.index" with a value of "2", since we want the new content placed after the original content.
8. Configure UpdateAttribute 3 with three new dynamic properties: one named "absolute.path" with its value set to the absolute path of the target directory; one named "fragment.index" with a value of "1", since we want this content before the new content; and a third named "target.filename" with a value of "${now():format('ddMMyyyy')}.txt". (These fragment attributes are recapped below.)
9. Add a FetchFile processor and connect "success" from UpdateAttribute 2 to it. Don't change the default configured properties (I named mine "FetchFile - fetch new data").
10. Add another FetchFile processor and connect "success" from UpdateAttribute 3 to it. In only this FetchFile, edit the "File to Fetch" property with the value "${absolute.path}/${target.filename}" so that this processor fetches the content of the existing daily file from the target directory. (I named this one "FetchFile - fetch existing data".)
11. Add a funnel. Connect "success" from "FetchFile - fetch new data" to it. Connect both "success" and "not.found" from "FetchFile - fetch existing data" to the same funnel. (not.found needs to be routed to the funnel to handle the case where the newly ingested file is the first of the day, so the target directory does not yet have that day's file.)
12. Add a MergeContent processor configured with "Merge Strategy" set to "Defragment" and "Attribute Strategy" set to "Keep All Unique Attributes", and connect the funnel to it.
13. Add another UpdateAttribute processor and connect the "merged" relationship from MergeContent to it. Add a dynamic property named "filename" with its value set to "${target.filename}". This is necessary to make sure we keep writing out the same dated file we have been working with since ingestion; if you were to dynamically set the target filename in PutFile instead, you would run the risk of a file being ingested on day 27 but crossing into day 28 before the PutFile executes.
14. Add your PutFile processor and connect "success" from the above UpdateAttribute to it. Configure your PutFile with the target directory path and a conflict resolution strategy of "replace", so it overwrites the existing file (unless you had FetchFile delete it earlier in your flow).

NOTE: This flow has some "failure", "permission denied", and a single "not.found" relationships you need to deal with for unexpected conditions that may result in FlowFiles routing to them; I would not expect any FlowFiles to route there under normal execution. The concurrency rules on the child process group make sure this flow completes for one FlowFile before another is allowed to enter for processing. So you can see how complicated this use case is for NiFi.
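To recap, the dynamic properties from steps 4 through 8 are what make the Defragment merge work (MergeContent's Defragment strategy matches fragments by fragment.identifier, orders them by fragment.index, and waits for fragment.count fragments):

# UpdateAttribute 1 (every FlowFile entering the child group; both cloned paths share these):
#   fragment.identifier = ${UUID()}
#   fragment.count      = 2
# UpdateAttribute 2 (new data path):
#   fragment.index      = 2
# UpdateAttribute 3 (existing data path):
#   fragment.index      = 1
#   absolute.path       = <absolute path of target directory>
#   target.filename     = ${now():format('ddMMyyyy')}.txt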
I do not know how often your ListFile will be polling for new source files, or how large you expect your target file to grow. If you are trying to use NiFi like a logger that is constantly appending to the file, you can see how expensive this flow would get in CPU and disk I/O, as it needs to re-ingest the latest target file to append to each time. If your source file is being appended to constantly throughout the day, maybe configure your ListFile to run only once an hour; then you limit your source and target file fetches to 24 per day. As the day goes on and these files get larger, there will be more disk I/O impact.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
07-26-2024
06:22 AM
@NagendraKumar This is not something I have messed with much. The GrokReader is what is commonly used to parse unstructured data, and your data looks similar to Cisco syslog structure. While the GrokReader has a built-in pattern file, you may find yourself needing to define a custom pattern file for your specific data. You might find this other community post helpful: https://community.cloudera.com/t5/Support-Questions/ExtractGrok-processor-Writing-Regex-to-parse-Cis... Hopefully you can use the pattern-file example shared through GitHub in that other community thread to help create a custom pattern file that works for your specific data: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns
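For a sense of what a custom pattern-file entry looks like, here is a hypothetical one, assuming messages shaped like Cisco's "%FACILITY-SEVERITY-MNEMONIC: text" syslog format (the field names are placeholders; build yours against your actual data):

# custom Grok pattern composed from grok's built-in patterns
CISCOLINE %{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:host} %%{DATA:facility}-%{INT:severity}-%{DATA:mnemonic}: %{GREEDYDATA:message}

Hope this information helps you on your use case journey.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt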
07-26-2024
06:17 AM
1 Kudo
@cadrian90 I agree with @SAMSAL's response. Typically the ConvertRecord processor is what would be used here; it supports numerous record readers and record writers. The GrokReader is what is commonly used to parse unstructured data like your Cisco syslog messages. While the GrokReader has a built-in pattern file, you may find yourself needing to define a custom pattern file for your specific data. You might find this other community post helpful: https://community.cloudera.com/t5/Support-Questions/ExtractGrok-processor-Writing-Regex-to-parse-Cisco-syslog/td-p/233095

Beyond the above, this is where it becomes challenging, since Apache NiFi only has a CEFReader and no CEFRecordSetWriter (perhaps you could raise an Apache Jira asking for this new writer, and someone in the Apache community may be able to help). There does exist a ScriptedRecordSetWriter; if you know how to script out the CEF format, maybe you can use that, though I really would not be able to help there myself. Maybe you can also look into the CSVRecordSetWriter to see if selecting a custom format would facilitate an output like CEF. Again, not something I have tried myself.
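For reference on what such a writer would need to produce, a CEF record is a single pipe-delimited header followed by space-separated key=value extensions. A hypothetical example line (all values made up for illustration):

CEF:0|Cisco|ASA|9.1|106023|Deny tcp connection|5|src=10.1.1.1 dst=10.2.2.2 spt=4312 dpt=443

Hope this helps you with your use case journey.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt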
07-23-2024
06:25 AM
@mirkom As far as the post you referred to in your original question: it is not accurate. The GetSFTP processor does NOT accept an inbound connection. The only SFTP ingest processor that accepts an inbound connection is the FetchSFTP processor (which is the processor that other query was actually referring to). I also can't speak to the customized version of the ListSFTP processor built in that other thread. Thanks, Matt
07-23-2024
06:18 AM
@mirkom NiFi is a flow-based programming application. Processor configuration properties can get their values from parameter contexts, which might be useful for you here, and some processors can get values by using NiFi Expression Language. NiFi is designed as an "always on" application, with its dataflows using the available scheduling strategies offered. Source processors (those with no inbound connections) need a valid configuration in order to start, meaning the properties needed at a minimum to execute must be available to the processor. So for a source processor, the only way to have those values is if they are set directly on the processor or pulled from a parameter context.

In NiFi you can create a Process Group (PG) and then build a reusable dataflow within it (from your description it sounds like you only need a few different flow designs to meet your use cases). For your reusable dataflow, you should use a ListSFTP connected to a FetchSFTP to ingest data. On a process group you can configure/define a "parameter context". A parameter context holds the unique configuration values for each of your source and destination hosts, so you would have 500 different parameter contexts. You can then copy your PG many times and simply assign a different parameter context to each one, making dataflow development a bit easier. Building out this way makes expansion easier, but still requires some work; parameter contexts can also be created through the rest-api, which helps at this scale (a sketch follows below).

Also keep in mind that in many of your use cases you are simply moving content from SFTP server A to SFTP server B. When you utilize NiFi for this, you are ingesting all that content into your NiFi cluster and then writing it back out to another SFTP server. This adds some overhead in read and write operations versus a direct transfer between A and B without the local write that NiFi does. What NiFi lets you do is manage all these dataflows through the NiFi UI. NiFi also allows you to scale out by adding more nodes to the cluster as workload and volume increase, without needing to modify your dataflows (assuming they are built well, with data distribution implemented in the designs).
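With 500 contexts, creating them by hand in the UI would be tedious, so scripting the rest-api is worth considering. A minimal sketch, assuming NiFi 1.10+ and that you have already fetched a bearer token into $TOKEN; the context name and parameter names are placeholders:

curl 'https://<nifi-hostname>:<nifi-port>/nifi-api/parameter-contexts' \
-H 'Authorization: Bearer $TOKEN' \
-H 'Content-Type: application/json' \
--data-raw '{"revision":{"version":0},"component":{"name":"sftp-route-001","parameters":[{"parameter":{"name":"source.hostname","sensitive":false,"value":"sftp-a.example.com"}},{"parameter":{"name":"dest.hostname","sensitive":false,"value":"sftp-b.example.com"}}]}}' \
--insecure

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt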
07-23-2024
05:44 AM
1 Kudo
@NagendraKumar You might want to try using the QueryRecord processor or the ScriptedTransformRecord processor. Since your data is unstructured, you could try using the GrokReader and FreeFormTextRecordSetWriter. I agree that splitting and merging is not ideal with so many FlowFiles. ExtractText loads FlowFile content into memory in order to parse it for extracting bits (high heap usage). MergeContent loads FlowFile metadata (FlowFile attributes and metadata) into heap memory for all FlowFiles allocated to merge bins (high heap usage, which can be managed via multiple MergeContent processors in series limiting the max bin FlowFile count).
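With QueryRecord, each record transformation happens in one pass, with no split/merge needed. As a rough sketch of how it is wired (the SQL and the field name are hypothetical; your GrokReader schema determines the real field names), you add a dynamic property to QueryRecord, for example:

# QueryRecord dynamic property (the property name becomes an outbound relationship):
#   matched = SELECT * FROM FLOWFILE WHERE severity <= 3

with the processor's Record Reader set to your GrokReader and Record Writer set to the FreeFormTextRecordSetWriter.

Hope this helps give you some alternate direction.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt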
07-22-2024
10:23 AM
@NagendraKumar ExtractText is only going to work with a well-defined content structure. So when you have an unknown number of records in a single FlowFile, you would do better to split that multi-record file into single-record FlowFiles, against which you can apply your ExtractText and ReplaceText dataflow. You can then easily merge those split records back into one file using a MergeContent with the Defragment option. Since your files have an unknown number of records separated by a blank line, the SplitContent processor can easily be used to split the source FlowFile into individual record FlowFiles; the "Byte Sequence" is simply two line returns (a sketch of the settings follows below). After your ExtractText and ReplaceText processors, you can recombine all the splits into one FlowFile using MergeContent configured with Defragment.
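A sketch of the SplitContent settings for "two line returns", assuming Unix (LF) line endings; use 0D0A0D0A instead if your files use CRLF:

# SplitContent
#   Byte Sequence Format : Hexadecimal
#   Byte Sequence        : 0A0A     (two consecutive newlines, i.e. one blank line)
#   Keep Byte Sequence   : false

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt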
07-19-2024
12:57 PM
@NagendraKumar Oftentimes there is more than one way to solve a use case. Here is one possible solution.

NiFi components used:

SplitRecord - used to split your multi-row CSV content into individual records. This processor uses a CSVReader and a CSVRecordSetWriter.

The "splits" relationship then gets routed to a ReplaceText processor (used to reformat each individual record):

"Search Value", based on four items per line (header and body):
^(.*?),(.*?),(.*?),(.*?)[\r\n]+(.*?),(.*?),(.*?),(.*?)[\r\n]+
"Replacement Value":

The "success" relationship is then routed to a MergeContent processor (used to recombine the original multi-record content into a single FlowFile). Note: the Demarcator is configured with a line return to provide a new line between records in the content.

This is a working solution based on your shared example; it works no matter how many CSV rows exist in the source file. (A note on what the search value captures follows below.)

Other possibilities: I feel like this use case could also be accomplished using the ScriptedTransformRecord processor; I am just not sure myself how to correctly write the script needed here. Perhaps others in the community have suggestions.
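To clarify what the "Search Value" regex captures (the sample lines here are made up for illustration): given content like

name,age,city,zip
John,30,Paris,75001

the header fields are captured as $1 through $4 and the body fields as $5 through $8, so the "Replacement Value" can reassemble them in whatever arrangement you need (for example, one "header,value" pair per line).

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt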