Member since
11-03-2023
32
Posts
1
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 3931 | 11-08-2023 09:29 AM |
10-20-2025 06:29 AM
@Rohit1997jio Yes, it can be fast, but that depends on your dataflow(s). How many other dataflows is your NiFi also running? All these dataflows share the same JVM resources and utilize the same server resources.

Concurrency is also important, so having a match between the number of partitions in your source Topic A and the concurrency within your ConsumeKafkaRecord processor matters. Concurrent tasks on the ConsumeKafkaRecord can be set using the following formula:

(num partitions in Topic A) / (num NiFi nodes in cluster) = (num concurrent tasks set on ConsumeKafkaRecord)

Example:
Kafka topic partitions = 12
NiFi cluster nodes = 3
Concurrent tasks set on ConsumeKafkaRecord = 4
4 x 3 = 12 consumers in the consumer group (1 consumer per partition in the source topic)

Also keep in mind that in NiFi each concurrent task utilizes a thread from NiFi's "Max Timer Driven Thread Count" thread pool. The default pool size is 10, which means that only 10 concurrent tasks can execute at the same time. Generally speaking, threads execute for only milliseconds, but for optimal performance you'll want to manage your server's CPU load average and set the pool higher to maximize your throughput. NiFi can show you the Core Load Average (1 = 100% utilization of one physical core). So if your NiFi server has 4 cores and the load average is 4 or higher, the CPU is saturated. You can use this Core Load Average data to determine how much you can increase the "Max Timer Driven Thread Count" pool setting.

NiFi recommends initially setting your "Max Timer Driven Thread Count" to 2 - 4 times the number of physical cores on the NiFi node. This assumes all nodes in the NiFi cluster have the same number of physical cores. If the nodes have varying numbers of physical cores, use the node with the fewest to set your initial pool size. Then monitor the core load average across all NiFi nodes and adjust accordingly. There is no way to configure a different thread pool size per node in a NiFi cluster; NiFi expects all servers in a cluster to have the same hardware configuration.

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt
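A minimal sketch of the sizing arithmetic above in Groovy (the same language used in the ExecuteScript example further down this page), assuming the example values from the post; this is just the formula, not any NiFi API:

// Sizing sketch for ConsumeKafkaRecord concurrency (assumed example values)
int topicPartitions = 12      // partitions in source Topic A
int nifiNodes = 3             // nodes in the NiFi cluster
int concurrentTasksPerNode = topicPartitions.intdiv(nifiNodes)
// 4 tasks x 3 nodes = 12 consumers, i.e. one consumer per partition
assert concurrentTasksPerNode * nifiNodes == topicPartitions
println "Set Concurrent Tasks on ConsumeKafkaRecord to ${concurrentTasksPerNode}"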
07-03-2025 06:14 AM
@Rohit1997jio https://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html

Your Quartz cron "0-30 */6 * * * ?" translates to: execute every second from 0 - 30 seconds, 6 minutes after every minute of every hour. I think your issue is using */6, because you are saying 6 minutes after every minute, which is effectively the same thing as having just * in the minutes field. If you change this to 0/6, the processor would get scheduled at minutes 0/6/12/18/24/30/36/42/48/54 of every hour. If you want it to start at 6 minutes, you would use 6/6, which would schedule the processor at minutes 6/12/18/24/30/36/42/48/54 of every hour (you would, however, have a 12 minute gap between the end of each hour and the 6th minute of the next hour with this config).

Also keep in mind that scheduling does not necessarily mean execution at the same time. NiFi has a "Max Timer Driven Thread Count" pool from which threads are handed out to scheduled processors. With very large flows, or processors with long running threads, a scheduled processor may need to wait for a thread to become available before it can actually execute.

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt
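For reference, the Quartz expressions discussed above written out (fields: seconds minutes hours day-of-month month day-of-week); the annotations are mine:

0-30 */6 * * * ?    (original: fires every second from :00 to :30 of each matching minute)
0 0/6 * * * ?       (fires once at minutes 0, 6, 12, ... 54 of every hour)
0 6/6 * * * ?       (fires once at minutes 6, 12, ... 54 of every hour)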
09-23-2024 01:16 AM
1 Kudo
Hi @MattWho, thanks for your response.

"I also don't understand the overhead of ingesting the same messages twice in your NiFi" - My requirement is to send data to different endpoints so that each can perform different operations on the data.

"Why not have a single ConsumeKafka ingesting the messages from the topic and then routing the success relationship from the ConsumeKafka twice (once to InvokeHTTP A and once to InvokeHTTP B)?" - For me, one flow is one vendor. I will have multiple vendors like this, and each will have their own separate endpoints. Keeping them all in one flow is not possible, so I am creating a separate dataflow and separate retry logic for each. This issue is with only 1 vendor: they require the same data (consumed from the same Kafka topic) to be pushed to 2 separate endpoints, and I am not able to handle the retry logic for them.

"Why publish failed or retry FlowFile messages to an external topic R just so they can be consumed back in to your NiFi?" - Yes, I want them to be consumed again into NiFi. I publish all failed requests to a retry topic, and they are handled in the retry flow. With this I am able to keep my main flow free of failed requests, and new requests that do not have any error get pushed to the endpoint successfully.

"It would be more efficient to just keep them in NiFi and create a retry loop on each InvokeHTTP. NiFi even offers retry handling directly on the relationships within the processor configuration." - If I add a retry loop to InvokeHTTP and the endpoint is down for a long time, too many requests will get queued up in NiFi.

"If you must write the message out to just one topic R, you'll need to append something to the message that indicates what InvokeHTTP (A or B) failure or retry resulted in it being written to Topic R. Then have a single retry dataflow that consumes from Topic R and extracts that A or B identifier from the message so that it can be routed to the correct InvokeHTTP. Just seems like a lot of unnecessary overhead."

Please help me with the retry logic. Data is going into the same retry topic; how can I differentiate whether it failed from dataflow 1 or from dataflow 2?
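One hedged way to implement the A/B tagging Matt describes, using standard properties on the NiFi Kafka processors (the attribute name failed.endpoint is made up for illustration):

1. On each failure path, before PublishKafka, add an UpdateAttribute processor that sets failed.endpoint to A or B.
2. On PublishKafka, set "Attributes to Send as Headers (Regex)" to failed\.endpoint so the tag travels with the message into the retry topic.
3. In the retry flow, set "Headers to Add as Attributes (Regex)" on ConsumeKafka to failed\.endpoint.
4. Route with RouteOnAttribute using properties such as:
   toEndpointA = ${failed.endpoint:equals('A')}
   toEndpointB = ${failed.endpoint:equals('B')}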
07-17-2024 07:22 AM
Did you find any solution to this issue? I have exactly the same problem. An InvokeHTTP gets a timeout on the FlowFile. A retry is implemented, and the FlowFile gets stuck with a timeout each time. The same call works fine with the same configuration from Postman and from a curl on the NiFi server node. The only workaround is to restart the InvokeHTTP processor, and after a few hours and some FlowFiles a new occurrence appears.

Error:

2024-07-17 14:55:06,783 ERROR [Timer-Driven Process Thread-17] o.a.nifi.processors.standard.InvokeHTTP InvokeHTTP[id=ba9d2024-8df1-323d-b0ba-54402272b188] Routing to Failure due to exception: java.net.SocketTimeoutException: timeout: java.net.SocketTimeoutException: timeout
java.net.SocketTimeoutException: timeout
    at okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
    at okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
    at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.java:510)
    at okhttp3.internal.http2.Http2Stream$FramingSink.close(Http2Stream.java:538)
    at okio.RealBufferedSink.close(RealBufferedSink.java:236)
    at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:63)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
    at okhttp3.RealCall.execute(RealCall.java:69)
    at org.apache.nifi.processors.standard.InvokeHTTP.onTrigger(InvokeHTTP.java:850)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1174)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Thanks
12-28-2023 05:23 AM
Looking at the thread, the answer is no, that's not the way a NiFi processor is meant to handle relationships. The client processor has to initialize/start properly with the given configuration; only then can it try to write, and based on the write results the FlowFiles can be routed to failure or success. Here the processor can't even start properly.
12-22-2023 09:52 AM
@Rohit1997jio, I was going to answer this similarly to @MattWho, who beat me to it, but I will post my answer anyway in case it can help. Basically @MattWho is correct: you can use the ExecuteScript processor to simulate the RetryFlowFile processor, and it's better than the RouteOnAttribute option, because with penalize the processor sits idle during the wait time, unlike RouteOnAttribute, where the FlowFile keeps looping through the unmatched relationship for the whole wait period. Anyway, here is my solution; in my case you don't need RouteOnAttribute, but you have to add more code. I'm using Groovy for my script. The process relies on two attributes:
1- totalRetry: an incremental value that tracks the retry count against the threshold every time the file is sent to retry. The first time it is set to 0.
2- isPenalized: tracks whether the file still needs to be penalized before the next retry (isPenalized == null) or has already been penalized, which means it is ready for the next retry.
The Groovy script:

flowFile = session.get()
if(!flowFile) return
// get totalPenalized and isPenalized attributes
totalRetry = flowFile.getAttribute('totalRetry')
isPenalized = flowFile.getAttribute('isPenalized')
// if it's the first time, set the value to 0 (no retry yet; first penalize)
totalRetry = !totalRetry ? 0 : totalRetry.toInteger()
// if the total retry has passed the threshold ( 3 in this case) then send to failure rel (expired).
// Total wait time (penalize time) 3*10 sec = 30 secs
if(totalRetry.toInteger()>3)
{
session.transfer(flowFile, REL_FAILURE)
return
}
// if totalRetry has not passed the threshold and the file is not
// penalized (isPenalized == null) then penalize and send back to upstream queue
if(!isPenalized)
{
flowFile = session.putAttribute(flowFile, 'isPenalized', '1')
flowFile = session.penalize(flowFile)
session.transfer(flowFile)
return
}
// Otherwise file has been already penalized then send to retry and increment totalRetry
flowFile = session.putAttribute(flowFile, 'totalRetry', (totalRetry+1).toString())
flowFile = session.removeAttribute(flowFile, 'isPenalized')
session.transfer(flowFile, REL_SUCCESS)
return

You can set the Penalty Duration under the processor's SETTINGS tab.

Hope this helps. Thanks
12-21-2023 11:53 AM
I want to retry my FlowFile if it has an error, with a retry every hour for up to 24 hours. If the file has been in the flow for more than 24 hours, it should be removed.
I am trying to achieve this using UpdateAttribute processors, a RouteOnAttribute processor, and the Expression Language, but I am still not able to make it work.
These are the expressions I am using in the RouteOnAttribute processor to check the elapsed time and the received time of the FlowFile, but it goes to the unmatched relationship every time (the values below are for testing purposes):
elapsed_time = ${now():toNumber():minus(${queuedtimestamp:toNumber()}):divide(60000):gt(10)}
receivedTime = ${now():toNumber():minus(${receivedTime:toDate("yyyy-MM-dd HH:mm:ss"):toNumber()}):gt(180000)}
NOTE: The RetryFlowFile processor is not available, as my NiFi version is old.
Please suggest a better approach.
Labels:
- Apache NiFi
12-21-2023 07:11 AM
1 Kudo
@Rohit1997jio Most NiFi processors that are "started/running" are not continuously consuming threads. The processor's configurable scheduling controls how often the processor requests a thread from the NiFi controller's thread pool to execute its code. By default, processors have a run schedule of 0, which means the processor gets scheduled to run as often as possible. Should an execution result in no processing of data (FlowFiles), the NiFi controller will delay the next scheduled execution of that processor by 1 sec before it can be scheduled again (this logic prevents excessive CPU usage by processor components).

The Kafka processors simply execute the Kafka client libraries and support adding dynamic properties to the processor's configuration that affect the client library's behavior. The Apache NiFi documentation for each component identifies whether "Dynamic Properties" are supported:
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.24.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/index.html

In the case of the consume and publish Kafka processors, dynamic properties are supported. They are limited to those allowed by the specific Kafka client library version:
https://kafka.apache.org/documentation.html#configuration

If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt
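For illustration, a hedged example of what such dynamic properties look like on ConsumeKafka_2_6 (the values are arbitrary; the property names come from the Kafka consumer configuration reference linked above):

max.poll.records = 500        (maximum records returned in a single poll)
session.timeout.ms = 30000    (consumer group session timeout)

These are added with the + button on the processor's PROPERTIES tab and are passed through to the Kafka client configuration.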
12-12-2023 11:06 PM
Disconnect and reconnect again is the functionality I want to achieve with the PublishKafka processor. ConsumeKafka will consume continuously.
12-08-2023 12:47 PM
A similar question was recently asked. Kafka connections are meant to be persistent. If you want to handle what you're asking, you'll have to custom build a solution that monitors the queues and stops/starts the processors. All of this can be achieved via the NiFi REST API.
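A minimal sketch of such a stop/start call in Groovy, assuming an unsecured NiFi on localhost:8080 and a made-up processor id; the run-status endpoint and the revision handshake are part of the standard NiFi 1.x REST API:

import groovy.json.JsonSlurper
import groovy.json.JsonOutput

// Hypothetical processor id; look yours up in the UI or via /nifi-api/flow/search-results
def base = 'http://localhost:8080/nifi-api/processors/015c1234-0123-1000-abcd-000000000000'

// 1. Read the current processor entity; NiFi requires its revision for any mutation
def entity = new JsonSlurper().parse(new URL(base))

// 2. PUT the new run status ('STOPPED' or 'RUNNING') along with that revision
def conn = new URL("${base}/run-status").openConnection()
conn.requestMethod = 'PUT'
conn.doOutput = true
conn.setRequestProperty('Content-Type', 'application/json')
conn.outputStream.withWriter { it << JsonOutput.toJson([revision: entity.revision, state: 'STOPPED']) }
println "NiFi responded: ${conn.responseCode}"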