Support Questions

Find answers, ask questions, and share your expertise

Storm(1.0.1) HDFSBolt writing multiple 0 byte files when upstream bolt fails all the incoming tuples.

avatar
Contributor

Hi, I am running a Storm sample on HDP 2.5 cluster. It has Storm 1.0.1 installed through Ambari.

The topology is as follows:

Input Messages on Kafka topic --> ( KafkaSpout --> ProcessBolt --> HDFSBolt ) --> Processed data in HDFS

Positive scenario is working fine. Messages published to a Kafka Topic are ingested using in-built KafkaSpout, processed using ProcessBolt, and then stored in HDFS using in-built HDFSBolt.

But in case of negative scenario, I am facing one issue. The scenario is like this:

ProcessBolt execute() function has a try-catch block.

execute()

{

try {

//process the tuple using Algorithm

//call emit() and ack()

} catch {

//throw RuntimeException

}

}

If we force the tuple processing to FAIL by purposely setting some wrong property in the Algorithm (all the tuples will fail in this case, even if data is correct), it will not call emit() and ack(). fail() will automatically get called on timeout, and sent to KafkaSpout, which will indefinitely keep retrying this failed tuple, as per documentation.

This much is fine.

But, when I run this topology, it creates multiple 0 byte files in HDFS, which are increasing in number as time progresses. This should not happen 'ideally', as no emit() method is getting called on ProcessBolt. So, why is HDFSBolt creating these files??

My aim is force HDFSBolt to stop creating these 0 byte files in HDFS, if no data is getting processed in ProcessBolt.

How can this be achieved? Please let me know.

I have also tried sending ACK explicitely in catch block just before throwing RuntimeException, so that KafkaSpout does not retry the same tuple again. But even after doing this change, topology still keeps generating 0 byte HDFS files.

Is there any configuration for HDFS Bolt to avoid this?

My HDFSBolt snippet:

SyncPolicy syncPolicy = new CountSyncPolicy(100);

FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(127, Units.MB);

RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter(",");

FileNameFormat fileNameFormat = new DefaultFileNameFormat().withExtension(".csv") .withPath(hdfsOutputDir);

HdfsBolt hdfsbolt = new HdfsBolt() .withFsUrl(hdfsUrl) .withFileNameFormat(fileNameFormat).withRecordFormat(format) .withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);

My Topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("kafka-spout", kafkaSpout, 1).setNumTasks(1);

builder.setBolt("process-bolt", processbolt, 1).setNumTasks(1) .shuffleGrouping("kafka-spout");

builder.setBolt("hdfs-bolt", hdfsbolt, 1).setNumTasks(1) .shuffleGrouping("process-bolt");

Please let me know if any other information is needed.

1 ACCEPTED SOLUTION

avatar
Contributor

@prachi bhadekar in your execute() logic instead of throwing a RuntimeException try logging an error message. The problem with throwing an Exception is that it will bring down the worker.

View solution in original post

3 REPLIES 3

avatar
Contributor

@prachi bhadekar in your execute() logic instead of throwing a RuntimeException try logging an error message. The problem with throwing an Exception is that it will bring down the worker.

avatar
Contributor

Thanks @Raghav Kumar Gautam..using collector.reportError() solved this...

avatar
Contributor

@prachi bhadekar Glad that it worked.