Created 03-12-2017 01:33 AM
I have been streaming data using Spark from a Kafka topic and writing it to HDFS, but the problem is it's creating empty partitions and I'm not sure how to avoid it.
My code:
// Create a direct stream from the Kafka topic(s)
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

// Keep only the message value from each (key, value) pair
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

lines.print();
lines.dstream().saveAsTextFiles("pathtohdfs");
Created 03-13-2017 06:31 PM
I think you can use the lower-level API `foreachRDD` to do that. The following is a sample Python program modified from the example in the Spark Streaming Programming Guide (http://spark.apache.org/docs/latest/streaming-programming-guide.html). Please compare it with the original one.
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def printRecord(record):
    print(record)

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.foreachRDD(lambda rdd: rdd.foreach(printRecord))
    ssc.start()
    ssc.awaitTermination()
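To try it out, you can start a local data server with `nc -lk 9999` and then submit the script with `spark-submit network_wordcount.py localhost 9999`, as described in the programming guide; `foreachRDD` then runs `printRecord` on every record of each micro-batch.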
Created 03-23-2017 09:12 PM
You can check for an empty partition using the following code:
lines.dstream().foreachRDD(rdd => {
  // A direct Kafka stream RDD has one partition per Kafka partition even when
  // no records arrived, so test for actual data with isEmpty before writing
  if (!rdd.isEmpty())
    rdd.saveAsTextFile(outputDir)
})
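Since the original question uses the Java API, here is a minimal sketch of the same check in Java. It assumes `lines` is the JavaDStream<String> from the question and `outputDir` is a hypothetical HDFS path prefix of your choosing; the batch time is appended so each non-empty batch gets its own directory, mimicking the prefix-TIME_IN_MS names that saveAsTextFiles produces.

// Sketch only: `lines` and `outputDir` come from the question's code above
lines.foreachRDD((rdd, time) -> {
    // Skip micro-batches that contain no records, so no empty files are written
    if (!rdd.isEmpty()) {
        // One directory per non-empty batch, e.g. pathtohdfs-1489286400000
        rdd.saveAsTextFile(outputDir + "-" + time.milliseconds());
    }
});

The isEmpty() check only has to fetch a single record, so it is cheap compared to counting the whole RDD before deciding whether to write.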