How to avoid writing empty partitions to HDFS from Spark?

New Contributor

I have been streaming data from a Kafka topic using Spark and writing it to HDFS, but it is creating empty partitions and I'm not sure how to avoid it.

My code:

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  @Override
  public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
  }
});

lines.print();

lines.dstream().saveAsTextFiles("pathtohdfs");

2 Replies

Expert Contributor

Hi, @Apoorva Teja Vanam

I think you can use the lower-level `foreachRDD` API to do that. The following sample Python code is modified from the example in http://spark.apache.org/docs/latest/streaming-programming-guide.html; please compare it with the original.

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def printRecord(record):
    print(record)

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)

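    # rdd.foreach runs printRecord on the executors, so on a cluster the
    # output appears in the executor logs rather than on the driver console.
    counts.foreachRDD(lambda rdd: rdd.foreach(printRecord))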

    ssc.start()
    ssc.awaitTermination()
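The sample above only prints records. Applied to the question, a `foreachRDD` function can inspect each batch and write it only when it holds at least one record. Below is a minimal PySpark sketch under a few assumptions: `make_batch_writer` is a hypothetical helper, `lines` is assumed to be a PySpark DStream of strings, and the output path reuses the `prefix-TIME_IN_MS[.suffix]` naming that `saveAsTextFiles` uses for its per-batch directories. PySpark calls a two-argument `foreachRDD` function as `func(time, rdd)`, with the batch time as a `datetime`.

```python
def make_batch_writer(prefix, suffix=None):
    """Build a foreachRDD function that skips batches with no records.

    Hypothetical helper: the path naming mimics saveAsTextFiles
    ("prefix-TIME_IN_MS[.suffix]") but is an assumption of this sketch.
    """
    def write_batch(batch_time, rdd):
        # isEmpty() checks for records, not partitions, so a batch whose
        # Kafka partitions all contain zero messages is still skipped.
        if rdd.isEmpty():
            return  # no output directory is created for this interval
        # batch_time is a datetime; convert it back to epoch milliseconds.
        millis = int(round(batch_time.timestamp() * 1000))
        path = "%s-%d" % (prefix, millis)
        if suffix:
            path += "." + suffix
        rdd.saveAsTextFile(path)
        # Implicitly returns None, as a foreachRDD function should.
    return write_batch


# Usage (hypothetical path; `lines` is a PySpark DStream of strings):
# lines.foreachRDD(make_batch_writer("hdfs:///user/me/kafka-out"))
```

Note that `isEmpty()` runs a small job of its own, and `saveAsTextFile` then computes the batch again; calling `rdd.cache()` before the check may reduce repeated reads from Kafka.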

Rising Star

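You can check whether each batch is empty before writing it, using the following code:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

lines.dstream().foreachRDD((rdd: RDD[String], time: Time) => {
  // rdd.isEmpty() checks for records; rdd.partitions.isEmpty only checks for zero
  // partitions, and a Kafka direct-stream batch has one per Kafka partition.
  if (!rdd.isEmpty())
    rdd.saveAsTextFile(outputDir + "-" + time.milliseconds) // one directory per batch
})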
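Skipping empty batches still leaves a second source of empty output: `saveAsTextFile` writes one `part-NNNNN` file per partition, and a Kafka direct-stream RDD has one partition per Kafka topic partition, so a batch with data in only some Kafka partitions can still produce empty part files. A minimal PySpark sketch of one common mitigation, coalescing each non-empty batch before saving, is below; `make_compacting_writer`, `num_files`, and the path naming are hypothetical choices for illustration, and PySpark is assumed to call the function as `func(time, rdd)`.

```python
def make_compacting_writer(prefix, num_files=1):
    """Build a foreachRDD function that skips empty batches and coalesces the rest.

    Hypothetical helper: num_files is the number of partitions (and hence
    part files) each non-empty batch is merged into before saving.
    """
    def write_batch(batch_time, rdd):
        if rdd.isEmpty():
            return  # skip the whole interval: nothing is written
        millis = int(round(batch_time.timestamp() * 1000))
        # coalesce() merges partitions without a full shuffle, which reduces
        # the number of part files and usually the number of empty ones.
        rdd.coalesce(num_files).saveAsTextFile("%s-%d" % (prefix, millis))
    return write_batch


# Usage (hypothetical path; `lines` is a PySpark DStream of strings):
# lines.foreachRDD(make_compacting_writer("hdfs:///user/me/kafka-out", num_files=1))
```

`coalesce(1)` sends the whole batch through a single task, so for large batches a higher `num_files`, or leaving the partitioning unchanged, may be the better trade-off.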