How to avoid writing empty partitions into hdfs from spark?

New Contributor

I have been streaming data from a Kafka topic with Spark and writing it to HDFS, but the job keeps creating empty partitions and I am not sure how to avoid that.

My code:

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

lines.print();

lines.dstream().saveAsTextFiles("pathtohdfs");

2 REPLIES

Expert Contributor

Hi, @Apoorva Teja Vanam

I think you can use the lower-level API `foreachRDD` to do that. The following sample Python code is modified from the example in http://spark.apache.org/docs/latest/streaming-programming-guide.html . Please compare it with the original one.

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def printRecord(record):
    print(record)

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)

    counts.foreachRDD(lambda rdd: rdd.foreach(printRecord))

    ssc.start()
    ssc.awaitTermination()
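
To connect this back to the question, the same `foreachRDD` hook can decide per batch whether to write anything at all. The following is only a minimal sketch, not tested against the asker's job: `make_saver` and `save_if_not_empty` are hypothetical names, the output prefix is the placeholder from the question, and it assumes PySpark hands a two-argument callback the batch time as a `datetime` plus the batch RDD.

```python
def make_saver(output_prefix):
    """Build a foreachRDD callback that skips batches with no records."""
    def save_if_not_empty(batch_time, rdd):
        # isEmpty() stops at the first record it finds, so it is cheaper than count()
        if rdd.isEmpty():
            return None
        # One output directory per batch, named after the batch time
        # (assumption: batch intervals of at least one second, so names do not collide)
        path = "{}-{}".format(output_prefix, batch_time.strftime("%Y%m%d-%H%M%S"))
        rdd.saveAsTextFile(path)
        return path
    return save_if_not_empty

# Wiring, with `lines` being the DStream from the example above:
# lines.foreachRDD(make_saver("pathtohdfs"))
```

Returning the path is only there to make the callback easy to check by hand; Spark ignores the return value.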

Rising Star

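You can skip empty batches using the following code. Note that `rdd.partitions.isEmpty` is true only when the RDD has no partitions at all, so `rdd.isEmpty()` is the check that detects a batch with no records.

lines.dstream().foreachRDD(rdd => {
  // Write only batches that contain at least one record.
  // outputDir must be unique per batch: saveAsTextFile fails if the directory already exists.
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(outputDir)
  }
})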
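
Even when a batch does contain records, some of its partitions can be empty, and each of those still produces an empty part file. One possible way to reduce that, sketched here in Python under the assumption that a batch is small enough to be written by a handful of tasks, is to merge partitions before saving. `write_compact` and `num_files` are hypothetical names introduced for illustration.

```python
def write_compact(rdd, path, num_files=1):
    """Merge partitions before writing so a sparse batch yields fewer part files."""
    if num_files < 1:
        raise ValueError("num_files must be at least 1")
    # coalesce() without a shuffle can only lower the partition count, never raise it
    target = max(1, min(num_files, rdd.getNumPartitions()))
    rdd.coalesce(target).saveAsTextFile(path)
    return target
```

With `num_files=1` every non-empty batch ends up in a single part file, at the cost of funnelling the whole batch through one task. Larger values keep more parallelism but do not guarantee that every resulting partition holds data.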