Member since
11-03-2023
27
Posts
1
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2470 | 11-08-2023 09:29 AM |
09-23-2024
01:16 AM
1 Kudo
Hi @MattWho Thanks for your response . I also don't understand the overhead of ingesting the same messages twice in your NiFi My requirement is to send data to different end point , so that they can perform different operations on the data. Why not have have a single ConsumeKafka ingesting the messages from the topic and then routing the success relationship from the ConsumeKafka twice (once to InvokeHTTP A and once to InvokeHTTP B)? For me one flow is like one vendor , like this i will be having multiple number of vendors , everyone will have there separate end points. Keeping all in one flow is not possible . So I am creating separate data flow and separate retry logic for them. This above issue is with only 1 vendor , they require same data (consumed from same kafka topic ) to be pushed to 2 separate endpoints . But I am not able to handle the retry logic for them. Why publish failed or retry FlowFile messages to an external topic R just so they can be consumed back in to your NiFi? yes i want them to be consumed again to nifi . All failed requests iam publishing to retry topic and this is being handled in retry flow . With this iam able to keep my main flow without and failed requests and new requests which does not have any error will get pushed to end point successfully It would be more efficient to just keep them in NiFi and create a retry loop on each InvokeHTTP. NiFi even offers retry handling directly on the relationships with in the processor configuration if i add a retry loop to invoke http and the endpoint is down for a longer time , too many requests will get queued in nifi . If you must write the message out to just one topic R, you'll need to append something to the message that indicates what InvokeHTTP (A or B) failure or retry resulted in it being written to Topic R. Then have a single Retry dataflow that consume from Topic R, extracts that A or B identifier from message so that it can be routed to the correct invokeHTTP. Just seems like a lot of unnecessary overhead. Please help me with the retry logic . Data is going in same retry topic how can i differentiate between the data , whether it failed from data flow 1 or from data flow 2.
... View more
07-17-2024
07:22 AM
Did you find any solution about this issue ? I have exactly the same problem. An invoke HTTP get a time out on the flow file. A retry is implemented and the flow file get stucked with a timeout each time. Works fine with same configuration to call API with postman and a curl from server nifi node. The only way is to restart the invoke http processor and after few hours and some flow files a new occurence appears. Error : 2024-07-17 14:55:06,783 ERROR [Timer-Driven Process Thread-17] o.a.nifi.processors.standard.InvokeHTTP InvokeHTTP[id=ba9d2024-8df1-323d-b0ba-54402272b188] Routing to Failure due to exception: java.net.SocketTimeoutException: timeout: java.net.SocketTimeoutException: timeout java.net.SocketTimeoutException: timeout at okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593) at okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601) at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.java:510) at okhttp3.internal.http2.Http2Stream$FramingSink.close(Http2Stream.java:538) at okio.RealBufferedSink.close(RealBufferedSink.java:236) at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:63) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185) at okhttp3.RealCall.execute(RealCall.java:69) at org.apache.nifi.processors.standard.InvokeHTTP.onTrigger(InvokeHTTP.java:850) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1174) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Thanks
... View more
12-28-2023
05:23 AM
Looking at the thread, Ans is No, that's not the way NiFi Processor is meant to handle relationships, The Client Processor has to initialize /start properly with the given configuration then only it can try to write and then based on the write results flow files can be routed to failure or success, Here Processor can't even start properly.
... View more
12-22-2023
09:52 AM
@Rohit1997jio, I was going to answer this similar to @MattWho who beat me to it but I will post my answer anyway in case it can help. Basically @MattWho is correct , you can use the ExecuteScript processor to somehow simulate the retry flowfile processor and its better than RouteOnAttribute Option because when you use penalize the processor is setting idle during this time unlike the RouteOnAttribute where its always looping to unmached relationship for the period of wait time. Anyway here is my solution and in my case you dont need RouteOnAttribute but you have to add more code. I'm using groovy code for my script. The process relies on two attributes: 1- totalRetry: which incremental value to track the retry threshold every time the file is sent to retry. first time it will be set to 0. 2- isPenalized: is used to track if the file should be penalized before each retry (isPenalized == null) or its already penalized which means its ready for the next retry. The groovy script: flowFile = session.get()
if(!flowFile) return
// get totalPenalized and isPenalized attributes
totalRetry = flowFile.getAttribute('totalRetry')
isPenalized = flowFile.getAttribute('isPenalized')
// if its the first time set the value to 0 (no rety yet. first penalize)
totalRetry = !totalRetry ? 0 : totalRetry.toInteger()
// if the total retry has passed the threshold ( 3 in this case) then send to failure rel (expired).
// Total wait time (penalize time) 3*10 sec = 30 secs
if(totalRetry.toInteger()>3)
{
session.transfer(flowFile, REL_FAILURE)
return
}
// if totalRetry has not passed the threshold and the file is not
// penalized (isPenalized == null) then penalize and send back to upstream queue
if(!isPenalized)
{
flowFile = session.putAttribute(flowFile, 'isPenalized', '1')
flowFile = session.penalize(flowFile)
session.transfer(flowFile)
return
}
// Otherwise file has been already penalized then send to retry and increment totalRetry
flowFile = session.putAttribute(flowFile, 'totalRetry', (totalRetry+1).toString())
flowFile = session.removeAttribute(flowFile, 'isPenalized')
session.transfer(flowFile, REL_SUCCESS)
return You can set the Penalize period under processor SETTINGS tab Hope this helps Thanks
... View more
12-21-2023
11:53 AM
I want to retry my flow file if it has some error for 24 hours , retry should will happen every hours . if the file is there in the flow for more then 24 hours it should be removed .
iam trying achieve same by using update attribute processors , routeOnAttribute processor and taking help of expression language . Still iam not able to achieve it.
these are the expression I am using to check elapsed time and received Time of flow file in routeOnAttribute processor but its going to unmatched relation every time -
these values are for testing pursose -
elapsed_time = ${now():toNumber():minus(${queuedtimestamp:toNumber()}):divide(60000):gt(10)}
receivedTime= ${now():toNumber():minus(${receivedTime:toDate("yyyy-MM-dd HH:mm:ss"):toNumber()}):gt(180000)}
NOTE: retryFlowFile processor in not available as my nifi version is old
Please suggest better approach to do so .
... View more
Labels:
- Labels:
-
Apache NiFi
12-21-2023
07:11 AM
1 Kudo
@Rohit1997jio Most NiFi processors that are "started/running" are not continuously consuming threads. The processor configurable scheduling controls how often the component processor requests a thread from the NiFi controller thread pool to execute it's code. By default processors have a run schedule of 0 which means that the processor would get scheduled to run as often as possible. Should execution result in no processing of data (FlowFiles), NiFi controller will delay next scheduled execution of that processor for 1 sec before it can be scheduled again (this logic prevents excessive CPU usage by processor components). The Kafka processors simply execute Kafka Client libraries and support adding dynamic properties to the processors configuration that impact that client library behavior. The Apache NiFi documentation for each component will identify if "Dynamic Properties" are supported: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.24.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/index.html In the case of the consume and publish Kafka component processors, dynamic properties are supported. The dynamic properties are limited to those allowed by the specific kafka client library version: https://kafka.apache.org/documentation.html#configuration If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
12-12-2023
11:06 PM
disconnect and reconnect again this functionality i want to achieve with Publish kafka processor . ConsumerKafka will consumer continuously .
... View more
12-08-2023
12:47 PM
A similar question was recently asked. Kafka connections are meant to be persistent. If you want to handle what you're asking, you'll have to custom build a solution that monitors the queues and stop/starts the processors. All of these can be achieved via NiFi REST API.
... View more
11-29-2023
06:52 AM
@Rohit1997jio You could use the RetryFlowFile processor for this use case. You will feed the "failure" relationship via a connection to the RetryFlowFile processor. The RetryFlowfile processor will continue to route the FlowFile back to PublishKafka using the "retry" relationship until maximum number of retries configured has been exceeded. After max retries has been reached the FlowFile would instead route to the "retries_exceeded" relationship which you can connect to a LogMessage processor. The logMessage processor would then auto-terminate the "success" relationship. The challenge you have here is your requirement to retry once per hour for 24 hours. You could set the penalty duration in the PublishKafka to 1 hour. This means that FlowFile routes to the "failure" relationship would get penalized for 60 mins. The RetryFlowFile would not consume that FlowFile from input connection until penalty duration ended. Then configure your number of retries in the RetryFlowFile processor to 24. Be careful with setting queue size to 250 on the failure connection. If you reach 250 queued on the failure relationship, it will trigger backpressure on the PublishKafka processor meaning the publishKafka processor would not get scheduled again until that backpressure is gone. If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
11-28-2023
05:47 AM
@joseomjr Is on to the right solution here. Your regex statement should match "kafka.topic" not "${kafka.topic}". A quick test in regex101.com confirms "kafka\.topic" should match.
... View more