How to clean up / purge Kafka queues?

What is the best way to clean / purge the messages in a Kafka queue (irrespective of whether they have been read or not)?

The scenario: in a dev environment, a dummy publisher pushes messages to a Kafka queue that a Storm topology reads from. When a change to the publisher makes all the current messages in the queue (read or not) invalid, we need to purge those messages and start again.

1 ACCEPTED SOLUTION

In HDP 2.3 you can delete the topic via a standard admin command:

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic someTopic

For this to work, one has to make sure the delete.topic.enable flag is set to true (check it via Ambari).
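
If Ambari is not at hand, the flag can also be checked directly on a broker host. The following is only a rough sketch: the helper name topic_delete_enabled is made up for illustration, and the location of the broker's server.properties file is an assumption that depends on the install.

```shell
# Hypothetical helper: succeeds (exit 0) only when the given broker
# properties file explicitly sets delete.topic.enable=true.
# $1: path to the broker's server.properties (location varies by install).
topic_delete_enabled() {
  grep -Eq '^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$1"
}
```

Usage would look like: topic_delete_enabled /path/to/server.properties && echo "topic deletion is enabled". If the property is missing from the file, the broker default applies, and that default depends on the Kafka version.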

Once the topic is deleted, any new message posted to it will re-create it, assuming the auto.create.topics.enable property is set to true (it might be disabled in production, like the delete flag above). If auto-creation is disabled, the admin must create the topic again:

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic someTopic --partitions 10 --replication-factor 3 
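
The two commands above can be wrapped in small shell functions so the purge is repeatable in a dev environment. This is a minimal sketch, not a definitive script: the function names and the KAFKA_TOPICS and ZOOKEEPER variables are illustrative assumptions, and the defaults simply mirror the values used in this answer.

```shell
# Assumed defaults, mirroring the commands above; override for your cluster.
KAFKA_TOPICS="${KAFKA_TOPICS:-bin/kafka-topics.sh}"
ZOOKEEPER="${ZOOKEEPER:-127.0.0.1:2181}"

# Delete a topic (requires delete.topic.enable=true on the brokers).
# $1: topic name
delete_topic() {
  "$KAFKA_TOPICS" --zookeeper "$ZOOKEEPER" --delete --topic "$1"
}

# Re-create a topic explicitly, for clusters where auto-creation is disabled.
# $1: topic name, $2: partition count, $3: replication factor
create_topic() {
  "$KAFKA_TOPICS" --zookeeper "$ZOOKEEPER" --create --topic "$1" \
    --partitions "$2" --replication-factor "$3"
}
```

For example: delete_topic someTopic, then create_topic someTopic 10 3. Deletion is asynchronous, so it is probably safest to wait until the topic no longer shows up in the output of kafka-topics.sh --list before re-creating it.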

8 REPLIES

On Kafka 0.8 we use the following procedure. Shut down the Kafka broker, then run

rm -rf /data/kafka-logs/<topic/Partition_name>

and restart the broker.

Wouldn't that cause all kinds of issues with zookeeper state? Besides, the data replicas will be on multiple nodes.

The topic will still exist until you push new messages to it; after that, follow @AndrewGrande's answer (the accepted solution).

I had better luck expunging the messages by lowering the retention time temporarily and then setting it back.

kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
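
The full lower-wait-restore cycle could be sketched roughly as follows. Everything beyond the command above is an assumption: the function names, the WAIT_SECONDS and RESTORE_MS variables, the 300-second wait (chosen to match the default 5-minute retention check interval), and the 7-day value restored at the end, which should be replaced with whatever retention the topic had before.

```shell
# Assumed defaults; override for your environment.
KAFKA_TOPICS="${KAFKA_TOPICS:-kafka-topics.sh}"
ZOOKEEPER="${ZOOKEEPER:-localhost:13003}"
WAIT_SECONDS="${WAIT_SECONDS:-300}"     # time to let the retention checker run
RESTORE_MS="${RESTORE_MS:-604800000}"   # retention to restore (7 days, assumed)

# Set the topic-level retention.ms override.
# $1: topic name, $2: retention in milliseconds
set_retention_ms() {
  "$KAFKA_TOPICS" --zookeeper "$ZOOKEEPER" --alter --topic "$1" --config "retention.ms=$2"
}

# Lower retention to 1 second, wait for the broker to expire old segments,
# then restore the longer retention.
# $1: topic name
purge_by_retention() {
  set_retention_ms "$1" 1000 || return 1
  sleep "$WAIT_SECONDS"
  set_retention_ms "$1" "$RESTORE_MS"
}
```

Calling purge_by_retention MyTopic would then run the whole cycle. Unlike deleting the topic, this keeps the topic and its partition layout in place, at the cost of waiting for the broker's retention check.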

Changing the retention period on a topic only marks data segments for deletion. The thread that actually deletes them runs every 5 minutes (the default for log.retention.check.interval.ms). I'm not sure the OP wants to wait up to 5 minutes for the data to be deleted.

@Andrew Grande - When I try to set the log.retention.check.interval.ms configuration setting, I receive the following error message:

Error while executing topic command : Unknown topic config name: log.retention.check.interval.ms

Has this configuration setting been deprecated?

This is the command that I run:

kafka-topics --zookeeper localhost:2181 --alter --entity-type topics --entity-name myTopic --add-config log.retention.check.interval.ms=5000

Great info. I am really struggling to understand what happens when the retention time for a log segment expires while a consumer is still reading messages from it.