Member since 11-03-2023
26 Posts
0 Kudos Received
1 Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 1481 | 11-08-2023 09:29 AM |
08-27-2024 05:52 AM
Hi, in my NiFi flow I have two data flows that send the same data to two different HTTP endpoints. Both flows consume from the same Kafka topic (Topic A), each with a different Kafka group ID, and push the data to different HTTP endpoints (endpoint 1 and endpoint 2). If there is any failure, the data is produced to retry topic R, which is shared by both data flows. Each data flow has its own retry flow, configured with its own HTTP endpoint, but both retry flows consume from the same retry topic. The issue is that, because the retry topic is shared, data flow A can consume data that belongs to data flow B. How can I avoid this situation? The retry topic must stay the same for both. Here is a sample of the data flow.
Labels: Apache NiFi
07-17-2024 07:22 AM
Did you find a solution to this issue? I have exactly the same problem. An InvokeHTTP processor gets a timeout on the flow file. A retry is implemented, and the flow file gets stuck, timing out on every attempt. The same configuration works fine when calling the API with Postman or with curl from the NiFi server node. The only workaround is to restart the InvokeHTTP processor; after a few hours and some flow files, the problem occurs again.
Error:
2024-07-17 14:55:06,783 ERROR [Timer-Driven Process Thread-17] o.a.nifi.processors.standard.InvokeHTTP InvokeHTTP[id=ba9d2024-8df1-323d-b0ba-54402272b188] Routing to Failure due to exception: java.net.SocketTimeoutException: timeout: java.net.SocketTimeoutException: timeout
java.net.SocketTimeoutException: timeout
    at okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
    at okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
    at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.java:510)
    at okhttp3.internal.http2.Http2Stream$FramingSink.close(Http2Stream.java:538)
    at okio.RealBufferedSink.close(RealBufferedSink.java:236)
    at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:63)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
    at okhttp3.RealCall.execute(RealCall.java:69)
    at org.apache.nifi.processors.standard.InvokeHTTP.onTrigger(InvokeHTTP.java:850)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1174)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Thanks
12-28-2023 05:23 AM
Looking at the thread, the answer is no; that is not how a NiFi processor is meant to handle relationships. The client processor has to initialize and start properly with the given configuration. Only then can it try to write, and based on the write results, flow files can be routed to failure or success. Here the processor can't even start properly.
12-22-2023 09:52 AM
@Rohit1997jio, I was going to give an answer similar to the one from @MattWho, who beat me to it, but I will post mine anyway in case it helps. Basically @MattWho is correct: you can use the ExecuteScript processor to simulate the RetryFlowFile processor. It is better than the RouteOnAttribute option because when you penalize, the processor sits idle during the penalty period, whereas with RouteOnAttribute the flow file keeps looping through the unmatched relationship for the whole wait time. Anyway, here is my solution. In my case you don't need RouteOnAttribute, but you have to add more code. I'm using Groovy for my script. The process relies on two attributes:
1- totalRetry: an incrementing value that tracks the retry count against the threshold every time the file is sent to retry. The first time, it is set to 0.
2- isPenalized: tracks whether the file should be penalized before the next retry (isPenalized == null) or has already been penalized, which means it is ready for the next retry.
The Groovy script:
flowFile = session.get()
if (!flowFile) return
// get the totalRetry and isPenalized attributes
totalRetry = flowFile.getAttribute('totalRetry')
isPenalized = flowFile.getAttribute('isPenalized')
// if it's the first time, set the value to 0 (no retry yet, first penalize)
totalRetry = !totalRetry ? 0 : totalRetry.toInteger()
// if the total retry count has reached the threshold (3 in this case), send to the failure rel (expired).
// Total wait time (penalty time): 3 * 10 sec = 30 secs
// (>= rather than > so that exactly 3 retries are made, matching the wait time above)
if (totalRetry >= 3) {
    session.transfer(flowFile, REL_FAILURE)
    return
}
// if totalRetry has not reached the threshold and the file is not
// penalized (isPenalized == null), penalize it and send it back to the upstream queue
if (!isPenalized) {
    flowFile = session.putAttribute(flowFile, 'isPenalized', '1')
    flowFile = session.penalize(flowFile)
    session.transfer(flowFile)
    return
}
// otherwise the file has already been penalized: send it to retry and increment totalRetry
flowFile = session.putAttribute(flowFile, 'totalRetry', (totalRetry + 1).toString())
flowFile = session.removeAttribute(flowFile, 'isPenalized')
session.transfer(flowFile, REL_SUCCESS)
return
You can set the penalty duration under the processor's SETTINGS tab. Hope this helps. Thanks
12-21-2023 11:53 AM
I want to retry my flow file for 24 hours if it hits an error; a retry should happen every hour. If the file has been in the flow for more than 24 hours, it should be removed.
I am trying to achieve this using UpdateAttribute processors, a RouteOnAttribute processor, and Expression Language, but I have not been able to get it working.
These are the expressions I am using in the RouteOnAttribute processor to check the flow file's elapsed time and received time, but it goes to the unmatched relationship every time.
These values are for testing purposes:
elapsed_time = ${now():toNumber():minus(${queuedtimestamp:toNumber()}):divide(60000):gt(10)}
receivedTime= ${now():toNumber():minus(${receivedTime:toDate("yyyy-MM-dd HH:mm:ss"):toNumber()}):gt(180000)}
NOTE: The RetryFlowFile processor is not available because my NiFi version is old.
Please suggest a better approach.
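For readers following the arithmetic in the two expressions above, here is a minimal Python sketch of what each one appears to compute. The function names and sample values are hypothetical, and it assumes that `divide` on whole numbers truncates like integer division and that `receivedTime` holds a timestamp in the `yyyy-MM-dd HH:mm:ss` pattern. It only illustrates the comparisons and does not diagnose why the flow files route to unmatched.

```python
from datetime import datetime, timedelta


def elapsed_minutes_exceeds(now_ms, queued_ms, limit_minutes):
    """Mirror of: now - queuedtimestamp, divided by 60000, greater than limit.

    Assumes both values are epoch milliseconds and that the division
    truncates to whole minutes (an assumption about Expression Language).
    """
    return (now_ms - queued_ms) // 60000 > limit_minutes


def received_older_than(now, received_text, limit_ms):
    """Mirror of: now - receivedTime (parsed as a date), greater than limit_ms."""
    received = datetime.strptime(received_text, "%Y-%m-%d %H:%M:%S")
    elapsed_ms = (now - received) / timedelta(milliseconds=1)
    return elapsed_ms > limit_ms


# Hypothetical test values: 11 minutes in the queue, received 4 minutes ago.
print(elapsed_minutes_exceeds(11 * 60000, 0, 10))
print(received_older_than(datetime(2023, 12, 21, 12, 0, 0), "2023-12-21 11:56:00", 180000))
```

Note that the first check only turns true once a full 11 minutes have elapsed, because the truncated quotient must be strictly greater than 10.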
Labels: Apache NiFi
12-21-2023 07:11 AM
1 Kudo
@Rohit1997jio Most NiFi processors that are "started/running" are not continuously consuming threads. The processor's configurable scheduling controls how often the processor requests a thread from the NiFi controller thread pool to execute its code. By default, processors have a run schedule of 0, which means the processor gets scheduled to run as often as possible. Should an execution result in no processing of data (FlowFiles), the NiFi controller will delay the next scheduled execution of that processor for 1 sec (this logic prevents excessive CPU usage by processor components).
The Kafka processors simply execute the Kafka client libraries and support adding dynamic properties to the processor's configuration that affect that client library's behavior. The Apache NiFi documentation for each component identifies whether "Dynamic Properties" are supported: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.24.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/index.html
In the case of the consume and publish Kafka processors, dynamic properties are supported. The dynamic properties are limited to those allowed by the specific Kafka client library version: https://kafka.apache.org/documentation.html#configuration
If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to log in and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
12-12-2023 11:06 PM
I want to achieve this disconnect-and-reconnect functionality with the PublishKafka processor. ConsumeKafka will keep consuming continuously.
12-08-2023 12:47 PM
A similar question was recently asked. Kafka connections are meant to be persistent. If you want to handle what you're asking, you'll have to custom-build a solution that monitors the queues and stops/starts the processors. All of this can be achieved via the NiFi REST API.
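As a rough illustration of the stop/start part, the sketch below builds the request that changes a processor's run state through the REST API's run-status endpoint. The base URL, processor ID, token, and helper name are all hypothetical, and the endpoint path and payload shape should be checked against the REST API documentation for your NiFi version. The revision version has to be read from the processor first (for example with a GET on the processor resource), because NiFi rejects updates made against a stale revision.

```python
import json
import urllib.request

# Hypothetical base URL; replace with your own NiFi host and port.
NIFI_API = "https://nifi.example.com:8443/nifi-api"


def build_run_status_request(processor_id, revision_version, state, token=None):
    """Build (but do not send) a PUT request that starts or stops a processor."""
    if state not in ("RUNNING", "STOPPED"):
        raise ValueError("state must be RUNNING or STOPPED")
    body = json.dumps(
        {"revision": {"version": revision_version}, "state": state}
    ).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if token:
        # Assumes token-based login; other authentication setups differ.
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(
        f"{NIFI_API}/processors/{processor_id}/run-status",
        data=body,
        headers=headers,
        method="PUT",
    )


# Usage sketch with a made-up processor ID; sending is left commented out.
request = build_run_status_request("hypothetical-processor-id", 3, "STOPPED")
# with urllib.request.urlopen(request) as response:
#     print(response.status)
print(request.get_method(), request.full_url)
```

The monitoring half (deciding when to stop or start based on queue depth) is left out because the policy depends on the flow; it would poll the connection status and call this helper when its own condition is met.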
11-29-2023 06:52 AM
@Rohit1997jio You could use the RetryFlowFile processor for this use case. You would feed the "failure" relationship via a connection to the RetryFlowFile processor. The RetryFlowFile processor will continue to route the FlowFile back to PublishKafka using the "retry" relationship until the configured maximum number of retries has been exceeded. After the maximum has been reached, the FlowFile would instead route to the "retries_exceeded" relationship, which you can connect to a LogMessage processor. The LogMessage processor would then auto-terminate the "success" relationship.
The challenge you have here is your requirement to retry once per hour for 24 hours. You could set the penalty duration in PublishKafka to 1 hour. This means that FlowFiles routed to the "failure" relationship would get penalized for 60 mins. RetryFlowFile would not consume a FlowFile from its input connection until the penalty duration has ended. Then configure the number of retries in the RetryFlowFile processor to 24.
Be careful with setting the queue size to 250 on the failure connection. If you reach 250 queued on the failure relationship, it will trigger backpressure on the PublishKafka processor, meaning the PublishKafka processor would not get scheduled again until that backpressure is gone.
If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to log in and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
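To see how a 1 hour penalty and 24 retries add up, here is a small Python sketch of the timing. The helper name is hypothetical, and it assumes every attempt fails immediately and that processing and queueing time are negligible, so it only approximates what a real flow would do.

```python
from datetime import timedelta


def retry_schedule(penalty, max_retries):
    """Return the elapsed time at which each retry attempt would start.

    Each failure is assumed to incur one full penalty before the
    FlowFile is picked up and routed back for the next attempt.
    """
    return [penalty * attempt for attempt in range(1, max_retries + 1)]


schedule = retry_schedule(timedelta(hours=1), 24)
print(len(schedule), "retries; first after", schedule[0], "and last after", schedule[-1])
```

Under these assumptions the last retry starts 24 hours after the first failure. If that attempt also fails, the FlowFile would sit through one more penalty before RetryFlowFile sees it again and routes it to "retries_exceeded".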
11-28-2023 05:47 AM
@joseomjr is on to the right solution here. Your regex statement should match "kafka.topic", not "${kafka.topic}". A quick test on regex101.com confirms that "kafka\.topic" should match.
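The same check can be reproduced outside regex101 with a few lines of Python. Python's `re` module is used here only for illustration (NiFi itself uses Java regular expressions, which treat this simple pattern the same way), and the list of candidate names is made up.

```python
import re

# Escaping the dot makes it match a literal "." rather than any character.
ATTRIBUTE_PATTERN = re.compile(r"kafka\.topic")


def matches_attribute_name(name):
    """Return True when the whole name is exactly kafka.topic."""
    return ATTRIBUTE_PATTERN.fullmatch(name) is not None


# Hypothetical candidate names, including the unevaluated ${...} form.
candidates = ["kafka.topic", "${kafka.topic}", "kafkaXtopic", "kafka.partition"]
print([name for name in candidates if matches_attribute_name(name)])
```

Only the plain attribute name passes; the `${kafka.topic}` form does not, because the pattern is compared against the name itself rather than against an Expression Language reference.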