Member since: 07-30-2019
Posts: 2440
Kudos Received: 1284
Solutions: 689
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 216 | 01-17-2023 12:55 PM
 | 80 | 01-12-2023 01:30 PM
 | 123 | 01-12-2023 12:52 PM
 | 109 | 12-20-2022 12:06 PM
 | 332 | 12-16-2022 08:53 AM
12-09-2022
01:28 PM
@Onkar_Gagre Let's take a look at concurrent tasks here. You have an 8 core machine and a ConsumeKafka configured with 8 concurrent tasks across 4 nodes. I hope this means your Kafka topic has 32 partitions, because that processor creates a consumer group with the 8 consumers from each node as members of that consumer group. Kafka will only assign one consumer from a consumer group to a given partition, so having more consumers than partitions gains you nothing, but it can cause performance issues due to rebalancing.

Then you have a QueryRecord with 40 concurrent tasks per node. Every allocated thread across your entire dataflow needs time on the CPU, so just between these two processors alone you are scheduling up to 48 concurrent threads that must be handled by only 8 cores. Based on your description of the data volume, it sounds like a lot of CPU wait once this processor is enabled, as each thread only gets a fraction of time on the CPU and thus takes longer to complete its task. It sounds like you need more cores to handle your dataflow; this is not necessarily an issue specific to the QueryRecord processor, though you may well be setting its concurrent tasks too high for your system.

The scheduled threads come from the Max Timer Driven Thread pool set in your NiFi. The default is 10, and I assume you increased it to accommodate the concurrent tasks you have been assigning to your individual processors. The general starting recommendation for the Max Timer Driven Thread pool setting is 2 to 4 times the number of cores on your node, so with an 8 core machine that recommendation would be 16 - 32. The decision/ability to set it even higher is all about your dataflow behavior along with your data volumes, and it requires you to monitor CPU usage and adjust the pool size in small increments. Once the CPU is maxed out, there is not much more you can do short of adding more CPU. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
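As a side note, it may help to confirm how many partitions the topic actually has before sizing concurrent tasks. A minimal check, assuming shell access to a Kafka broker host (the topic and broker names below are placeholders):

```
# Describe the topic to see its partition count (placeholder topic/broker names)
kafka-topics.sh --describe --topic my-topic --bootstrap-server broker1:9092
# With 4 NiFi nodes x 8 concurrent tasks = 32 consumers, the topic would need
# at least 32 partitions for every consumer to receive a partition assignment.
```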
12-09-2022
01:09 PM
@Techie123 Can you provide more detail around your requirement that "the FFs order is also important"? My initial thought here would be a two-phase merge. In the first merge you utilize a correlation FlowFile attribute that you create on each FlowFile based off the employee ID extracted from the record, setting the min number of entries to 7 and the max to 10. Then you take these per-employee merged FlowFiles and merge them together into larger FlowFiles using MergeRecord. The question is whether 100 records per FlowFile is a hard limit, which it is not: the MergeRecord processor's max number of records is a soft limit. Let's assume we set this to 100. Say one of your merged employee FlowFiles arrives at the MergeRecord with 7 records in it for that employee ID, yet the bin already has 98 records in it. Since the bin's max has not been reached yet, this merged FlowFile still gets added and results in a merged FlowFile with 105 records. If you must keep it at or under 100 records per FlowFile, set the max records to 94. If, after adding a set of merged employee records, the bin holds fewer than 94 records, another merged employee FlowFile would be added; since you stated each set of merged employee records could be up to 7, this keeps you at or below 100 records in that single merged FlowFile. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
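For the first merge phase, a minimal configuration sketch (assuming the employee ID has already been extracted into a FlowFile attribute named employee.id, a placeholder name; shown with MergeContent entry-count property names, MergeRecord uses record-count equivalents):

```
Correlation Attribute Name = employee.id    (placeholder attribute holding the extracted employee ID)
Minimum Number of Entries  = 7
Maximum Number of Entries  = 10
```

FlowFiles sharing the same employee.id value are then binned and merged together.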
12-06-2022
12:48 PM
@Onkar_Gagre
1. What is the CPU and memory usage of your NiFi instances when the QueryRecord processor is stopped?
2. How is your QueryRecord processor configured, including its scheduling and concurrent task settings? What other processors were introduced as part of this new dataflow?
3. What does disk I/O look like while this processor is running?
The NiFi documentation does not mention any CPU or memory specific resource considerations for this processor. Thanks, Matt
12-05-2022
11:33 AM
1 Kudo
@Ghilani NiFi stores templates in the flow.xml.gz file. The flow.xml.gz is just a compressed copy of the dataflow(s), which reside inside NiFi's heap memory while NiFi is running. It is not recommended to keep templates in your NiFi. NiFi templates are also deprecated and will go away in the next major release. It is recommended to use NiFi Registry to store version controlled flows. If you are not using NiFi Registry, flow definitions should be downloaded instead of creating templates, and stored safely somewhere outside of NiFi itself. A flow definition can be downloaded by right clicking on a process group in NiFi and selecting "Download flow definition"; a JSON file of that flow will be generated and downloaded. Flow definitions can be uploaded to NiFi by dragging the create Process Group icon to the canvas and selecting the option to upload a flow definition. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
12-05-2022
11:19 AM
1 Kudo
@dreaminz You can create variables on a process group; those variables are then only available within the process group (scope) on which they were created. NiFi documentation on variables: https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#Variables Variables have been deprecated in favor of Parameter Contexts: https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#parameter-contexts You can create a single parameter context that you add parameters to and then associate that parameter context with multiple process groups. This allows you to update a parameter in one parameter context and effectively update your flows in multiple process groups. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
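For illustration, the reference syntax differs between the two (the names below are placeholders):

```
${my.variable}     references a variable (or FlowFile attribute) in a processor property
#{my.parameter}    references a parameter from the Parameter Context assigned to the process group
```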
12-05-2022
11:09 AM
@grb Your QueryDatabaseTable processor is failing because the dependent controller service is not yet enabled. It appears that the controller service is stuck trying to enable (Enabling) because the SQLServerDriver you have configured in that controller service is not compatible with the Java JDK version you are using to run NiFi. What version of NiFi are you using? What version of Java is your NiFi using? I recommend updating your Java version to the most recent release of Java JDK 8 or Java JDK 11 (version 11 is only supported in NiFi versions 1.10+). Otherwise, you'll need to find an older version of your SQL Server driver. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
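As a quick way to confirm which Java your NiFi is using (a sketch, assuming a default installation where NiFi is started with the java command from the path or the one configured in bootstrap.conf):

```
# Java version on the path
java -version
# The java= line in NiFi's conf/bootstrap.conf shows which java command the bootstrap uses
grep "^java=" conf/bootstrap.conf
```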
12-05-2022
10:56 AM
@ajignacio That was a big jump from version 1.9.x to 1.16.x of NiFi. NiFi's data provenance stores, for a configurable amount of time, information about NiFi FlowFiles as they traverse the various processors in your dataflow(s). Over the releases of NiFi, both improvements and new implementations of provenance have been introduced. The original provenance implementation was org.apache.nifi.provenance.PersistentProvenanceRepository, which has since been deprecated in favor of a better performing provider class, org.apache.nifi.provenance.WriteAheadProvenanceRepository, which is the new default. The following properties from the nifi.properties file are used to configure the provenance repository:
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
nifi.provenance.repository.directory.default=./provenance_repository
nifi.provenance.repository.max.storage.time=30 days
nifi.provenance.repository.max.storage.size=10 GB (used to be 1 GB)
nifi.provenance.repository.rollover.size=100 MB
nifi.provenance.repository.query.threads=2
nifi.provenance.repository.index.threads=2
nifi.provenance.repository.compress.on.rollover=true
nifi.provenance.repository.always.sync=false
nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename, ProcessorID
nifi.provenance.repository.indexed.attributes=
nifi.provenance.repository.index.shard.size=100 MB
nifi.provenance.repository.max.attribute.length=65536
nifi.provenance.repository.concurrent.merge.threads=2
nifi.provenance.repository.warm.cache.frequency=
For details on these properties, see the provenance repository section of the Apache NiFi Administration Guide: https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#provenance-repository
The good news is that data provenance retention has no direct relationship to the active FlowFiles currently traversing your dataflow(s). This means that you can shut down your NiFi, purge the contents of the current <path to>/provenance_repository directory, adjust the configuration properties as you want, and then restart your NiFi. NiFi will build a new provenance repository on startup. Considering that NiFi only provides limited configurable space (1 GB original default, 10 GB current default) and age (30 days) as the defaults, you would not be losing much if you were to reset. I am also concerned that the path in the error suggests you created your original provenance_repository within a subdirectory of the flowfile_repository, which I would not recommend. I would strongly suggest not writing the contents of any one of the four NiFi repositories within another. Considering the flowfile_repository and content_repository are the two most important repositories for tracking the FlowFiles actively being processed in your dataflow(s), I suggest each of these be on its own path and reside on a dedicated disk backed by RAID to avoid data loss in the event of a disk failure. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
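A minimal sketch of that reset, assuming a default installation layout under /opt/nifi and the default provenance repository location (adjust paths to your environment):

```
# Stop NiFi, clear the provenance repository contents, then start NiFi again
/opt/nifi/bin/nifi.sh stop
rm -rf /opt/nifi/provenance_repository/*
/opt/nifi/bin/nifi.sh start
```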
12-05-2022
10:25 AM
@Sinchan You'll want to inspect the configuration of the following properties in the nifi.properties configuration file. When you configure a secure NiFi instance, these properties must be set. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
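The original post referenced a list of properties that is not reproduced here; for a secure (HTTPS) NiFi these are commonly the HTTPS listener and keystore/truststore properties in nifi.properties (values below are placeholders):

```
nifi.web.https.host=<hostname>
nifi.web.https.port=8443
nifi.security.keystore=./conf/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=<password>
nifi.security.keyPasswd=<password>
nifi.security.truststore=./conf/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=<password>
```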
12-05-2022
09:16 AM
@Vinylal You can download the Cloudera Manager installer from the following Cloudera page: https://www.cloudera.com/downloads/cdp-private-cloud.html You'll need a Cloudera username and password in order to access downloads from Cloudera. If you have an account with Cloudera and don't know your credentials, you can reach out to your Cloudera account representative. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
12-05-2022
09:09 AM
@Jacccs Was the nifi-app.log created? If so, what was observed in that log? Please share the exact versions of NiFi you used. Have you tried using a Java JDK instead of Java JRE? Thanks, Matt
12-05-2022
08:46 AM
@cihankara I recommend starting a new community question, as this sounds like a different use case from this community post. Provide details of your use case, such as what the other output is, how those outputs are being ingested into NiFi, what info is available in that output, etc. Feel free to @MattWho in that new community post to trigger a notification to me in the community. Thanks, Matt
12-05-2022
08:41 AM
@hargav Please create a new community question for your queries around the MergeRecord processor. This is the best way to get attention, and it is best for the community to have a separate thread for each specific query. I am not clear on your use case for using "cron driven" scheduling with MergeRecord; this would not be a common thing to do. It is best to explain your use case in a new community thread along with sharing your MergeRecord processor configuration. Feel free to @MattWho in the new community post to notify me. Thanks, Matt
11-28-2022
12:25 PM
@hargav NiFi processor scheduled every day at 11:40 AM: 0 40 11 * * ? *
NiFi processor scheduled every day at 12:00 PM: 0 0 12 * * ? *
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
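For reference, the fields in a Quartz cron expression are, in order (shown against the 11:40 AM example above):

```
seconds  minutes  hours  day-of-month  month  day-of-week  year(optional)
0        40       11     *             *      ?            *
```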
11-28-2022
12:01 PM
@Mohamed_Shaaban I recommend starting a new community question with the details specific to your setup. This allows the community to address/assist with your specific setup versus comparing your issue to what was shared in this post. Thanks, Matt
11-22-2022
01:28 PM
2 Kudos
@drewski7 New processors are created within the community all the time, and the documentation for processors should include resource considerations for CPU and memory usage. Even when a processor lists memory as a resource consideration, that impact is often a byproduct of how the processor has been configured. You can refer to the embedded documentation in your installed NiFi instance, or you can right click on a processor added to the canvas and select "view usage" from the displayed context menu to go directly to that component's embedded documentation page. Processors like ReplaceText, SplitText, SplitContent, SplitJson, etc. would be examples. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
11-22-2022
01:21 PM
@stupidnickname
All nodes in a NiFi cluster execute their own copy of the flow.xml.gz. In order for nodes to be members of the same NiFi cluster, all nodes must be running a flow.xml.gz file with the same exact flow contents. Each node in a NiFi cluster loads its own FlowFiles (FlowFiles are what traverse between one processor component and another, and consist of metadata and content) and is unaware of the FlowFiles being processed by other nodes in the NiFi cluster. In a NiFi cluster there is the concept of a primary node. Some processors can be configured to execute on the primary node only. If the elected primary node goes down, a new node will be elected as primary node and those primary-node-only configured processors will start executing on that newly elected primary node. Perhaps more details around your query could help provide a more specific response if this does not answer your question.
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you,
Matt
11-22-2022
01:10 PM
@hargav It is not possible to create a single Quartz cron that will schedule at both 12:00:00 PM and 11:40:00 AM each day. However, the following would execute at 11:40:00 AM and 12:40:00 PM every day:
0 40 11,12 * * ?
and the following would execute at 11:00:00 AM, 11:40:00 AM, 12:00:00 PM, and 12:40:00 PM:
0 0,40 11,12 * * ?
or
0 0/40 11,12 * * ?
Hope this helps you, Matt
11-22-2022
01:01 PM
@zack_riesland Yes, "0 0 18 * * ?" means schedule to run at second 0, minute 0, hour 18, every day of the month, every month, any day of the week. What's important to understand is that the Quartz cron is used to schedule the processor to execute. In order for the processor to execute at exactly 18:00:00, NiFi must have an available thread in the NiFi timer driven thread pool. If a thread is not available, the processor's code will execute as soon as a thread becomes available. Since it has been "scheduled", it will run as soon as a thread becomes available and then will get scheduled again the next day at 18:00:00. Matt
11-15-2022
07:51 AM
@Jacccs An example or detailed description of your use case may be helpful in providing the best guidance for you. While the NiFi Expression Language (NEL) function anyMatchingAttribute expects a Java regular expression and searches/returns values for multiple FlowFile attributes, that does not appear to be what you need. Your attribute "attributeToSearch" implies that only a single specific FlowFile attribute is to be checked for whether it contains some "${value}". If this is correct, you would be able to use the following NEL: ${literal("${${attributeToSearch}}"):contains('${value}')} For the above NEL, let's assume a FlowFile with attribute "attributeToSearch" set to "username", a FlowFile attribute "username" set to "admin-matt", and a FlowFile attribute "value" set to "admin". The result of the above NEL statement would be true: ${${attributeToSearch}} would first resolve to ${username}, which would then resolve to "admin-matt". That "admin-matt" string is then passed to the NEL contains function, which checks whether that string contains the string "admin" within it. The result is a boolean "true" or "false". If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
11-08-2022
12:01 PM
@hegdemahendra This is not something I have tried before, but... When you execute the nifi.sh script to start NiFi, it bootstraps the NiFi process via the configuration in the bootstrap.conf NiFi configuration file. It is during the bootstrap process that NiFi starts the main child process that loads NiFi. Perhaps you can add additional java.arg entries to handle your pre-NiFi needs? Or maybe modify the nifi.sh script itself so that it executes your requirements before calling the rest of the NiFi startup process. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
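For illustration, extra JVM arguments are added to conf/bootstrap.conf as numbered java.arg entries (the index and system property below are placeholders; pick an unused index):

```
# conf/bootstrap.conf -- each java.arg.N line is passed to the NiFi JVM at startup
java.arg.20=-Dmy.custom.property=/path/to/something
```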
11-08-2022
11:55 AM
@D5ha Your issue is a mutual TLS handshake issue and really has nothing specific to do with NiFi itself. There are a lot of resources on the web for creating certificates. There are even free services like Tinycert you can use to generate valid certificates meeting the requirements I shared in my last response. Providing guidance on how to create certificates does not make much sense since it can be done so many ways:
- Self-signed
- Public CA
- Corporate/private CA
- etc.
Your currently shared TLS exception is telling you that the IP or hostname (you have a blue line through it in your image) was not found as a Subject Alternative Name (SAN) in the certificate created for the server side of this handshake, which in your case happens to also be your NiFi instance. The Site-To-Site-Bulletin-Reporting-Task is acting as the client in this mutual TLS handshake, and the NiFi server S2S destination URL is the server side of this TLS handshake. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
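To see which SANs a server certificate actually presents, one quick check (a sketch, assuming openssl is available; substitute your NiFi host and port):

```
# Print the server certificate and look for the Subject Alternative Name section
openssl s_client -connect nifi-host.example.com:8443 </dev/null 2>/dev/null \
  | openssl x509 -noout -text | grep -A1 "Subject Alternative Name"
```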
11-08-2022
11:43 AM
1 Kudo
@Bridewin There are two things you may want to try....
1. The GetFile processor was deprecated in favor of the newer ListFile --> FetchFile processors. I'd recommend switching to these processors and seeing if you have the same observations.
2. I'd suggest enabling debug logging for the GetFile processor class to see what additional logging may show. To do this, modify the logback.xml file in NiFi's conf directory and add the line below where you see similar lines already.
<logger name="org.apache.nifi.processors.standard.GetFile" level="DEBUG"/>
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
11-08-2022
11:34 AM
@Jagapriyan I suspect an issue with last modified timestamps, since the missed files have an older last modified timestamp than what was already consumed from the target directory, compounded by the sub-directory structure. My recommendation is to switch to using the listing strategy "Tracking Entities" instead. Tracking Entities keeps track of filenames and timestamps, so even a file with an older timestamp will get consumed if its filename is not in the tracked entities list stored in the distributed cache. Let me know if making this change resolves your issue. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
11-02-2022
10:28 AM
@Bridewin To add some additional context around your cron schedule: NiFi uses Quartz cron, in case you were not already aware. Your current Quartz cron "0 05 8 1/1 * ? *" means that the processor will be scheduled to execute at 8:05 AM starting on day 1 of every month and on every subsequent day after day 1 in each month. The issue with this cron is when you start your GetFile on any day other than the 1st, prior to 8:05 AM. Let's say you start NiFi on November 3rd. On startup NiFi loads your flow and starts all your component processors. In this configuration your GetFile will not get scheduled until December 1st and will then continue to execute every day thereafter. If you stop and start the processor, even without a NiFi restart, the same would happen. If NiFi restarts the JVM, the same will happen. I am not clear on why you decided to add 1/1; perhaps this is how you intended for it to be scheduled? To truly have it get scheduled at 8:05 AM every day, starting the very day the processor is started (whether via user action or NiFi JVM restart), you would want a cron like "0 5 8 * * ? *" For more info on Quartz cron, review this link: https://productresources.collibra.com/docs/collibra/latest/Content/Cron/co_quartz-cron-syntax.htm If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
11-01-2022
12:36 PM
@Jagapriyan Since you are using the listing strategy "Tracking Timestamps", the configuration property "Entity Tracking Time Window" is not used. The "Tracking Timestamps" strategy is very dependent on the timestamps of the target files. Typically, when files are not being picked up it is because the timestamps on those files are equal to or less than the last recorded timestamp in the ListSFTP processor's state. This can happen when files in the SFTP server target folders do not have their last modified timestamp updated (for example, moving a file from another directory into an SFTP server directory; a copy would update the timestamp since the file is being written again).
- Does your target SFTP path have multiple sub-directories which are being searched? Is Search Recursively set to "true"?
- Are there symlink directories in use?
- Have you looked at the timestamp recorded in the processor's state for your SFTP server directories? Do your missed files have older timestamps?
- How many files on average are being written to the target SFTP server between 12 AM and 1 AM each day?
I also see you have a min file age of 5 minutes. This means the last modified timestamp must be 5 minutes older than the execution time of your processor for the file to be eligible for consumption. I see you stated your files are placed on the SFTP server between 12 AM and 1 AM each day, and you scheduled your ListSFTP processor using a cron schedule at 10 minutes and 1 second past every hour between 2 AM and 2 PM. Why not just have your ListSFTP processor run all the time? Is this because timestamps are not being updated consistently? If you switch to using the listing strategy "Tracking Entities" instead, do you still see the issue? Tracking Entities works when there are issues with timestamps and was developed for that reason. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
11-01-2022
12:02 PM
@Bridewin Are all your environments using a NAS storage location from which the GetFile is pulling files? Have you monitored the health and connectivity of your NAS? Since you have your GetFile scheduled to execute only once a day, if your NAS or network is having issues, it will simply return nothing for that day's execution. Since you are configured to remove the files you consume, have you tried changing your cron to run multiple times within the 8 AM hour to see if the file gets picked up by any one of those executions? Perhaps if you are having network issues occasionally impacting your NAS, this will resolve your issue with consuming the file. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
10-28-2022
01:06 PM
@D5ha Not all processors write to the content repository, nor is the content of a FlowFile ever modified after it is created. Once a FlowFile is created in NiFi, it exists as-is until it is terminated. A NiFi FlowFile consists of two parts: FlowFile attributes (metadata about the FlowFile, which includes details about the location of the FlowFile's content in the content_repository) and the FlowFile content itself. When a downstream processor modifies the content of a FlowFile, what is really happening is that new content is written to a new content claim in the content_repository; the original content remains unchanged.

From what you shared, you appear to have just one content_repository. Within that single content_repository, NiFi creates a bunch of sub-directories. NiFi does this, because of the massive number of content claims a user's dataflow(s) may hold, for better indexing and seeking. What is very important to understand is that a content claim in the content_repository can hold the content for one or more FlowFiles; it is not always one content claim per FlowFile's content. It is also very possible to have multiple queued FlowFiles pointing to the exact same content claim and offset (the same exact content). This happens when your dataflow clones a FlowFile (for example, routing the same outbound relationship from a processor multiple times). So you should never manually delete claims from any content repository, as you may delete content for multiple FlowFiles.

That being said, you can use data provenance to locate the content_repository (Container), subdirectory (Section), content claim filename (Identifier), content offset byte where content begins in that claim (Offset), and number of bytes from the offset to the end of content in the claim (Size). Right click on a processor and select "view data provenance" from the displayed context menu. This will list all FlowFiles processed by this processor for which provenance still holds index data. Click the Show Lineage icon (looks like 3 connected circles) to the far right of a FlowFile. You can right click on "clone" and "join" events to find/expand any parent FlowFiles in the lineage (the event dot created for the processor on which you selected show provenance will be colored red in the lineage graph). Each white circle is a different FlowFile; clicking on a white circle will highlight the dataflow path for that FlowFile. Right clicking on an event like "create" and selecting "view details" will tell you everything that is known about that FlowFile (this includes a tab about the "content"). Container corresponds to the following property in the nifi.properties file: nifi.content.repository.directory.default= Section corresponds to a subdirectory within the above content repository path. Identifier is the content claim filename. Offset is the byte at which content for this FlowFile begins within that Identifier. Size is the number of bytes from the Offset until you reach the end of that FlowFile's content in the Identifier.

I also created an article on how to index the Content Identifier. Indexing a field allows you to locate a content claim and then search for it in your data provenance to find all FlowFile(s) that pointed at it. You can then view the details of all those FlowFile(s) to see the full content claim details as above: https://community.cloudera.com/t5/Community-Articles/How-to-determine-which-FlowFiles-are-associated-to-the-same/ta-p/249185 If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
10-25-2022
07:29 AM
@PepeClaro While NiFi supports parallel thread execution, there is no way to guarantee that two threads execute at the exact same time. So one NiFi component processor is unaware of what another NiFi component processor is doing or when it is executing. Processors that have an inbound connection use a queued FlowFile on that inbound connection as the trigger to start execution.

Step 1 is to identify which NiFi component processors can be used to perform/execute your 3 processes: https://nifi.apache.org/docs.html I have no idea from your description what your 3 processes do, so I can't make any recommendations on what you can/should use.

Step 2 is deciding how to interconnect these NiFi processor components and preserve the data needed for downstream dataflow processing in your third process. When a processor executes, the response/return from the execution can result in modification of an existing FlowFile's content, creation of new FlowFile content, creation of an entirely new FlowFile, creation of new FlowFile attributes (key/value pairs), modification of FlowFile attributes, or none of the above, depending on the NiFi component processor being used. Since you mention that the first 2 processes get info that is needed by process 3, you would need to take that into consideration for process 3. Where is that info going to end up (FlowFile content or FlowFile attributes)? How large is the info returned (does it make sense to put it into an attribute)? Does that returned info need to be modified in any way before process 3?

In your flow as described, you have two Process Groups (PGs) that perform your process 1 and process 2. Each will be executing independent of the other, and thus execution at the exact same time cannot be guaranteed. Cron scheduling of a processor can give a better chance of same-time execution, but still no guarantee, since it only schedules when to request an available thread from the NiFi Max Timer Driven Thread pool. If at the time of the request all threads are in use, it will execute as soon as a thread becomes available. Now out of these two PGs you will have two FlowFiles that your third process depends on. There is no way to tell a NiFi processor component to pull attributes or content from two different source FlowFiles. So before process 3 you need to combine any needed attributes and/or content from the two original FlowFiles into one FlowFile that process 3 can use. It is hard to make a recommendation here since I don't know any details about your 3 processes, what the FlowFiles produced by process 1 and 2 contain in terms of content and attributes, and what content and/or attributes from process 1 and 2 are needed by process 3.

I made a suggestion about maybe being able to use the "defragment" merge strategy of the MergeContent processor to combine the FlowFiles from process 1 and process 2, but there is not enough detail to say, or to say without needing other modifications before MergeContent. To "defragment" (combine the process 1 fragment with the process 2 fragment), the FlowFiles produced by both process 1 and process 2 would need to have the following FlowFile attributes present and set correctly on each (a configuration sketch follows at the end of this reply):

Name | Description
---|---
fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together.
fragment.index | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates the order in which the fragments should be assembled. It must be present on all FlowFiles when using the Defragment merge strategy and must be a unique (i.e., unique across all FlowFiles that have the same value for the "fragment.identifier" attribute) integer between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the "fragment.identifier" attribute and the same value for the "fragment.index" attribute, the first FlowFile processed will be accepted and subsequent FlowFiles will not be accepted into the bin.
fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle.
segment.original.filename | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged FlowFile.

fragment.identifier, fragment.count, and segment.original.filename need to have the same values on both FlowFiles; fragment.index would be unique. The result would be one output FlowFile with the FlowFile content of both the original process 1 and process 2 FlowFiles, which process 3 could then use. Or, if process 1 and 2 produce FlowFiles with just the FlowFile attributes you need and no content, you could set "Keep All Unique Attributes" as the Attribute Strategy so that the one merged FlowFile has all unique attributes from both source FlowFiles for process 3 to use. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
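As an illustration of that defragment setup (a sketch; the attribute values and correlation id expression are placeholders), an UpdateAttribute processor on the process 1 branch could set:

```
fragment.identifier       = ${my.correlation.id}
fragment.index            = 1
fragment.count            = 2
segment.original.filename = ${filename}
```

The process 2 branch would set the same fragment.identifier, fragment.count, and segment.original.filename but fragment.index = 2, and MergeContent's Merge Strategy would be set to Defragment.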
10-24-2022
09:49 AM
@dubrovski Rather than using the ExecuteStreamCommand processor to execute curl, have you tried using the InvokeHTTP processor instead for your PUT operation? If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
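As a starting point, a minimal InvokeHTTP sketch (the endpoint below is a placeholder; the URL property is named "Remote URL" in older releases and "HTTP URL" in newer ones):

```
HTTP Method = PUT
Remote URL  = https://target-host.example.com/api/resource
```

By default the incoming FlowFile's content is sent as the body of the PUT request.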
10-24-2022
09:35 AM
@PepeClaro Your description is vague, which makes it difficult to provide suggestions around incorporating these processes into a dataflow design.
- What are these three "processes"?
- How are those processes being executed? What processors are in use for these 3 processes?
- Are there any dependencies between these processes other than order of execution? For example, is output from processes 1 and/or 2 needed by process 3?
- Do processes 1 and 2 need to be executed in parallel?
- Is your NiFi a multi-node cluster?
- What are the triggers for these processes? Does each process require a NiFi FlowFile to trigger it? What kicks off this entire process dataflow?
The more detail, the better. You may be able to set a fragment identifier, fragment count (2), and fragment index (1 or 2) on the first two processes' FlowFiles and then merge those fragments into one FlowFile that can trigger the third process. If either fragment is missing, they will not merge and thus will not trigger the third process. If you do not need processes 1 and 2 to run in parallel, then a single dataflow of process 1 --> process 2 --> process 3 would work, where a failure anywhere along the dataflow prevents execution of the next process. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt