
pyspark - kafka streaming integration - Empty batches

I would like to handle empty batches, so that no empty files are written when there is no input from the producer. The code below throws an error:

'TransformedDStream' object has no attribute 'isEmpty'

How should I handle empty batches so that empty files are not written?

Consumer code:

import sys
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.context import SQLContext

if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>")
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 60)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 2})
    lines = kvs.map(lambda x: x[1])
    # The next line raises the AttributeError quoted above: lines is a DStream, not an RDD.
    if not lines.isEmpty():
        lines.saveAsTextFiles("/user/cloudera/")

    ssc.start()
    ssc.awaitTermination()
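
The error arises because `isEmpty()` is defined on RDDs, not on DStreams, and the `if` in the script runs only once while the job is being set up, before any batch exists. Below is a minimal sketch of one possible workaround, assuming the check is moved into a per-batch callback registered with `foreachRDD`. The helper name `make_saver` and the `batch` path prefix are hypothetical, and the timestamped directory naming is an illustrative choice, not the naming that `saveAsTextFiles` itself produces.

```python
def make_saver(prefix):
    """Build a foreachRDD callback that writes only non-empty batches.

    prefix is a hypothetical output path prefix, e.g. "/user/cloudera/batch".
    """
    def save_if_not_empty(time, rdd):
        # rdd.isEmpty() is an RDD method, so it is valid here, once per batch.
        if rdd.isEmpty():
            return None  # nothing arrived in this interval; write nothing
        # Each batch gets its own directory, named after the batch time.
        path = "{0}-{1}".format(prefix, time.strftime("%Y%m%d%H%M%S"))
        rdd.saveAsTextFile(path)
        return path
    return save_if_not_empty
```

In the script above, this would replace the `if` / `saveAsTextFiles` pair with `lines.foreachRDD(make_saver("/user/cloudera/batch"))`. As far as I know, PySpark passes the batch time to a two-argument callback as a `datetime`, which is what the `strftime` call relies on.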