Challenges with Storm

On my HDP 2.6.1 sandbox, I have created a Kafka topic named Clean_Orders. I am pumping data into this topic through NiFi, and I am trying to read the messages from it with a Storm topology. The code is written against the storm-kafka-client for Kafka 0.10.

Here's my Storm Topology:

public class ReadFromKafka01 {
 private static final String KAFKA_CONF_BOOTSTRAP_SERVERS = "";
 private static final String[] KAFKA_CONF_TOPICS = { "Clean_Orders" };
 private static final String KAFKA_CONF_CONSUMER_GROUP_ID = "Clean_Order_Consumer_Group";
 private static final String[] ORDER_FIELDS =  {"OrderID", "Firm", "OrderSystem", 
   "TransType", "OrderRefNum", "RecordSeqNum",
   "LoadDateTime", "LoadDate", "CreateDateTime", 
   "CreateDate", "OMSDateTime", "OMSDate",
   "OrderDateTime", "OrderDate", "Trader", 
   "Symbol", "Side", "SideStatus", 
   "Quantity", "LimitPrice", "OrderType"};

 private static Func<ConsumerRecord<String, String>, List<Object>> DEFAULT_CUSTOM_TRANSLATOR = new Func<ConsumerRecord<String, String>, List<Object>>() {

  private static final long serialVersionUID = 2253191049162512317L;
  public List<Object> apply(ConsumerRecord<String, String> consumerRecord) {
   try {
    System.out.println(String.format(">>> Kafka Message -- Key = [%s], Value = [%s]", consumerRecord.key(),
      consumerRecord.value()));
    String records[] = consumerRecord.value().split(Pattern.quote("|"));
    return new ArrayList<Object>(Arrays.asList(records));
   } catch (Exception e) {
    System.err.println("Failed to Parse " + consumerRecord.value());
   }
   return null;
  }
 };

 public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  String bootStrapServers = KAFKA_CONF_BOOTSTRAP_SERVERS;
  if (args != null && args.length > 1 && (null != args[1] && !"".equals(args[1].trim()))) {
   bootStrapServers = args[1];
  }
  KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
    KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE,

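  KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig
    .builder(bootStrapServers, KAFKA_CONF_TOPICS)
    .setGroupId(KAFKA_CONF_CONSUMER_GROUP_ID)
    .setRetry(kafkaSpoutRetryService)
    .setRecordTranslator(DEFAULT_CUSTOM_TRANSLATOR,
      new Fields(ORDER_FIELDS))
    .build();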

  builder.setSpout("kafka_spout", new KafkaSpout<String, String>(spoutConf), 1);
  builder.setBolt("printer_bolt", new PrinterBolt(), 4).shuffleGrouping("kafka_spout");

  Config conf = new Config();

  if (args != null && args.length > 0) {
   StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
  } else {
   LocalCluster cluster = new LocalCluster();
   cluster.submitTopology("test", conf, builder.createTopology());
  }
 }
}

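One thing that can be checked independently of Storm and Kafka is whether the translator's split always yields 21 values, one per entry in ORDER_FIELDS. The sketch below is only an illustration and not part of the topology: the class name OrderSplitCheck, its methods, and the sample record are all made up. It relies on the documented behavior of String.split, which drops trailing empty strings unless a negative limit is passed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper (not in the original topology) that mirrors the translator's split.
class OrderSplitCheck {
    // ORDER_FIELDS in the topology declares 21 field names.
    static final int EXPECTED_FIELDS = 21;

    // Same call as the translator: trailing empty columns are dropped.
    static List<Object> splitDefault(String line) {
        return new ArrayList<Object>(Arrays.asList(line.split(Pattern.quote("|"))));
    }

    // Variant with a negative limit: trailing empty columns are kept.
    static List<Object> splitKeepEmpty(String line) {
        return new ArrayList<Object>(Arrays.asList(line.split(Pattern.quote("|"), -1)));
    }

    // Made-up record with 21 pipe-separated columns, the last two left empty.
    static String sampleRecord() {
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= 19; i++) {
            sb.append("v").append(i).append("|");
        }
        sb.append("|");
        return sb.toString();
    }

    public static void main(String[] args) {
        String line = sampleRecord();
        System.out.println("expected fields:   " + EXPECTED_FIELDS);
        System.out.println("default split:     " + splitDefault(line).size());
        System.out.println("split with -1:     " + splitKeepEmpty(line).size());
    }
}
```

For this sample record the default split returns 19 values while the variant with limit -1 returns 21, so a record whose last columns are empty would not line up with the 21 declared fields unless the negative limit is used.
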
And here's the simple PrinterBolt:

public class PrinterBolt extends BaseBasicBolt {
 private static final long serialVersionUID = -7395574954624796650L;
 public void execute(Tuple tuple, BasicOutputCollector collector) {
  System.out.println("From Printer Bolt >> " + tuple);
 }
 public void declareOutputFields(OutputFieldsDeclarer ofd) {
  // Terminal bolt: it emits nothing, so no output fields are declared.
 }
}

I use the following command to submit the topology to my HDP 2.6.1 sandbox:

storm jar storm-poc-1.0.0.jar T16

I am running into a few problems with this topology.

1. The topology is submitted successfully, but I see some exceptions. Please click here for details.

2. On the Storm UI, I see the following error message under my topology (see attachment KafkaSpoutOffsetLogError.png):

Unable to get offset lags for kafka. Reason: org.apache.kafka.shaded.common.errors.InvalidTopicException: Topic '[Clean_Orders]' is invalid


3. I am pumping just one record into NiFi, yet the Storm UI shows 0 tuples emitted by the spout and 20 tuples executed by the bolt. Please see the attached image Spout_Bolt_RecordCount.png.


