Member since
07-30-2019
3406
Posts
1623
Kudos Received
1008
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 319 | 12-17-2025 05:55 AM |
| | 380 | 12-15-2025 01:29 PM |
| | 366 | 12-15-2025 06:50 AM |
| | 358 | 12-05-2025 08:25 AM |
| | 599 | 12-03-2025 10:21 AM |
10-25-2022
07:29 AM
@PepeClaro While NiFi supports parallel thread execution, there is no way to guarantee that two threads execute at the exact same time. One NiFi processor is unaware of what another NiFi processor is doing or when it is executing. Processors that have an inbound connection use a FlowFile queued on that connection as the trigger to start execution.

Step 1 is to identify which NiFi processors can be used to execute your 3 processes: https://nifi.apache.org/docs.html I have no idea from your description what your 3 processes do, so I can't make any recommendations on what you can/should use.

Step 2 is deciding how to interconnect these NiFi processors and preserve the data needed for downstream processing in your third process. When a processor executes, the response/return from the execution can result in modification of an existing FlowFile's content, creation of new FlowFile content, creation of an entirely new FlowFile, creation of new FlowFile attributes (key/value pairs), modification of FlowFile attributes, or none of the above, depending on the processor being used. Since you mention that the first 2 processes get info that is needed by process 3, you need to take that into consideration for process 3. Where is that info going to end up (FlowFile content or FlowFile attributes)? How large is the info returned (does it make sense to put it into an attribute)? Does the returned info need to be modified in any way before process 3?

In your flow as described, you have two Process Groups (PGs), which perform your process 1 and process 2. Each executes independently of the other, so execution at the exact same time cannot be guaranteed. Cron scheduling of a processor gives a better chance of same-time execution, but still no guarantee, since it only schedules when to request an available thread from the NiFi Max Timer Driven Thread pool.
If all threads are in use at the time of the request, the processor will execute as soon as a thread becomes available.

Out of these two PGs you will have two FlowFiles that your third process depends on. There is no way to tell a NiFi processor to pull attributes or content from two different source FlowFiles. So before process 3 you need to combine any needed attributes and/or content from the two original FlowFiles into one FlowFile that process 3 can use. It is hard to make a recommendation here since I don't know any details about your 3 processes, what the FlowFiles produced by process 1 and 2 contain in terms of content and attributes, or what content and/or attributes from process 1 and 2 are needed by process 3.

I suggested you may be able to use the "defragment" merge strategy of the MergeContent processor to combine the FlowFiles from process 1 and process 2, but there is not enough detail to say whether that would work without other modifications before MergeContent. To "defragment" (combine the process 1 fragment with the process 2 fragment), the FlowFiles produced by both process 1 and process 2 would need to have the following FlowFile attributes present and set correctly on each:

| Name | Description |
|---|---|
| fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together. |
| fragment.index | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates the order in which the fragments should be assembled. This attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all FlowFiles that have the same value for the "fragment.identifier" attribute) integer between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the "fragment.identifier" attribute and the same value for the "fragment.index" attribute, the first FlowFile processed will be accepted and subsequent FlowFiles will not be accepted into the Bin. |
| fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle. |
| segment.original.filename | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged FlowFile. |

fragment.identifier, fragment.count, and segment.original.filename need to have the same values on both FlowFiles. fragment.index would be unique. The result would be one output FlowFile with the content of both the original process 1 and process 2 FlowFiles, which process 3 could then use. Or, if process 1 and 2 produce FlowFiles that carry only the attributes you need and no content, you could set "Keep All Unique Attributes" as the attribute strategy so that the one merged FlowFile has all unique attributes from both source FlowFiles for process 3 to use.

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
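To make the defragment behaviour described above more concrete, here is a rough plain-Python model of it. This is not NiFi code: FlowFiles are modelled as dictionaries, the function names are made up for illustration, and only the rules quoted in the table are covered (same identifier bundles together, index sets the order, count says how many fragments to wait for, the first FlowFile with a given index wins). The attribute handling assumes the "Keep All Unique Attributes" strategy keeps an attribute unless two fragments disagree on its value.

```python
from collections import defaultdict


def keep_all_unique(fragments):
    """Keep every attribute unless two fragments disagree on its value."""
    seen, conflicts = {}, set()
    for ff in fragments:
        for key, value in ff["attributes"].items():
            if key in seen and seen[key] != value:
                conflicts.add(key)
            seen.setdefault(key, value)
    return {k: v for k, v in seen.items() if k not in conflicts}


def defragment(flowfiles):
    """Merge complete bundles of fragments and return the merged FlowFiles.

    Each FlowFile is modelled as {"attributes": {...}, "content": bytes}.
    """
    bins = defaultdict(dict)
    for ff in flowfiles:
        attrs = ff["attributes"]
        index = int(attrs["fragment.index"])
        # The first FlowFile seen for an index is accepted; later duplicates are not.
        bins[attrs["fragment.identifier"]].setdefault(index, ff)

    merged = []
    for fragments in bins.values():
        ordered = [fragments[i] for i in sorted(fragments)]
        expected = int(ordered[0]["attributes"]["fragment.count"])
        if len(ordered) != expected:
            continue  # bundle incomplete, nothing is released downstream
        merged.append({
            "attributes": keep_all_unique(ordered),
            "content": b"".join(ff["content"] for ff in ordered),
        })
    return merged
```

In this model, two FlowFiles that share `fragment.identifier`, `fragment.count` = 2 and `segment.original.filename`, with `fragment.index` 1 and 2, come out as one FlowFile whose content is process 1 followed by process 2. If only one of them arrives, nothing is emitted, so process 3 is never triggered.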
10-24-2022
09:49 AM
@dubrovski Rather than using the ExecuteStreamCommand processor to execute curl, have you tried using the InvokeHTTP processor instead for your PUT operation? If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
10-24-2022
09:35 AM
@PepeClaro Your description is vague, which makes it difficult to provide suggestions around incorporating your processes into a dataflow design.
- What are these three "processes"?
- How are those processes being executed? What processors are in use for these 3 processes?
- Are there any dependencies between these processes other than order of execution? For example, is output from processes 1 and/or 2 needed by process 3?
- Do processes 1 and 2 need to be executed in parallel?
- Is your NiFi a multi-node cluster?
- What are the triggers for these processes? Does it require a NiFi FlowFile to trigger each process? What kicks off this entire dataflow?

The more detail, the better.

You may be able to set a fragment identifier, fragment count (2), and fragment index (1 or 2) on the FlowFiles from the first two processes and then merge those fragments into one FlowFile that can trigger the third process. If either fragment is missing, it will not merge and thus not trigger the third process. If you do not need process 1 and 2 to run in parallel, then use a single dataflow, process 1 --> process 2 --> process 3, where a failure anywhere along the dataflow prevents execution of the next process.

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
10-24-2022
09:13 AM
@D5ha It is useful to know more about your environment, including the full NiFi and Java versions. Since it is reporting an issue when loading the flow:

java.lang.Exception: Unable to load flow due to: java.util.zip.ZipException: invalid stored block lengths
at org.apache.nifi.web.server.JettyServer.start

I would lean towards corruption of the flow.xml.gz and/or flow.json.gz on this node. Since all nodes run the exact same copy of these files, I'd copy them from a good node to the node that is failing to start. Depending on your NiFi version, you may not have a flow.json.gz file (this format was introduced in the most recent versions).

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
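One way to confirm the corruption theory before copying files around is to try decompressing the file outside of NiFi. The sketch below uses only the Python standard library; the function name is made up here, and the path in the usage note is an assumption about a default layout, so adjust it to wherever your flow files actually live.

```python
import gzip
import zlib


def gzip_is_readable(path, chunk_size=1024 * 1024):
    """Return True if the whole gzip stream at `path` decompresses cleanly."""
    try:
        with gzip.open(path, "rb") as stream:
            # Read to the end so damage anywhere in the stream is detected.
            while stream.read(chunk_size):
                pass
        return True
    except (OSError, EOFError, zlib.error):
        # Bad header, truncated file, or invalid compressed data.
        return False
```

For example, `gzip_is_readable("conf/flow.xml.gz")` run on the failing node and on a healthy node should give different answers if the file is damaged. A clean result only shows that the gzip stream is intact, not that the flow inside it is valid.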
10-24-2022
08:59 AM
@MrBurns You want to take the URL that is written to the FlowFile attribute "http.request.uri" and generate JSON from it, correct? Where do you want to write that JSON (a new FlowFile attribute? the content of the FlowFile?)?

There are multiple ways to handle this. If you just want to write JSON to a new FlowFile attribute, you could use the "Advanced" UI of UpdateAttribute and set up a rule for each URL type. If you want to write to the content of the FlowFile, you could follow that UpdateAttribute with a ReplaceText processor that does an "Always Replace" to write the JSON from the attribute to the content of the FlowFile. Another option is to use RouteOnAttribute to route each URL type to its own ReplaceText that handles that specific URL type.

I like the first option, since you can easily add new rules to the UpdateAttribute if additional URL types are introduced, without needing to modify the rest of your dataflow.

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
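The "one rule per URL type" idea can be sketched in plain Python to show why it extends easily. Everything specific here is hypothetical: the path prefixes, the type names and the JSON shape are placeholders, since the actual URL types were not given, and this is a model of the logic rather than NiFi configuration.

```python
import json
from urllib.parse import urlsplit

# Hypothetical rules: (path prefix, label to write into the JSON).
# Supporting a new URL type means adding one entry here.
RULES = [
    ("/orders", "order"),
    ("/customers", "customer"),
]


def uri_to_json(uri):
    """Build a JSON string for the first rule whose prefix matches the URI path."""
    path = urlsplit(uri).path
    for prefix, url_type in RULES:
        if path.startswith(prefix):
            return json.dumps({"type": url_type, "path": path})
    # No rule matched: still produce JSON so downstream handling stays uniform.
    return json.dumps({"type": "unknown", "path": path})
```

Each entry in `RULES` plays the part of one rule in the UpdateAttribute Advanced UI, and the returned string is what would be written to the new attribute (or to the content by the following ReplaceText).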
10-24-2022
07:55 AM
@PriyankaMondal I don't recommend using the NiFi embedded ZooKeeper (ZK). It makes things easy, but it is not an ideal solution for production. ZK requires a quorum of 3 nodes minimum. With NiFi configured to use the embedded ZK, this requires your NiFi cluster to have at least 3 nodes. Without a quorum, ZK cannot perform its required role: ZK is used to elect the cluster coordinator and primary node roles that a NiFi cluster requires.

Also, when using the embedded ZK, even with 3 NiFi nodes, ZK won't achieve quorum until all three nodes are up, and you'll see messages like the ones you shared until the ZK cluster has formed and quorum is established. Your cluster can also break (lose access to the UI) if you lose nodes (NiFi shuts down or dies), because you also lose the embedded ZK on those nodes and thus quorum is lost.

I suggest going to each of your 3 NiFi servers, Svxxx.xyz.com (1), Svxxx.xyz.com (2) and Svxxx.xyz.com (3), to make sure that ZK started and is listening on port 2181. I am assuming these are really three different hosts with unique hostnames and not that you tried to create 3 ZK instances on one host.

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
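A quick way to run the port check from any machine that can reach the nodes is a plain TCP connect. This is a minimal sketch using the Python standard library; the function name is made up, and the hostnames in the usage note are placeholders for your three real servers.

```python
import socket


def port_is_open(host, port=2181, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and hostname resolution failures.
        return False
```

For example, loop over your own hostnames with `for host in ["nifi-1.example.com", "nifi-2.example.com", "nifi-3.example.com"]: print(host, port_is_open(host))`. A `True` only shows that something is listening on 2181 on that host; it does not prove that the ZK ensemble has formed a quorum.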
10-24-2022
07:35 AM
@D5ha Your issue here is with the certificate being used to perform the clientAuth action. Your certificate would also not work if you had a multi-node cluster. It only works as a single-node cluster because there are no other nodes with which your single node needs to communicate as a client.

The keystore requirements for NiFi are as follows:
1. The keystore MUST contain ONLY one PrivateKeyEntry.
2. The PrivateKeyEntry MUST have both clientAuth and serverAuth ExtendedKeyUsage (EKU).
3. The PrivateKeyEntry MUST have a SubjectAlternativeName (SAN) entry that matches the NiFi node's server hostname. If you are also going to be addressing your server by its IP, you should have that IP as a SAN entry as well. Any other alternative hostname this server may be known as (meaning users type that alternate hostname in a URL to reach this host) should also be added to the SAN.

In your case, the current issue happens in the mutual TLS handshake. You have configured your SiteToSiteBulletinReportingTask to send to https://<some ip>/nifi. The same NiFi server receives that client hello and responds with a server hello, which includes the SAN entries. The client (the reporting task) looks at that server hello and rejects the handshake at that point. It does this because of what looks like a man-in-the-middle attack: the client tried to reach host <some ip>, but instead a host with SAN <localhost> responded.

There is no configuration change you can make in your secured NiFi to get around this. You'll need to get a new certificate meeting the minimum criteria I outlined above. You'll also need to do this if you ever intend to add more hosts to your NiFi cluster.

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
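The hostname check that fails in the handshake described above can be illustrated with a small plain-Python model. It is a simplification, not what a TLS library actually runs: it ignores wildcard entries and the distinction between DNS-type and IP-type SAN entries, and the function name and sample addresses are made up for illustration.

```python
import ipaddress


def san_matches(target, san_entries):
    """Return True if the host or IP the client dialled appears in the SAN list."""
    def normalise(value):
        try:
            # Canonical form, so equivalent spellings of an IP compare equal.
            return str(ipaddress.ip_address(value))
        except ValueError:
            # Hostnames compare case-insensitively; drop a trailing dot.
            return value.lower().rstrip(".")

    wanted = normalise(target)
    return any(normalise(entry) == wanted for entry in san_entries)
```

With a certificate whose only SAN is `localhost`, `san_matches("10.0.0.5", ["localhost"])` is `False`, which corresponds to the rejected handshake. Once the node's hostname and IP are in the SAN list, for example `["nifi01.example.com", "10.0.0.5"]`, the same call returns `True`.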
10-21-2022
01:25 PM
@DGaboleiro I am a bit confused by your dataflow design. In a NiFi multi-node cluster, each node is only aware of, and can only execute upon, FlowFiles present on that one node. So in your dataflow you have the QueryCassandra processor executing on "primary node" only, as you should (having it execute on all nodes would result in both your nodes performing the same query and returning the same data). You then split that JSON and use a DistributeLoad processor, which appears to me to be a means of sending half of the FlowFiles to node 1 and the other half to node 2. This is not the best way to do this. You are running Apache NiFi 1.17, which means load-balanced connections are available and can accomplish the same thing without all these additional processors. https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#settings

After your FlowFiles (this is what is being moved from processor to processor on your canvas) have been distributed, I see that you are using a MergeContent processor. The MergeContent processor can only merge the FlowFiles present on the same node. It will not merge FlowFiles from multiple nodes into a single FlowFile. So if your desire is to have one merge of all FlowFiles, distributing them across multiple nodes will not give you that outcome.

You should never configure any processor that accepts an inbound connection for "primary node" only execution. This is important since which node is elected as primary node can change at any time. Execution strategy has nothing to do with the availability of FlowFiles on each node on which to execute.

What is important to understand is that each node in your NiFi cluster has its own copy of the flow, its own Content and FlowFile repositories containing unique data, and each node executes the processors in its flow with no regard for the existence of other nodes. A node is simply aware, from ZooKeeper, of whether it has been elected as the cluster coordinator and/or primary node. If it is elected primary node, it will execute "primary node" and "all nodes" components. If it is not the primary node, it will only execute the "all nodes" components.

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
10-21-2022
12:56 PM
@rangareddyy What is important to understand is that NiFi processors are not executed by the user authenticated into NiFi (assuming a secured NiFi), but rather by the NiFi service user. So let's say that your NiFi service is owned by a "nifiservice" Linux account. Whatever umask is configured for that user will be applied to directories and files created by that user.

Now if your script is using sudo, it is changing the user that executes your script, resulting in different ownership and permissions from the "nifiservice" user. Subsequent processors will also execute as the "nifiservice" user and will then not have access to those files and directories. So you'll need to take this into account as you build your scripts. Make sure that your scripts adjust permissions on the directory tree and files as needed, so your "nifiservice" user or all users can access the files needed downstream in your dataflows.

So in your case it sounds like the script executed by the ExecuteScript processor is creating an sh file that is not owned by the "nifiservice" user or does not have execute permission set on it. The ExecuteStreamCommand processor will attempt to execute the sh command on disk as the "nifiservice" user only.

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
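To see what the service user actually gets, a small check like the one below can help. It is a sketch using the Python standard library with made-up function names, and it only tells you something useful when it is run as the same account that runs NiFi (the "nifiservice" user in the example above).

```python
import os
import stat


def describe_access(path):
    """Report ownership, mode, and what the current user may do with `path`."""
    info = os.stat(path)
    return {
        "owner_uid": info.st_uid,
        "mode": stat.filemode(info.st_mode),
        "readable": os.access(path, os.R_OK),
        "executable": os.access(path, os.X_OK),
    }


def add_execute_for_all(path):
    """Add the execute bit for owner, group and others, keeping the other bits."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    os.chmod(path, mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
```

If `describe_access` on the generated sh file shows `"executable": False`, the script that creates the file needs to set the permission itself, for example by calling something like `add_execute_for_all` right after writing it. Note that `os.chmod` only succeeds for the file's owner or root, so it belongs in the script that creates the file.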
10-21-2022
12:41 PM
1 Kudo
@Jagapriyan Since this is a daily job, I would suggest tackling it differently. You know your source files are written between 8am and 9am each day, so I would configure your ListSFTP to run on a cron schedule so it runs every second from 9am to 10am to make sure all files are listed.

Then, knowing that your files may number 90+ (maximum unknown), I would configure MergeContent's "Minimum Number of Entries" to some value you know the count will never reach. Make sure "Maximum Number of Entries" is set to a value higher than that. Then configure "Max Bin Age" to some duration, perhaps 30 minutes. What this does is allow MergeContent to continue to allocate FlowFiles to a bin for 30 minutes, at which time the bin is forced to merge even if the minimum has not been reached. Doing this makes sure you get only one FlowFile out per bin per node.

That single FlowFile can then be used to trigger the PutEmail used for notification. Additionally, the merged FlowFile will have an attribute "merge.count" added that you can use in your email body to report the number of FlowFiles that were ingested.

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
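The effect of an unreachable minimum combined with a bin age can be shown with a small plain-Python simulation. It is a deliberately simplified model and not NiFi code: it follows only the first bin, treats times as seconds since the first listing, ignores size limits, and assumes the bin merges the moment a threshold is hit. The function name and the sample numbers are made up for illustration.

```python
def simulate_first_bin(arrivals, min_entries, max_entries, max_bin_age):
    """Return (merge_time, merge_count) for the first bin, or None if nothing arrives.

    `arrivals` holds arrival times in seconds; `max_bin_age` is in seconds and
    is counted from the arrival of the first FlowFile in the bin.
    """
    arrivals = sorted(arrivals)
    if not arrivals:
        return None
    deadline = arrivals[0] + max_bin_age
    count = 0
    for t in arrivals:
        if t > deadline:
            break  # the bin has already been forced to merge
        count += 1
        if count >= min_entries or count >= max_entries:
            return t, count  # threshold reached, bin merges early
    return deadline, count  # bin age expired, merge whatever was collected
```

With 95 files arriving 10 seconds apart, a minimum of 10000, a maximum of 20000 and a 30 minute bin age, `simulate_first_bin([i * 10 for i in range(95)], 10000, 20000, 1800)` gives one merge at 1800 seconds with a count of 95, which is the single notification you want. Lowering the minimum to 50 gives a merge after only 50 files, so the remaining files would end up in a second bin and a second email.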