Challenges with Storm

On my HDP 2.6.1 sandbox, I have created a Kafka topic named Clean_Orders. Through NiFi, I am pumping data into this topic, and I am trying to read the messages from it with a Storm topology. The code is written against the storm-kafka-client API for Kafka 0.10.
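
A quick way to rule out the NiFi-to-Kafka leg is a standalone consumer along these lines. This is only a minimal sketch against the Kafka 0.10 consumer API; the class name and the throwaway group id are made up, and the broker address is the same one the topology uses.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class VerifyCleanOrders {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
  // Throwaway group id so this check does not disturb the topology's consumer group
  props.put("group.id", "clean_orders_sanity_check");
  // Start from the beginning of the topic for a group with no committed offsets
  props.put("auto.offset.reset", "earliest");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
   consumer.subscribe(Collections.singletonList("Clean_Orders"));
   // The Kafka 0.10 poll() takes a timeout in milliseconds
   ConsumerRecords<String, String> records = consumer.poll(5000L);
   System.out.println("Fetched " + records.count() + " record(s)");
   for (ConsumerRecord<String, String> record : records) {
    System.out.println("offset=" + record.offset() + ", value=" + record.value());
   }
  }
 }
}

If this plain consumer prints the records I push through NiFi, the Kafka side is fine and the problem is isolated to the topology.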

Here's my Storm Topology:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

public class ReadFromKafka01 {
 private static final String KAFKA_CONF_BOOTSTRAP_SERVERS = "sandbox.hortonworks.com:6667";
 private static final String[] KAFKA_CONF_TOPICS = { "Clean_Orders" };
 private static final String KAFKA_CONF_CONSUMER_GROUP_ID = "Clean_Order_Consumer_Group";
 private static final String[] ORDER_FIELDS =  {"OrderID", "Firm", "OrderSystem", 
   "TransType", "OrderRefNum", "RecordSeqNum",
   "LoadDateTime", "LoadDate", "CreateDateTime", 
   "CreateDate", "OMSDateTime", "OMSDate",
   "OrderDateTime", "OrderDate", "Trader", 
   "Symbol", "Side", "SideStatus", 
   "Quantity", "LimitPrice", "OrderType"};

 private static Func<ConsumerRecord<String, String>, List<Object>> DEFAULT_CUSTOM_TRANSLATOR = new Func<ConsumerRecord<String, String>, List<Object>>() {

  private static final long serialVersionUID = 2253191049162512317L;
  public List<Object> apply(ConsumerRecord<String, String> consumerRecord) {
   try {
    System.out.println(String.format(">>> Kafka Message -- Key = [%s], Value = [%s]", consumerRecord.key(),
      consumerRecord.value()));
     // The value is pipe-delimited; the resulting fields are expected to line up with ORDER_FIELDS
     String[] records = consumerRecord.value().split(Pattern.quote("|"));
    return new ArrayList<Object>(Arrays.asList(records));
   } catch (Exception e) {
    System.err.println("Failed to Parse " + consumerRecord.value());
    System.err.println(e.getMessage());
    e.printStackTrace();
   }
   return null;
  }
 };

 public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  String bootStrapServers = KAFKA_CONF_BOOTSTRAP_SERVERS;
   if (args != null && args.length > 1 && args[1] != null && !args[1].trim().isEmpty()) {
   bootStrapServers = args[1];
  }
  KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
    KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
    KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE,
    KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));

  KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig
    .builder(bootStrapServers, KAFKA_CONF_TOPICS)
    .setGroupId(KAFKA_CONF_CONSUMER_GROUP_ID)
    .setOffsetCommitPeriodMs(10000)
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
    .setMaxUncommittedOffsets(1000000)
    .setRetry(kafkaSpoutRetryService)
    .setRecordTranslator(DEFAULT_CUSTOM_TRANSLATOR,
      new Fields(ORDER_FIELDS))
    .build();

  builder.setSpout("kafka_spout", new KafkaSpout<String, String>(spoutConf), 1);
  builder.setBolt("printer_bolt", new PrinterBolt(), 4).shuffleGrouping("kafka_spout");

  Config conf = new Config();
  conf.setDebug(true);

  if (args != null && args.length > 0) {
   conf.setNumWorkers(3);
   StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
  } else {
   LocalCluster cluster = new LocalCluster();
   cluster.submitTopology("test", conf, builder.createTopology());
   Utils.sleep(10000);
   cluster.killTopology("test");
   cluster.shutdown();
  }
 }
}

And here's the simple PrinterBolt:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {
 private static final long serialVersionUID = -7395574954624796650L;
 public void execute(Tuple tuple, BasicOutputCollector collector) {
  System.out.println("From Printer Bolt >> " + tuple);
 }
 public void declareOutputFields(OutputFieldsDeclarer ofd) {
  // This bolt only prints incoming tuples, so it declares no output fields
 }
}

I use the following command to submit the topology to my HDP 2.6.1 sandbox; T16 is the topology name that main picks up as args[0].

storm jar storm-poc-1.0.0.jar com.abc.def.ghi.ReadFromKafka01 T16
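
For reference, the topology also accepts an optional second argument that overrides the default bootstrap servers (args[1] in main above), so a run pointing at a different broker would look like this (the broker address here is just an illustration):

storm jar storm-poc-1.0.0.jar com.abc.def.ghi.ReadFromKafka01 T16 sandbox.hortonworks.com:6667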

I am running into a few problems with this topology.

1. The topology is submitted successfully, but I see an exception. Please click here for details.

2. On the Storm UI, I see the following error message under my topology (see the attached KafkaSpoutOffsetLogError.png):

Unable to get offset lags for kafka. Reason: org.apache.kafka.shaded.common.errors.InvalidTopicException: Topic '[Clean_Orders]' is invalid

Note the square brackets around the topic name in the message, as though the whole topic list were being treated as a single topic name.

3. I am pumping just one record in through NiFi, yet the Storm UI shows 0 tuples emitted by the spout and 20 tuples executed by the bolt. Please see the attached Spout_Bolt_RecordCount.png.
