Created 10-24-2022 09:12 AM
I have 2 process that I need to execute on parallelo and I have a 3rd process what I wanto to start when firsts boths process will been finished, so my question is: There is a some box processor that I can use in order to implement this logic or somebody has a some idea to develop it.
Thank a lot and I am waiting for your answers
Best regards
Created 10-24-2022 09:35 AM
@PepeClaro
Your description is vague which makes it difficult to provide suggestions around incorporating them into a dataflow design.
- What are these three "processes"?
- How are those processes being executed? What processors in use for these 3 processes?
- Are there any dependencies between these processes other then order of execution? For example, is output from processes 1 and/or 2 needed by process 3?
- Do processes 1 and 2 need to be executed in parallel?
- Is your NiFi a multi-node cluster?
- What are the triggers for these processes? Does it require a NiFi FlowFile to trigger each processes? What kicks off this entire process dataflow?
The more detail the better would be helpful.
You may be able to set a fragment identifier, fragment count (2), and fragment index (1 or 2) for the first two process FlowFiles and then merge those fragments into one FlowFile that can trigger the third process. If either fragment is missing it will not merge and thus not trigger the 3 process.
If not needing process 1 and 2 in parallel, then a single dataflows process1 --> process 2 --> process 3 where a failure anywhere along the dataflow prevents execution of next process.
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.
Thank you,
Matt
Created 10-25-2022 07:29 AM
While NiFi supports parallel thread execution, there is no way to guarantee that two threads execute at the exact same time. So one NiFi component processor is unaware of what another NiFi Component processor is doing or when it is executing. Processors that have an inbound connection on them use an inbound connection queued FlowFile as the trigger to perform start execution.
Step 1 is to identify what NiFi component processors can be used to perform/execute your 3 processes:
https://nifi.apache.org/docs.html
I have no idea form your description what your 3 processes do, so I can't make any recommendations on what you can/should use.
Step 2 is deciding how to interconnect these NiFi processor component and preserve data needed for downstream dataflow processing in your third process. When a processor executes the response/return from the execution can result in modification to existing NiFi FlowFile's content, Creation of new FlowFile content, Creation of an entirely new FlowFile, Creation of new FlowFile attributes (key/value pairs), modification of FlowFile attributes, or none of the above depending on the NiFi component processor being used. Since you mention that first 2 processes get info that is needed by process 3, so would need to take that into consideration for process 3. Where is that info go ing to end up (FlowFile Content or FlowFile attributes)? How large is that info returned (does it make sense to put it in to attribute)? Does that returned info need to be modified in any way before process 3?
In your Flow as described, you have two Process Groups (PG), Each of these PGs performs your process 1 and process 2. Each will be executing independent of the other and thus can not guarantee execution at the exact same time. Cron scheduling of a processor can give a better chance of same time execution but still not a guarantee since it only schedules when to request an available thread from the NiFi Max Timer Driven Thread pool. If at time of request all threads are in use, it will execute as soon as thread becomes available.
Now out of these two PGs you will have two FlowFiles that your third process depends on. There is no way to tell a NiFi processor component to pull attributes or content from two different FlowFiles source FlowFiles. So before process 3 you need to combine any needed attributes and or content from the two original FlowFiles into one FlowFile that Process 3 can use. Hard to make a recommendation here since I don't know any details about your 3 processes, what the FlowFiles that are produced by Process 1 and 2 contain in terms of content and attributes, and what content and/or attributes from process 1 and 2 are needed by process 3.
I made suggestion about maybe being able to use the "defragment" merge strategy from the MergeContent processor to combine the FlowFiles from process 1 and process 2, but not enough detail to say or say without needing to do other modification before MergeContent. To "defragment" (combine process 1 fragment with process 2 fragment), the FlowFiles produced by both process 1 and process 2 would need to have the following FlowFile attributes present and set correctly on each:
Name Description
fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together. |
fragment.index | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates the order in which the fragments should be assembled. This attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all FlowFiles that have the same value for the "fragment.identifier" attribute) integer between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the "fragment.identifier" attribute and the same value for the "fragment.index" attribute, the first FlowFile processed will be accepted and subsequent FlowFiles will not be accepted into the Bin. |
fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle. |
segment.original.filename | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged FlowFile. |
Fragment.identifier, fragment.count, and segment.original.filename need to have same values on both FlowFiles. Fragment.index would be unique.
The result would be one output FlowFile with the FlowFile content of both original process 1 and process 2 FlowFiles which process 3 could the use. Or if process 1 and 2 produce FlowFiles with just FlowFile Attributes you need and not content, you could set "Keep All Unique Attributes" as the attribute strategy so that the 1 merged FlowFile has all unique attributes form both source FlowFiles for process 3 to use.
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.
Thank you,
Matt
Created 10-24-2022 09:35 AM
@PepeClaro
Your description is vague which makes it difficult to provide suggestions around incorporating them into a dataflow design.
- What are these three "processes"?
- How are those processes being executed? What processors in use for these 3 processes?
- Are there any dependencies between these processes other then order of execution? For example, is output from processes 1 and/or 2 needed by process 3?
- Do processes 1 and 2 need to be executed in parallel?
- Is your NiFi a multi-node cluster?
- What are the triggers for these processes? Does it require a NiFi FlowFile to trigger each processes? What kicks off this entire process dataflow?
The more detail the better would be helpful.
You may be able to set a fragment identifier, fragment count (2), and fragment index (1 or 2) for the first two process FlowFiles and then merge those fragments into one FlowFile that can trigger the third process. If either fragment is missing it will not merge and thus not trigger the 3 process.
If not needing process 1 and 2 in parallel, then a single dataflows process1 --> process 2 --> process 3 where a failure anywhere along the dataflow prevents execution of next process.
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.
Thank you,
Matt
Created 10-24-2022 10:40 AM
Thank for you answer.
I am going to try to answer for each question.
- The 2 firts processes get information of different source that the 3rd take to analize and to make another operations
- The 2 firts processes recieve the same input on the same time for example ID client. I dont understand your next question.
- Yes, exactly
- Yes.
- No.
- The 1st and 2st processes make different task and they are in the different process group and like I said they recieve the same input and their output is the input of 3rd process. The process begin with of read of file that it containts the id of clients.
Maybe you have some example or flow about the fragment that I can follow it and how I can configurate it
Thanks a lot.
Created 10-25-2022 07:29 AM
While NiFi supports parallel thread execution, there is no way to guarantee that two threads execute at the exact same time. So one NiFi component processor is unaware of what another NiFi Component processor is doing or when it is executing. Processors that have an inbound connection on them use an inbound connection queued FlowFile as the trigger to perform start execution.
Step 1 is to identify what NiFi component processors can be used to perform/execute your 3 processes:
https://nifi.apache.org/docs.html
I have no idea form your description what your 3 processes do, so I can't make any recommendations on what you can/should use.
Step 2 is deciding how to interconnect these NiFi processor component and preserve data needed for downstream dataflow processing in your third process. When a processor executes the response/return from the execution can result in modification to existing NiFi FlowFile's content, Creation of new FlowFile content, Creation of an entirely new FlowFile, Creation of new FlowFile attributes (key/value pairs), modification of FlowFile attributes, or none of the above depending on the NiFi component processor being used. Since you mention that first 2 processes get info that is needed by process 3, so would need to take that into consideration for process 3. Where is that info go ing to end up (FlowFile Content or FlowFile attributes)? How large is that info returned (does it make sense to put it in to attribute)? Does that returned info need to be modified in any way before process 3?
In your Flow as described, you have two Process Groups (PG), Each of these PGs performs your process 1 and process 2. Each will be executing independent of the other and thus can not guarantee execution at the exact same time. Cron scheduling of a processor can give a better chance of same time execution but still not a guarantee since it only schedules when to request an available thread from the NiFi Max Timer Driven Thread pool. If at time of request all threads are in use, it will execute as soon as thread becomes available.
Now out of these two PGs you will have two FlowFiles that your third process depends on. There is no way to tell a NiFi processor component to pull attributes or content from two different FlowFiles source FlowFiles. So before process 3 you need to combine any needed attributes and or content from the two original FlowFiles into one FlowFile that Process 3 can use. Hard to make a recommendation here since I don't know any details about your 3 processes, what the FlowFiles that are produced by Process 1 and 2 contain in terms of content and attributes, and what content and/or attributes from process 1 and 2 are needed by process 3.
I made suggestion about maybe being able to use the "defragment" merge strategy from the MergeContent processor to combine the FlowFiles from process 1 and process 2, but not enough detail to say or say without needing to do other modification before MergeContent. To "defragment" (combine process 1 fragment with process 2 fragment), the FlowFiles produced by both process 1 and process 2 would need to have the following FlowFile attributes present and set correctly on each:
Name Description
fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together. |
fragment.index | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates the order in which the fragments should be assembled. This attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all FlowFiles that have the same value for the "fragment.identifier" attribute) integer between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the "fragment.identifier" attribute and the same value for the "fragment.index" attribute, the first FlowFile processed will be accepted and subsequent FlowFiles will not be accepted into the Bin. |
fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle. |
segment.original.filename | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged FlowFile. |
Fragment.identifier, fragment.count, and segment.original.filename need to have same values on both FlowFiles. Fragment.index would be unique.
The result would be one output FlowFile with the FlowFile content of both original process 1 and process 2 FlowFiles which process 3 could the use. Or if process 1 and 2 produce FlowFiles with just FlowFile Attributes you need and not content, you could set "Keep All Unique Attributes" as the attribute strategy so that the 1 merged FlowFile has all unique attributes form both source FlowFiles for process 3 to use.
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.
Thank you,
Matt
Created 10-25-2022 08:24 AM
Matt, Thanks a lot for your explain, your time and recommendation.
I applied in my flow the processor MergeContent and I could resolve my problem. With this processor, the 3rd process dosent run and waits until the first boths has fineshed and it was the main idea of my question.
One more time, Thanks
Pepe