Storm 1.1.x provides an external Kafka client module, storm-kafka-client, that can be used to build a Storm topology. Note that it supports Kafka 0.10 onwards. Below is a step-by-step guide to using the API.

  • Add the dependency below to your pom.xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.1.1-SNAPSHOT</version>
</dependency>
  • The Kafka spout for the topology is configured using KafkaSpoutConfig. Below is a sample of creating the config object.
// UNCOMMITTED_LATEST assumes a static import of the FirstPollOffsetStrategy enum constant
KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig.builder(bootStrapServers, topic)
        .setGroupId(consumerGroupId)
        .setOffsetCommitPeriodMs(10_000)
        .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
        .setMaxUncommittedOffsets(1_000_000)
        .setRetry(kafkaSpoutRetryService)
        .setRecordTranslator(new TupleBuilder(), outputFields, topic)
        .build();

The class above follows the builder pattern. bootStrapServers is the Kafka broker endpoint from which consumer records are polled. topic is the Kafka topic name; it can also be a collection of topics (multiple topics) or a Pattern (regular expression). consumerGroupId sets the Kafka consumer group id (group.id).
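To make the Pattern option concrete, here is a minimal sketch of which topic names a regular expression would select. It uses only java.util.regex and assumes the whole topic name must match the pattern; the topic names and the matchingTopics helper are made up for illustration and are not part of storm-kafka-client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustrative only: shows which topic names a regex would select. */
final class TopicPatternSketch {

    /** Returns the topics whose full name matches the pattern (assumed full-match semantics). */
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Hypothetical topic names
        List<String> topics = Arrays.asList("orders-eu", "orders-us", "payments", "old-orders-eu");
        System.out.println(matchingTopics(Pattern.compile("orders-.*"), topics));
    }
}
```

A Pattern is handy when topics are created dynamically and share a naming prefix, since the spout config does not have to be edited for each new topic.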

setFirstPollOffsetStrategy controls where the spout starts fetching consumer records. It takes an enum value; the options are described below.

EARLIEST - the spout starts from the first offset of the partition, irrespective of any commit.
LATEST - the spout fetches records with offsets greater than the last offset in the partition, irrespective of any commit.
UNCOMMITTED_EARLIEST - the spout resumes from the last committed offset; if there is no commit, it starts from the first offset of the partition.
UNCOMMITTED_LATEST - the spout resumes from the last committed offset; if there is no commit, it fetches records with offsets greater than the last offset in the partition.
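The four strategies can be read as a small decision table. The sketch below models that table in plain Java. It is an illustration of the descriptions above, not code from storm-kafka-client; the Strategy enum, the startOffset helper and the offset values are all hypothetical.

```java
import java.util.OptionalLong;

/** Illustrative model of the first-poll strategies; not library code. */
final class FirstPollOffsetSketch {

    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /**
     * @param committed offset the consumer group would resume from, if one was committed
     * @param earliest  first offset available in the partition
     * @param latest    offset the next record written to the partition would get
     */
    static long startOffset(Strategy strategy, OptionalLong committed, long earliest, long latest) {
        switch (strategy) {
            case EARLIEST:
                return earliest;                      // ignore any commit
            case LATEST:
                return latest;                        // ignore any commit
            case UNCOMMITTED_EARLIEST:
                return committed.orElse(earliest);    // fall back only when nothing is committed
            case UNCOMMITTED_LATEST:
                return committed.orElse(latest);      // fall back only when nothing is committed
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        OptionalLong none = OptionalLong.empty();
        OptionalLong at42 = OptionalLong.of(42);
        for (Strategy s : Strategy.values()) {
            System.out.println(s + ": no commit -> " + startOffset(s, none, 0, 100)
                    + ", committed at 42 -> " + startOffset(s, at42, 0, 100));
        }
    }
}
```

The UNCOMMITTED_* variants are usually the ones to pick for a topology that is redeployed, because the other two ignore the consumer group's committed position on every start.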

The kafkaSpoutRetryService implementation is shown below. It uses KafkaSpoutRetryExponentialBackoff. setRetry accepts a pluggable interface, so you can supply your own implementation if failed tuples should be retried differently.

KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
        KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),  // initial delay
        KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),    // delay period
        Integer.MAX_VALUE,                                                 // max retries
        KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));       // max delay
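To get a feel for how these four arguments interact, here is a rough sketch of a capped exponential backoff schedule using the same values. The doubling formula is an assumption made for illustration; the exact formula used by KafkaSpoutRetryExponentialBackoff may differ between Storm versions, so check the class's Javadoc for the version you run. BackoffSketch and delayMicros are hypothetical names.

```java
/** Illustrative backoff schedule; the library's exact formula may differ. */
final class BackoffSketch {
    static final long INITIAL_DELAY_MICROS = 500;       // mirrors TimeInterval.microSeconds(500)
    static final long DELAY_PERIOD_MICROS = 2_000;      // mirrors TimeInterval.milliSeconds(2)
    static final long MAX_DELAY_MICROS = 10_000_000;    // mirrors TimeInterval.seconds(10)

    /** Delay before the retry that follows the given number of failures (1-based). */
    static long delayMicros(int failCount) {
        if (failCount < 1) {
            throw new IllegalArgumentException("failCount must be >= 1");
        }
        if (failCount == 1) {
            return INITIAL_DELAY_MICROS;
        }
        // Assumed formula: delayPeriod * 2^(failCount - 1), capped at the max delay.
        // The exponent is clamped so the shift cannot overflow a long.
        int exponent = Math.min(failCount - 1, 30);
        long delay = DELAY_PERIOD_MICROS << exponent;
        return Math.min(delay, MAX_DELAY_MICROS);
    }

    public static void main(String[] args) {
        for (int failCount = 1; failCount <= 15; failCount++) {
            System.out.println("failure " + failCount + " -> retry after "
                    + delayMicros(failCount) + " microseconds");
        }
    }
}
```

With Integer.MAX_VALUE as the retry limit, a tuple that keeps failing is retried indefinitely, so the max delay is what bounds how often it comes back.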

setRecordTranslator specifies how Kafka consumer records are converted to tuples. In the example above, TupleBuilder implements the Func interface. Below is a sample implementation of the apply method that must be overridden. outputFields is the list of fields emitted in the tuple. Note that there are multiple ways to translate records to tuples; see the storm-kafka-client documentation for details.

@Override
public List<Object> apply(ConsumerRecord<String, String> consumerRecord) {
    try {
        // String.split takes a regex, so the pipe must be escaped to match a literal '|'
        String[] records = consumerRecord.value().split("\\|");
        return Arrays.asList(records);
    } catch (Exception e) {
        // Passing the exception as the last argument logs its stack trace
        LOGGER.debug("Failed to parse {}", consumerRecord.value(), e);
    }
    return null;
}
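The splitting step deserves care because String.split interprets its argument as a regular expression, and an unescaped | is the alternation operator. The standalone sketch below shows the difference without any Storm or Kafka dependency. PipeSplitSketch and parse are hypothetical names, and keeping trailing empty fields (limit -1) is a design choice made here so the number of values stays aligned with the declared output fields.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustrative parsing of a pipe-delimited record value. */
final class PipeSplitSketch {
    // Pattern.quote turns "|" into a literal, so it is not treated as regex alternation
    private static final Pattern PIPE = Pattern.compile(Pattern.quote("|"));

    /** Splits on literal '|' and keeps trailing empty fields (limit -1). */
    static List<String> parse(String value) {
        return Arrays.asList(PIPE.split(value, -1));
    }

    public static void main(String[] args) {
        System.out.println(parse("a|b|c"));
        System.out.println(parse("a||c|"));
        // Unescaped "|" matches the empty string, so this splits between characters
        System.out.println(Arrays.toString("a|b".split("|")));
    }
}
```

Precompiling the pattern once also avoids recompiling the regex for every record the spout translates.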
  • Once the steps above are complete, the topology can use the spoutConf created above, as shown below.
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
conf.setNumWorkers(1);
// KAFKA_SPOUT is a String constant holding the spout's component id
builder.setSpout(KAFKA_SPOUT, new KafkaSpout<>(spoutConf), 1);

Reference: https://github.com/apache/storm/blob/1.x-branch/docs/storm-kafka-client.md
