
Terminate the Kafka connection when the PublishKafka processor is idle for 5 minutes and re-establish it when data is received

Contributor

Hi,

In Apache NiFi I am consuming data from one Kafka topic and publishing that data to a customer's Kafka topic, using the ConsumeKafka and PublishKafka processors respectively.

If the outgoing dataflow is idle for more than 5 minutes, the Kafka connection to the customer should be terminated, and it should be re-established as soon as new messages are consumed.

How can I do this with the PublishKafka processor? How do I terminate and restart the connection?

1 ACCEPTED SOLUTION


@Rohit1997jio,

As @joseomjr already pointed out, doing this defeats the very purpose of using Kafka. As you know, Kafka is a stream-processing platform that, at its most basic, functions as a queue of messages.

When you integrate Kafka with NiFi, especially through the ConsumeKafka processor, you essentially place a bucket at the end of that queue. As long as messages are present in the queue (Kafka, in your case), they will keep arriving in your bucket (your NiFi processing layer). When there are no messages in Kafka, the ConsumeKafka processor sits in what you could call an idle state: it does not waste resources doing work, although it does use a small amount to poll for new messages.

That being said, I see no point in killing a connection that is not affecting the involved systems in any way; doing so largely defeats the purpose of using NiFi and Kafka together.

However, if you still want to achieve this, it will take some extra effort. First, build a flow that checks the state of the desired processor through NiFi's REST API - this can be done in many ways, for example with InvokeHTTP or ExecuteStreamCommand. If nothing has happened in the past 5 minutes (visible in the JSON returned by the REST API), trigger an InvokeHTTP that calls the REST API again to STOP the ConsumeKafka processor.
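
As a rough illustration of that approach, adapted to the PublishKafka side that the question asks about, here is a minimal sketch of an external script (the same calls could be made from InvokeHTTP, or the script run via ExecuteStreamCommand): it polls the ConsumeKafka processor's rolling 5-minute statistics through the NiFi REST API and stops or restarts the PublishKafka processor accordingly. The NiFi address, the two processor UUIDs and the absence of authentication are placeholders/assumptions, and the JSON field names should be verified against the REST API responses of your NiFi version.

```python
import json
import time
import urllib.request

NIFI_API = "http://localhost:8080/nifi-api"                # assumed NiFi address, no authentication
CONSUME_KAFKA_ID = "aaaaaaaa-0000-0000-0000-000000000000"  # placeholder ConsumeKafka UUID
PUBLISH_KAFKA_ID = "bbbbbbbb-0000-0000-0000-000000000000"  # placeholder PublishKafka UUID

def get_processor(processor_id):
    """Fetch a processor entity: revision, component state and rolling 5-minute statistics."""
    with urllib.request.urlopen(f"{NIFI_API}/processors/{processor_id}") as resp:
        return json.load(resp)

def set_run_status(entity, state):
    """Change a processor's run status; NiFi requires the current revision to be echoed back."""
    body = json.dumps({"revision": entity["revision"], "state": state}).encode()
    req = urllib.request.Request(
        f"{NIFI_API}/processors/{entity['id']}/run-status",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    urllib.request.urlopen(req).close()

while True:
    consume = get_processor(CONSUME_KAFKA_ID)
    publish = get_processor(PUBLISH_KAFKA_ID)

    # NiFi reports processor statistics over a rolling 5-minute window, so a
    # flowFilesOut of 0 on ConsumeKafka means nothing was consumed in the last 5 minutes.
    consumed = consume["status"]["aggregateSnapshot"]["flowFilesOut"]
    publish_state = publish["component"]["state"]

    if consumed == 0 and publish_state == "RUNNING":
        set_run_status(publish, "STOPPED")   # idle for 5 minutes: drop the producer connection
    elif consumed > 0 and publish_state == "STOPPED":
        set_run_status(publish, "RUNNING")   # messages again: reconnect to the customer's Kafka
    time.sleep(60)
```

Stopping PublishKafka closes its Kafka producer resources; any FlowFiles that arrive while it is stopped queue up in its incoming connection (subject to back pressure settings) and are published once it is started again.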


4 REPLIES

Super Collaborator

Why do you need to stop/terminate the connection? That sort of defeats how Kafka is meant to function.

Contributor

Consuming in NiFi is fine, and I want that to be continuous. I want to terminate the connection for the PublishKafka processor: when no new message has been consumed by the ConsumeKafka processor for more than 5 minutes, the connection to the publish-side Kafka should be terminated, since my consumer Kafka and producer Kafka are different.

As soon as my ConsumeKafka processor consumes a message again, I want the connection to the producer Kafka to be re-established.

Contributor

This disconnect-and-reconnect behavior is what I want to achieve with the PublishKafka processor.

ConsumeKafka will keep consuming continuously.