Member since: 07-30-2019
Posts: 3123
Kudos Received: 1563
Solutions: 907
12-24-2024
07:02 AM
1 Kudo
@BK84 If possible, I would avoid designing dataflows that rely on making REST API calls to control the processing of data. Any issue that prevents a REST API call from succeeding would negatively impact your data processing. Based on what you have shared, I'd suggest connecting your A->B->C dataflow directly to your D->E->F dataflow. Since your Python script (C) is responsible for creating the files in the directory that ListFile (D) checks, why not have your Python script output a FlowFile that contains the list of filenames it created (a minimal sketch is included at the end of this post)? A SplitContent processor could be used to split that list into individual FlowFiles that are passed directly to FetchFile (no need for ListFile anymore) so the files can be consumed and written to Elasticsearch.

Now let's consider error handling and how to trigger the next run without the REST API. SplitContent->FetchFile->PutElasticsearchJson should be placed in a child process group. That process group should be configured with a FlowFile Concurrency of "Single FlowFile per Node" and an Outbound Policy of "Batch Output". This means only one FlowFile (the FlowFile produced by the Python script containing the list of all files to be fetched) is allowed to enter the child process group at a time. PutElasticsearchJson has relationships for handling retry, success, failure, and errors. These can be used to handle success as well as to report problems (possibly via a PutEmail processor). On the success relationship path you could use ModifyBytes to zero out the content and then MergeContent to merge the split FlowFiles back into one FlowFile using the "Defragment" strategy. Add a Max Bin Age to force failure of a bin after some amount of time if it never receives all of its fragments. SplitContent creates all the FlowFile attributes needed to support the Defragment strategy. Assuming all files were routed to "success" by the PutElasticsearchJson processor, the single defragmented FlowFile will be output, which you send to an output port in the child process group. Once all FlowFiles in the child process group are queued at output ports, they are allowed to exit the child process group. Assuming a MergeContent "success", you can use this output FlowFile to trigger the next run (hopefully without using REST API calls). Since you did not share how your data gets started in A and B, I can't really suggest anything there.

Bottom line: avoiding REST API calls to control your dataflows leads to faster, more efficient processing of your data. Let your dataflow handle the error handling. If you do choose to use REST API calls, the best way to figure them out is to open the developer tools in your browser and then manually perform the needed interactions in the NiFi UI. Through the developer tools you can capture (copy as cURL) the call made in response to your UI action. You are likely using a username and password to access your NiFi, but that adds more work when calling the API from NiFi dataflows (for one thing, your username and password would be in plain text in the dataflow needed to obtain your user token). So you will want to set up a keystore and truststore so that authentication is handled via a mutual TLS handshake, avoiding exposed passwords and the need to obtain a user token. The InvokeHTTP processor can be used to make the REST API calls and can be configured with an SSLContextService. This is a lot of high-level information, but I hope it has set you on the path to success.
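For illustration, here is a minimal sketch of the Python side. The directory path and filenames are hypothetical, and it assumes the script is invoked by something like ExecuteStreamCommand, which captures the command's stdout as the outgoing FlowFile's content:

```python
import json
import sys
from pathlib import Path

# Hypothetical staging directory; stands in for wherever script (C) already
# writes the files that ListFile (D) used to pick up.
OUTPUT_DIR = Path("/data/staging")

def main() -> None:
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    created = []
    for i in range(3):  # stand-in for the real file-creation logic
        path = OUTPUT_DIR / f"record-{i}.json"
        path.write_text(json.dumps({"id": i}))
        created.append(str(path))

    # One absolute path per line on stdout. When run via ExecuteStreamCommand,
    # stdout becomes the content of the output FlowFile, which SplitContent
    # (splitting on the newline byte sequence) can then break into one
    # FlowFile per path.
    sys.stdout.write("\n".join(created) + "\n")

if __name__ == "__main__":
    main()
```

Note that FetchFile reads the target path from its "File to Fetch" property (evaluated against FlowFile attributes), so after the split you would likely use an ExtractText step to copy each FlowFile's content into an attribute for FetchFile to reference.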
Please help our community thrive. If any of the suggestions/solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on one or more of them. Thank you, Matt
12-24-2024
06:00 AM
1 Kudo
@hegdemahendra As far as your issue goes, it would probably be useful to collect a series of thread dumps, spaced at least 5 minutes apart (one can be generated with `<nifi-home>/bin/nifi.sh dump <output-file>`). Then look for any threads related to the stopping of components to see whether they are progressing or hung. Is it stuck stopping a specific processor or processor class? Do any of the processors being stopped show active threads? Thank you, Matt
12-23-2024
06:50 AM
1 Kudo
@BK84 I suggest starting a new community question for your new query. When you start your new question, please provide more detail on your ask. It is not clear what you mean by "trigger". What is the use case you are trying to implement? Thank you, Matt
12-23-2024
06:25 AM
1 Kudo
@Krish98 There are not enough details here to determine what is going on. Do all of the FlowFiles being merged contain the FlowFile attributes required for the Defragment strategy to succeed?

Name | Description
---|---
fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together.
fragment.index | Applicable only if the <Merge Strategy> property is set to Defragment. Indicates the order in which the fragments should be assembled. It must be present on all FlowFiles when using the Defragment merge strategy and must be a unique integer between 0 and the value of the fragment.count attribute (unique across all FlowFiles that share the same "fragment.identifier" value). If two or more FlowFiles have the same value for both "fragment.identifier" and "fragment.index", the first FlowFile processed will be accepted and subsequent FlowFiles will not be accepted into the bin.
fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. Indicates how many FlowFiles should be expected in the given bundle. At least one FlowFile in the bundle must have this attribute; if multiple FlowFiles contain it, all must have the same value.
segment.original.filename | Applicable only if the <Merge Strategy> property is set to Defragment. Must be present, with the same value, on all FlowFiles that share a fragment.identifier. Its value is used as the filename of the completed merged FlowFile.

Some questions:
- Are the FlowFile attribute values correct? How are the individual FlowFile fragments being produced?
- I find it odd that you say one FlowFile is routed to "merged"; that implies a merge was successful. Are you sure the "merged" FlowFile only contains the content of one FlowFile from the set of fragments? Any chance you routed both the "original" and "failure" relationships to the same connection?
- Can you share the full MergeContent processor configuration (all tabs)?
- When a FlowFile is routed to "failure", I would expect logging in the nifi-app.log with the reason for the failure. What do you see in the nifi-app.log?
- How many unique bundles are you trying to merge concurrently? I see you have set "Max Num Bins" to 200, so do you expect up to 200 unique fragment.identifier bundles to be merged at one time? How many FlowFiles typically make up one bundle?
- I also see you have "Max Bin Age" set to 300 sec (5 mins). Are all FlowFiles with the same fragment.identifier reaching the MergeContent processor within 5 minutes of one another?

Keep in mind that MergeContent can consume a lot of NiFi heap memory depending on how it is configured. FlowFile attributes/metadata are held in heap memory, and FlowFiles allocated to MergeContent bins are likewise held in heap. So depending on how many FlowFiles make up a typical bundle, the number of bundles being handled concurrently (200 max), and the number and size of the individual FlowFile attributes, binned FlowFiles can consume a lot of heap memory. Do you see any out-of-memory errors in your nifi-app.log (they might not be thrown by MergeContent)? You must be mindful of this when designing a dataflow that uses MergeContent processors. An illustrative sketch of a correctly attributed fragment bundle follows below.
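For illustration only, a minimal sketch (hypothetical identifier, filename, and bundle size) of the attribute sets that three fragments of one bundle would need to carry for Defragment to reassemble them:

```python
# Hypothetical attribute sets for a 3-fragment bundle. Every fragment shares
# fragment.identifier, fragment.count, and segment.original.filename, while
# fragment.index gives each fragment its unique position in the reassembly.
bundle = [
    {
        "fragment.identifier": "3f9c2a10-aaaa-bbbb-cccc-000000000001",
        "fragment.index": str(i),
        "fragment.count": "3",
        "segment.original.filename": "orders.json",
    }
    for i in range(3)
]

# A quick validation pass along the lines of what Defragment requires:
# indexes must be unique, and the shared attributes must agree across
# the whole bundle.
assert len({f["fragment.index"] for f in bundle}) == len(bundle)
assert len({f["fragment.identifier"] for f in bundle}) == 1
assert all(f["fragment.count"] == str(len(bundle)) for f in bundle)
print("bundle attributes look consistent")
```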
Please help our community thrive. If any of the suggestions/solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on one or more of them. Thank you, Matt
12-20-2024
11:31 AM
1 Kudo
Thanks for the references @MattWho. For now I've implemented a workaround using counters and querying the nifi-api with HTTP requests to get around this. It's definitely not a bulletproof implementation, with corner cases that need to be handled separately, but it's a start to build on.
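For reference, a minimal sketch of that kind of nifi-api counters query (the base URL and certificate paths are hypothetical, and it assumes certificate-based authentication rather than a bearer token):

```python
import requests

# Hypothetical NiFi endpoint and client-certificate paths; substitute your own.
BASE_URL = "https://nifi.example.com:8443/nifi-api"
CLIENT_CERT = ("/opt/certs/client.pem", "/opt/certs/client-key.pem")
CA_BUNDLE = "/opt/certs/ca.pem"

# GET /counters returns all counters NiFi is currently tracking.
resp = requests.get(f"{BASE_URL}/counters", cert=CLIENT_CERT, verify=CA_BUNDLE)
resp.raise_for_status()

# The aggregate snapshot holds one entry per counter (name, value, etc.).
for counter in resp.json()["counters"]["aggregateSnapshot"]["counters"]:
    print(counter["name"], counter["value"])
```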
12-19-2024
12:21 PM
@mohdriyaz @Dahlgren Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
12-17-2024
04:48 PM
1 Kudo
@Shelton Thank you for your advice. As I use the latest version of NiFi and it requires Java 21, I added the following line to bootstrap.conf and confirmed the warning messages disappeared:

java.arg.EnableNativeAccess=--enable-native-access=ALL-UNNAMED

I appreciate your help. Thank you,
12-13-2024
08:41 AM
1 Kudo
@Zifo1 When using Site-to-Site via Remote Process Groups (RPG) and Remote Input or Output Ports between NiFi clusters, it is most efficient to push rather than pull data (FlowFiles). The NiFi RPG always acts as the client side of the connection; it will either send FlowFiles to a Remote Input Port or fetch FlowFiles from a Remote Output Port. I would avoid fetching from Remote Output Ports: you get better FlowFile distribution across the destination cluster when you send FlowFiles from the RPG. If FlowFiles traverse both directions, simply set up an RPG on each NiFi cluster to push FlowFiles to the Remote Input Ports on the opposite cluster. Details about Site-to-Site can be found here: https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site

As far as the RPG goes, I recommend using the "RAW" transport protocol over HTTP. RAW requires that a dedicated RAW port be configured in the server-side NiFi's nifi.properties file; it establishes a raw socket connection on that dedicated port. HTTP utilizes the same HTTPS port that all other NiFi interactions use. You'll need to make sure network connectivity exists between your NiFi clusters on both the HTTP(S) and RAW ports, since HTTP is always used to fetch Site-to-Site details (a quick connectivity check is sketched at the end of this post).

Setting up the client side (Remote Process Group) is documented here: https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#configure-site-to-site-client-nifi-instance
Setting up the server side (NiFi with Remote Input or Remote Output ports) is documented here: https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#configure-site-to-site-server-nifi-instance

Even with Site-to-Site, communication between the two NiFi clusters requires both authentication and authorization. Authentication is established via a mutual TLS handshake initiated by the RPG. For Site-to-Site, the keystore and truststore configured in each NiFi's nifi.properties file are used in the mutual TLS exchange. NOTE: The NiFi out-of-box auto-generated keystores and truststores are not suitable for negotiating a successful mutual TLS handshake.

There are several authorization policies that must be set up on the server side (the NiFi with the remote ports) so that the client side (the NiFi with the RPG) can successfully send FlowFiles over Site-to-Site:
1. Retrieve Site-to-Site Details - authorizes the client NiFi nodes (all nodes in the client-side NiFi cluster must be authorized) to retrieve Site-to-Site details from the server-side NiFi. This includes details like the number of nodes, the load on those nodes, the authorized remote ports, the Site-to-Site RAW port, the HTTPS port, etc.
2. Receive data via Site-to-Site - set on Remote Input Ports to authorize the client-side NiFi nodes to send FlowFiles to that specific port.
3. Send data via Site-to-Site - set on Remote Output Ports to allow authorized client nodes to fetch FlowFiles from that port.

Please help our community thrive. If any of the suggestions/solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on one or more of them. Thank you, Matt
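As referenced above, a minimal sketch for sanity-checking that connectivity. The hostname and ports are hypothetical; use the server-side NiFi's HTTPS port and the RAW port it has configured as nifi.remote.input.socket.port:

```python
import socket

# Hypothetical server-side NiFi host and ports; substitute the values from
# the remote cluster's nifi.properties (HTTPS port and RAW Site-to-Site port).
TARGETS = [
    ("nifi-remote.example.com", 8443),   # HTTPS, always used to fetch S2S details
    ("nifi-remote.example.com", 10443),  # dedicated RAW socket port
]

for host, port in TARGETS:
    try:
        # A successful TCP connect only proves the port is reachable;
        # TLS and authorization are still negotiated by NiFi itself.
        with socket.create_connection((host, port), timeout=5):
            print(f"OK   {host}:{port} reachable")
    except OSError as err:
        print(f"FAIL {host}:{port} -> {err}")
```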
12-10-2024
06:09 AM
@sha257 Did you restart your NiFi after modifying logback.xml? NiFi executes the ldap-user-group-provider during startup, so you should see DEBUG output in the nifi-app.log at that time and each time the sync interval elapses. As far as an LDAP sample goes, I would need to see LDAP search output for a group that contains "member" entries for the users you expect to see. Thanks, Matt
12-06-2024
12:25 AM
1 Kudo
@SAMSAL Hi, sorry that I was not clear enough in my question; I was doing my best 😞 @MattWho This works, thank you very much. I have set a maximum back-off time of 1 minute, as this process is quite time-sensitive, and configured both the queue before RouteOnAttribute and the looped relationship to prioritize the oldest FlowFile. Thank you both for the help!