Member since: 07-30-2019
Posts: 3123
Kudos Received: 1561
Solutions: 907

My Accepted Solutions
Title | Views | Posted
---|---|---
 | 215 | 12-13-2024 10:58 AM
 | 319 | 12-05-2024 06:38 AM
 | 266 | 11-22-2024 05:50 AM
 | 234 | 11-19-2024 10:30 AM
 | 203 | 11-14-2024 01:03 PM
12-24-2024
07:02 AM
@BK84 I would avoid, if possible, designing dataflows that rely on making REST API calls to control the processing of data. Any issue that prevents a REST API call from succeeding would negatively impact your data processing. Based on what you have shared, I'd suggest connecting your A->B->C dataflow directly to your D->E->F dataflow. Since your Python script (C) is responsible for creating the files in the directory that ListFile (D) checks, why not have your Python script output a FlowFile that contains the list of filenames it created? A SplitContent processor could split that list into individual FlowFiles that are passed directly to FetchFile (no need for ListFile anymore) for writing to Elasticsearch.

Now let's consider error handling and how to trigger the next run without the REST API. SplitContent->FetchFile->PutElasticsearchJson should be placed in a child process group. That process group should be configured with a FlowFile Concurrency of "Single FlowFile per Node" and an Outbound Policy of "Batch Output". This means that only one FlowFile (the FlowFile produced by your Python script that contains the list of all files to be fetched) is allowed to enter this child process group at a time.

PutElasticsearchJson has relationships for handling retry, success, failure, and errors. These can be used to handle success as well as to report (possibly using a PutEmail processor) when processing has issues. On the success relationship path you could use ModifyBytes to zero out the content and then MergeContent to merge the split FlowFiles back into one FlowFile using the "Defragment" merge strategy. Add a Max Bin Age to force failure of a bin after X amount of time if it does not receive all its fragments. SplitContent creates all the FlowFile attributes needed to support the Defragment strategy. Assuming all files were routed to "success" by PutElasticsearchJson, the single defragmented FlowFile will be output, which you send to an output port in the child process group. Once all FlowFiles in the child process group are queued at output ports, they are allowed to exit the child process group. Assuming a MergeContent "success", you can use this output FlowFile to trigger the next run (hopefully without using REST API calls). Since you did not share how your data gets started in A and B, I can't really suggest anything there.

Bottom line: avoiding REST API calls to control your dataflows leads to faster, more efficient processing of your data. Let your dataflow handle the error handling. But if you do choose to use REST API calls, the best way to figure them out is to open the developer tools in your browser and then manually perform the needed interactions via the NiFi UI. Through the developer tools you can capture (copy as cURL) the call being made in response to your UI action. You are likely using a username and password to access your NiFi, but that adds more work when calling the REST API from NiFi dataflows (for one thing, your username and password would be in plain text in the dataflow needed to get your user token). So you will want to set up a keystore and truststore so that authentication is handled via a mutual TLS handshake, avoiding exposed passwords and the need to fetch a user token. The InvokeHTTP processor can be used to make the REST API calls and can be configured with an SSLContextService, as in the sketch below.

This is a lot of high-level information, but I hope it has set you on the path to success.

Please help our community thrive.
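For illustration, here is a minimal, hypothetical command-line equivalent of a mutual TLS call to the NiFi REST API; the endpoint, hostname, port, and certificate paths are placeholders, and the same certificate material would back the SSLContextService used by InvokeHTTP:

```bash
# Hypothetical mutual TLS call to the NiFi REST API (no user token required).
# Hostname, port, endpoint, and certificate paths are placeholders.
curl 'https://nifi-host:8443/nifi-api/flow/status' \
  --cert /path/to/client.crt \
  --key /path/to/client.key \
  --cacert /path/to/nifi-ca.pem
```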
If any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of the posts that helped. Thank you, Matt
12-24-2024
06:00 AM
@hegdemahendra As far as your issue goes, it would probably be useful to collect a series of thread dumps, spaced at least 5 minutes apart. Then look for any threads related to the stopping of components and check whether they are progressing or hung. Is it stuck stopping a specific processor or processor class? Do any of the processors being stopped show active threads?
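As a rough sketch (assuming the nifi.sh dump command is available in your version), something like the following would collect four dumps five minutes apart; adjust paths and counts to suit:

```bash
# Hypothetical helper: capture four NiFi thread dumps, five minutes apart.
# Run from the NiFi installation directory; output paths are placeholders.
for i in 1 2 3 4; do
  ./bin/nifi.sh dump "/tmp/nifi-threaddump-$i.txt"
  sleep 300
done
```

Thank you, Matt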
12-23-2024
07:35 AM
1 Kudo
@hegdemahendra While I don't currently have an environment matching yours to test with, I did want to point out that NiFi Variables are deprecated and have been removed in NiFi 2.0, and the NiFi 1.x branch will cease to be developed further. You need to move away from NiFi Variables and start using NiFi Parameters instead. Parameters are more useful than Variables since they can be used in any property, whereas Variables are only usable in properties that support NiFi Expression Language (NEL).
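As a quick illustration of the difference in reference syntax (the names here are placeholders):

```
# Variable reference (deprecated): only works in properties that support NEL
${my.variable}

# Parameter reference: usable in any property, including sensitive ones
#{my.parameter}
```

Please help our community thrive. If any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of the posts that helped. Thank you, Matt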
12-23-2024
06:50 AM
1 Kudo
@BK84 I suggest starting a new community question for your new query. When you start your new question, please provide more detail on your ask. It is not clear what you mean by "trigger". What is the use case you are trying to implement? Thank you, Matt
12-23-2024
06:25 AM
1 Kudo
@Krish98 There are not enough details here to determine what is going on. Do all of the FlowFiles being merged contain the FlowFile attributes required for the Defragment strategy to succeed?

Name | Description
---|---
fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together.
fragment.index | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates the order in which the fragments should be assembled. It must be present on all FlowFiles when using the Defragment merge strategy and must be a unique integer (i.e., unique across all FlowFiles that have the same value for the "fragment.identifier" attribute) between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for both the "fragment.identifier" and "fragment.index" attributes, the first FlowFile processed will be accepted and subsequent FlowFiles will not be accepted into the bin.
fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates how many FlowFiles should be expected in the given bundle. At least one FlowFile in the bundle must have this attribute. If multiple FlowFiles in a given bundle contain the "fragment.count" attribute, all must have the same value.
segment.original.filename | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute, and all FlowFiles in the same bundle must have the same value for it. Its value will be used as the filename of the completed merged FlowFile.

Are the FlowFile attribute values correct? How are the individual FlowFile fragments being produced?

I find it odd that you say one FlowFile is routed to "merged", which implies the merge was successful. Are you sure the "merged" FlowFile only contains the content of one FlowFile from the set of fragments? Any chance you routed both the "original" and "failure" relationships to the same connection? Can you share the full MergeContent processor configuration (all tabs)? When a FlowFile is routed to "failure", I would expect logging in the nifi-app.log explaining the reason for the failure. What do you see in the nifi-app.log?

How many unique bundles are you trying to merge concurrently? I see you have set "Max Num Bins" to 200, so do you expect to have 200 unique fragment.identifier bundles to merge at one time? How many FlowFiles typically make up one bundle? I also see you have "Max Bin Age" set to 300 sec (5 mins). Are all FlowFiles with the same fragment.identifier reaching the MergeContent processor within 5 minutes of one another?

Keep in mind that MergeContent has the potential to consume a lot of NiFi heap memory depending on how it is configured. FlowFile attributes/metadata are held in heap memory, and FlowFiles allocated to MergeContent bins are held in heap memory. So depending on how many FlowFiles make up a typical bundle, the number of bundles being handled concurrently (200 max), and the number and size of the individual FlowFile attributes, binned FlowFiles can consume a lot of heap. Do you see any out-of-memory errors in your nifi-app.log (they might not be thrown by MergeContent)? You must be mindful of this when designing a dataflow that uses MergeContent processors.

Please help our community thrive.
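For reference, a hypothetical three-fragment bundle that would satisfy the Defragment strategy might carry attributes like these (the identifier and filename are placeholders, and the index numbering follows whatever your splitting processor emits):

```
FlowFile 1: fragment.identifier=abc-123  fragment.index=1  fragment.count=3  segment.original.filename=data.json
FlowFile 2: fragment.identifier=abc-123  fragment.index=2  fragment.count=3  segment.original.filename=data.json
FlowFile 3: fragment.identifier=abc-123  fragment.index=3  fragment.count=3  segment.original.filename=data.json
```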
If any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of the posts that helped. Thank you, Matt
12-16-2024
05:08 AM
1 Kudo
@tono425 It is just a WARNING and can safely be ignored. I found some threads on this here:
https://bugs.openjdk.org/browse/JDK-8310626
https://apachenifi.slack.com/archives/C0L9VCD47/p1730736181056549?thread_ts=1730294478.475309&cid=C0L9VCD47
https://www.reddit.com/r/java/comments/17cjajl/why_introduce_a_mandatory_enablenativeaccess/

I suppose you could try adding a new java.arg.<some string/num> entry set to "--enable-native-access=ALL-UNNAMED" in the NiFi bootstrap.conf to see if that stops these warnings on NiFi startup.
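For example, a hypothetical bootstrap.conf addition might look like this (the index 20 is arbitrary; pick any unused java.arg number):

```
# conf/bootstrap.conf (hypothetical addition; choose an unused java.arg index)
java.arg.20=--enable-native-access=ALL-UNNAMED
```

Please help our community thrive. If any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of the posts that helped. Thank you, Matt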
12-13-2024
10:58 AM
1 Kudo
@BK84 The best way to capture REST API calls is to use the developer tools available in your browser. You can initiate the action in the NiFi UI and capture the REST API call that was made. I was able to fetch a token from my ldap-provider login in NiFi 2.0.0-M4 using the below:

```
curl 'https://<nifi-node-hostname>:<nifi port>/nifi-api/access/token' \
  -H 'content-type: application/x-www-form-urlencoded;charset=UTF-8' \
  --data-raw 'username=<username>&password=<password>' --insecure
```

This is no different than in my NiFi 1.x version. How have you been fetching your token?
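Once you have the token, subsequent REST API calls present it as a Bearer token. A hypothetical follow-up call might look like this (the endpoint and all bracketed values are placeholders):

```bash
# Hypothetical: store the token, then present it on later REST API calls.
token=$(curl 'https://<nifi-node-hostname>:<nifi port>/nifi-api/access/token' \
  -H 'content-type: application/x-www-form-urlencoded;charset=UTF-8' \
  --data-raw 'username=<username>&password=<password>' --insecure)

curl 'https://<nifi-node-hostname>:<nifi port>/nifi-api/flow/status' \
  -H "Authorization: Bearer $token" --insecure
```

Please help our community thrive. If any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of the posts that helped. Thank you, Matt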
12-13-2024
08:48 AM
1 Kudo
@tono425 Apache NiFi 2.0.0 has only been tested and verified against Java 21. I suggest switching to the latest Java 21 JDK update and seeing if the same messages persist in your Linux /var/log/messages log. See the NiFi 2.0.0 system requirements. Please help our community thrive. If any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of the posts that helped. Thank you, Matt
12-13-2024
08:41 AM
1 Kudo
@Zifo1 When using Site-to-Site via Remote Process Groups (RPG) and Remote Input or Output Ports between NiFi clusters, it is most efficient to push rather than pull data (FlowFiles). The NiFi RPG always acts as the client side of the connection: it will either send FlowFiles to a Remote Input Port or fetch FlowFiles from a Remote Output Port. I would avoid fetching from Remote Output Ports; you get better FlowFile distribution across the destination cluster when you send FlowFiles from the RPG. If FlowFiles traverse both directions, you would simply set up an RPG on both NiFi clusters to push FlowFiles to the Remote Input Ports on the opposite cluster. Details about Site-to-Site can be found here:
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site

As far as the RPG goes, I recommend using the "RAW" transport protocol over HTTP. RAW requires that a dedicated RAW port is configured in the server-side NiFi's nifi.properties file (see the example properties below) and establishes a raw socket connection on that dedicated port. HTTP utilizes the same HTTPS port that all other NiFi interactions use. You'll need to make sure network connectivity exists between both of your NiFi clusters on both the HTTP(S) and RAW ports; HTTP is always used to fetch the Site-to-Site details.

Setting up the client side (Remote Process Group) is documented here:
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#configure-site-to-site-client-nifi-instance
Setting up the server side (NiFi with Remote Input or Remote Output Ports) is documented here:
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#configure-site-to-site-server-nifi-instance

Even with Site-to-Site, the communications between the two NiFi clusters require both authentication and authorization. Authentication is established via a mutual TLS handshake initiated by the RPG; for Site-to-Site, the keystore and truststore set up in each NiFi's nifi.properties file are used in the mutual TLS exchange. NOTE: The NiFi out-of-box auto-generated keystores and truststores are not suitable for negotiating a successful mutual TLS handshake.

There are several authorization policies that must be set up on the server side (the NiFi with the remote ports) so that the client side (the NiFi with the RPG) can successfully send FlowFiles over Site-to-Site:
1. Retrieve Site-to-Site Details - This policy authorizes the client NiFi nodes (so all nodes in the client-side NiFi cluster must be authorized) to retrieve Site-to-Site details from the server-side NiFi. This includes details like the number of nodes, the load on those nodes, the authorized remote ports, the Site-to-Site RAW port, the HTTPS port, etc.
2. Receive data via Site-to-Site - This policy is set on a Remote Input Port to authorize the client-side NiFi nodes to send FlowFiles to that specific port.
3. Send data via Site-to-Site - This policy is set on a Remote Output Port to allow authorized client nodes to fetch FlowFiles from that port.
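To make the RAW configuration concrete, a hypothetical server-side nifi.properties fragment might look like this (the hostname and port are placeholders; the RAW port must be reachable from every node in the client cluster):

```
# nifi.properties (server side, hypothetical values)
nifi.remote.input.host=nifi-node1.example.com
nifi.remote.input.secure=true
nifi.remote.input.socket.port=10443
nifi.remote.input.http.enabled=true
```

Please help our community thrive. If any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of the posts that helped. Thank you, Matt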
12-10-2024
11:16 AM
@Dahlgren
- Make sure you are using a Java 21 JDK and not a Java 21 JRE.
- Try excluding the NiFi directory, including all of its repositories, from your virus-scanning software and see if that resolves the startup issue.

Here is an Apache NiFi Slack thread discussing this same issue:
https://apachenifi.slack.com/archives/C0L9UPWJZ/p1731963549046199
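A quick, rough way to confirm you have a JDK rather than a JRE (assuming java and javac are on your PATH):

```bash
# A JDK ships the compiler, so javac should resolve and report a version;
# a JRE will not have javac at all.
java -version
javac -version
```

Please help our community thrive. If any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of the posts that helped. Thank you, Matt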