Created 12-20-2017 12:18 PM
I would like to handle empty batches (or files) when there is no input from the producer. The code below throws this error:
'TransformedDStream' object has no attribute 'isEmpty'
So how should I handle empty files?
Consumer code:
import sys
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.context import SQLContext

if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>")
        exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 60)
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 2})
    lines = kvs.map(lambda x: x[1])
    if(not lines.isEmpty()):
        lines.saveAsTextFiles("/user/cloudera/")
    ssc.start()
    ssc.awaitTermination()
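
For context on the error: isEmpty() is a method of an RDD, not of a DStream, so one approach that might work here is to check each batch's RDD inside foreachRDD instead of testing the stream itself. The following is only a sketch under assumptions, not a verified fix for this job: the helper name save_if_not_empty, the "/user/cloudera/batch" prefix, and the timestamp-based directory naming are all hypothetical choices made for illustration.

```python
def save_if_not_empty(batch_time, rdd, prefix="/user/cloudera/batch"):
    """Write one micro-batch to its own directory, skipping empty batches.

    batch_time is the datetime that PySpark passes to a two-argument
    foreachRDD callback; rdd is that batch's RDD.
    The prefix and the naming scheme are assumptions for this sketch.
    Returns the output path, or None when the batch was skipped.
    """
    # isEmpty() exists on RDDs, which is why the check belongs here
    # rather than on the DStream.
    if rdd.isEmpty():
        return None
    # One directory per batch, so later batches do not collide with earlier ones.
    path = "{}-{}".format(prefix, batch_time.strftime("%Y%m%d-%H%M%S"))
    rdd.saveAsTextFile(path)
    return path


# In the consumer, the `if(not lines.isEmpty())` block would be replaced by:
#     lines.foreachRDD(save_if_not_empty)
```

With this in place, a batch interval with no messages from the producer writes nothing, while non-empty batches are saved under a directory named after the batch time.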