Created 11-20-2023 12:10 AM
Hi,
In Apache NiFi I am consuming data from one Kafka topic and publishing that data to a customer's Kafka topic. For this I am using the ConsumeKafka and PublishKafka processors, respectively.
If the outgoing dataflow is idle for more than 5 minutes, the Kafka connection with the customer should be terminated, and then re-established when any new messages are consumed.
How can I do this with the PublishKafka processor? How do I terminate and restart the connection?
Created 11-22-2023 12:10 AM
@Rohit1997jio,
As @joseomjr already pointed out, doing this defeats the very purpose of using Kafka. As you know, Kafka is a stream-processing platform that functions, at its most basic, as a queue of messages.
When you integrate Kafka with NiFi, especially with the ConsumeKafka processor, you essentially create a bucket at the end of the queue. As long as messages are present in the queue (Kafka, in your case), messages will keep arriving in your bucket (your NiFi processing layer). When there are no messages in the Kafka system, your ConsumeKafka processor sits in what we might call an idle state, meaning it does not waste resources in vain - it does, however, use some resources to check whether new messages have arrived.
That being said, I see no point in trying to kill a connection which is not affecting the involved systems in any way; doing so basically defeats the entire purpose of using NiFi and Kafka together.
However, if you still want to achieve this, you will need to put in some extra effort. First, create a flow which checks the state of the desired processor using NiFi's REST API - achievable in many ways, e.g. with InvokeHTTP or ExecuteStreamCommand. If nothing has happened in the past 5 minutes (visible in the JSON received as the response from the REST API), trigger an InvokeHTTP that calls the REST API again to STOP the ConsumeKafka processor.
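To make the REST-API check concrete, here is a minimal sketch of the first half of that flow (the idle check) in Python. It assumes the NiFi 1.x REST API, where `GET /nifi-api/processors/{id}` returns the processor entity and its `status.aggregateSnapshot` reflects NiFi's rolling 5-minute statistics window; the base URL and processor UUID below are placeholders you would replace with your own.

```python
import json
import urllib.request

# Hypothetical values - substitute your own NiFi host and processor UUID.
NIFI_URL = "http://localhost:8080/nifi-api"
PROCESSOR_ID = "00000000-0000-0000-0000-000000000000"

def fetch_processor(base_url, processor_id):
    """GET the processor entity (JSON) from the NiFi REST API."""
    with urllib.request.urlopen(f"{base_url}/processors/{processor_id}") as resp:
        return json.load(resp)

def is_idle(processor_entity):
    """True if the processor moved no FlowFiles out during NiFi's
    rolling 5-minute statistics window."""
    snapshot = processor_entity["status"]["aggregateSnapshot"]
    return snapshot["flowFilesOut"] == 0

# Example usage (requires a running NiFi instance):
#   entity = fetch_processor(NIFI_URL, PROCESSOR_ID)
#   print("idle" if is_idle(entity) else "active")
```

Since NiFi's status snapshot already covers a 5-minute window, the "idle for more than 5 minutes" condition conveniently maps onto a single field check rather than needing your own timestamp bookkeeping.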
Created 11-21-2023 01:50 PM
Why do you need to stop/terminate the connection? That sort of defeats how Kafka is meant to function.
Created 12-12-2023 11:04 PM
Consuming in NiFi is fine - I want that to be continuous. I want to terminate the connection for the PublishKafka processor: when no new message has been consumed by the ConsumeKafka processor for more than 5 minutes, I want to terminate the connection with PublishKafka, as my consumer Kafka and producer Kafka are different.
As soon as my consumer Kafka consumes a message, I want the connection to the producer Kafka to be established again.
Created 12-12-2023 11:06 PM
This disconnect-and-reconnect functionality is what I want to achieve with the PublishKafka processor.
ConsumeKafka will consume continuously.
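If you follow the REST-API route described earlier in the thread, the stop/restart half for PublishKafka can be sketched against NiFi's `PUT /nifi-api/processors/{id}/run-status` endpoint (NiFi 1.x). NiFi requires the processor's current `revision` object (obtained from a prior GET) in the body, or the update is rejected; the URL and UUID below are placeholders.

```python
import json
import urllib.request

# Hypothetical values - substitute your own NiFi host and processor UUID.
NIFI_URL = "http://localhost:8080/nifi-api"
PUBLISH_KAFKA_ID = "00000000-0000-0000-0000-000000000000"

def build_run_status_request(base_url, processor_id, revision, state):
    """Build the PUT request NiFi expects for changing a processor's
    run status. `state` is "STOPPED" or "RUNNING"; `revision` must be
    the current revision object from a prior GET of the processor."""
    body = json.dumps({"revision": revision, "state": state}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/processors/{processor_id}/run-status",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )

def set_run_status(base_url, processor_id, revision, state):
    """Send the run-status change and return the updated processor entity."""
    req = build_run_status_request(base_url, processor_id, revision, state)
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# Example usage (requires a running NiFi instance):
#   entity = json.load(urllib.request.urlopen(
#       f"{NIFI_URL}/processors/{PUBLISH_KAFKA_ID}"))
#   set_run_status(NIFI_URL, PUBLISH_KAFKA_ID, entity["revision"], "STOPPED")
```

Pairing this with the idle check on ConsumeKafka's statistics gives the full toggle: PUT `"STOPPED"` when the consumer has been idle for the 5-minute window, and PUT `"RUNNING"` once its FlowFile count is non-zero again. Note that stopping PublishKafka while FlowFiles are still being produced will queue them in the connection feeding it until the processor is started again.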