
HDP 2.5 upgrade with Storm 1.0.x - my Storm topology is not consuming any messages from Kafka

Rising Star

I have written a bare-minimum Storm topology to read data from Kafka and write it directly to HDFS.

The topology doesn't show any exceptions in the Storm UI, but it also isn't consuming any messages from Kafka. I can't debug it because I'm not sure where the logs are saved; any pointers would be appreciated.

I have set the storm-kafka spout's start offset to OffsetRequest.EarliestTime() to make sure it consumes messages from the beginning of the topic. I know there are messages in the topic, but the topology isn't consuming anything.
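For what it's worth, my understanding is that startOffsetTime only takes effect when no offset has already been committed under the spout's ZooKeeper path, and that storm-kafka's ignoreZkOffsets flag forces it to apply regardless. A minimal sketch of that spout setup (the host, topic, and client id here are hypothetical placeholders, and ignoreZkOffsets is not something my topology sets yet):

import kafka.api.OffsetRequest;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;

// Hypothetical placeholders for the ZooKeeper quorum, topic, zkRoot, and client id
BrokerHosts hosts = new ZkHosts("zk-host:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/storm/my-topic", "my-client-id");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = OffsetRequest.EarliestTime(); // start from the beginning of the topic
spoutConfig.ignoreZkOffsets = true; // ignore any offsets already committed in ZooKeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);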

Can someone please share your thoughts to help me fix this?

Also, a few details: I built the storm-kafka module separately, because I was told that the Storm shipped with HDP 2.5 only supports the HWX Kafka build secured with Kerberos. Our Kafka environment is neither, so I took the Apache source code and compiled it myself.

Please help me fix this issue. Thanks a ton!

Attached are the pom.xml files I used to build this.

Master pom file: masterpom.xml

pom file used to build the storm-kafka module: storm-kafkapom.xml

pom file used for the actual topology: storm-topology-actual-sourcepom.xml

The source code is below:

import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Created by z013sqm on 3/17/17.
 */
public class KafkaToHdp {

    public static void main(String[] args) throws IOException, InvalidTopologyException,
            AlreadyAliveException, AuthorizationException {

        // Config map with the Kerberos credentials the HdfsBolt should use
        String hdfsConfigKey = "hdfs.config";

        Map<String, Object> hdfsConfigMap = new HashMap<String, Object>();
        hdfsConfigMap.put("hdfs.keytab.file", "keytab path");
        hdfsConfigMap.put("hdfs.kerberos.principal", "keytab princ");

        Config configured = new Config();
        configured.setDebug(true);
        configured.put(hdfsConfigKey, hdfsConfigMap);
        configured.setNumWorkers(1);
        configured.setMaxSpoutPending(1000);
        configured.setNumAckers(1);
        configured.setMessageTimeoutSecs(600);

        // Kafka spout: decode messages as strings and start from the earliest offset
        BrokerHosts hosts = new ZkHosts("xxxx");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "xxxx", "/storm" + "/xxxx", UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.startOffsetTime = OffsetRequest.EarliestTime();

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // Settings for the HDFS write bolt
        FileNameFormat fileNameWithPath = new DefaultFileNameFormat().withPath("xxxx");
        RecordFormat recFormat = new DelimitedRecordFormat().withFieldDelimiter("^^^^^");
        SyncPolicy syncPolicy = new CountSyncPolicy(5);
        FileRotationPolicy fileRotatePolicy = new FileSizeRotationPolicy(256.0f, FileSizeRotationPolicy.Units.MB);

        HdfsBolt hdfsbolt = new HdfsBolt()
                .withFsUrl("xxxxx")
                .withRecordFormat(recFormat)
                .withFileNameFormat(fileNameWithPath)
                .withRotationPolicy(fileRotatePolicy)
                .withSyncPolicy(syncPolicy)
                .withConfigKey(hdfsConfigKey);

        // Wire the spout to the bolt and submit the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", kafkaSpout, 2);
        builder.setBolt("write-to-hdfs", hdfsbolt).shuffleGrouping("kafka-spout");

        StormTopology topology = builder.createTopology();
        StormSubmitter.submitTopology("testing-storm-hdp-2p5_test", configured, topology);
    }
}
1 Reply

Contributor

Hi Raja, you can go to your topology page in the Storm UI. From there, click on the spout, and once you get to the component page you should see a worker node assigned to it. On that worker node, check /var/log/storm (or whatever log directory is configured) for the worker logs; they should give an idea of what's going on in the spout. You can attach the worker log here and we can try to figure out what the issue is.
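If the worker logs don't show anything obvious, another option is to run the same topology in local mode so the spout's activity is logged straight to the console. A minimal sketch, assuming the configured Config object and the builder from your posted code (the topology name and one-minute runtime here are arbitrary):

import org.apache.storm.LocalCluster;
import org.apache.storm.utils.Utils;

// Run the topology in-process instead of submitting it to the cluster;
// with configured.setDebug(true), every emitted tuple is logged to the console.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka-to-hdfs-local", configured, builder.createTopology());
Utils.sleep(60 * 1000); // let it run for a minute, then tear down
cluster.killTopology("kafka-to-hdfs-local");
cluster.shutdown();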