
"TypeError: 'TransformedDStream' object is not iterable" while running pyspark using spark-submit

"TypeError: 'TransformedDStream' object is not iterable" while running pyspark using spark-submit

Explorer

Hello,

I am having an issue while running a Spark Streaming application.

When I run the code with this command (SparkStreaming.py is my Python code):

$spark-submit --jars /home/UDHAV.MAHATA/Downloads/spark-streaming-kafka-0-8-assembly_2.11-2.4.5.jar /home/UDHAV.MAHATA/Documents/SparkStreaming/SparkStreaming.py

I get the following error:

TypeError Traceback (most recent call last)
~/Documents/SparkStreaming/SparkStreaming.py in <module>
29 lines = KafkaUtils.createStream(ssc,"localhost:2181", "spark-streaming-consumer", {'kafkaPublish':1})
30 movies = lines.map(parseInput)
---> 31 movieDataset = spark.createDataFrame(movies).toDF().take(10)
32 averageRatings = movieDataset.groupBy("movieID").avg("rating")
33 counts = movieDataset.groupBy("movieID").count()

/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
746 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
747 else:
--> 748 rdd, schema = self._createFromLocal(map(prepare, data), schema)
749 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
750 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

TypeError: 'TransformedDStream' object is not iterable
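
If I am reading the traceback right, createDataFrame() iterates over whatever it is given, and it expects an RDD or a local collection of Rows, while movies is a DStream. This little check (my own sketch, not part of my application) shows the difference:

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("CreateDataFrameCheck").getOrCreate()

# A local list of Rows is iterable, so createDataFrame() accepts it:
rows = [Row(movieID=1, rating=4.0), Row(movieID=2, rating=3.5)]
spark.createDataFrame(rows).show()

# A DStream is neither an RDD nor a local collection, so passing one in
# (as line 31 of my script does) raises the TypeError above.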

And this is my Spark application code:

SparkStreaming.py

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    with open("/home/UDHAV.MAHATA/Documents/SparkStreaming/u.item", encoding="ISO-8859-1") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    fields = line.split()
    return Row(movieID=int(fields[1]), rating=float(fields[2]))

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 10)
    spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

    movieNames = loadMovieNames()

    lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {'kafkaPublish': 1})
    movies = lines.map(parseInput)
    movieDataset = spark.createDataFrame(movies)

    averageRatings = movieDataset.groupBy("movieID").avg("rating")
    counts = movieDataset.groupBy("movieID").count()
    averagesAndCounts = counts.join(averageRatings, "movieID")
    topTen = averagesAndCounts.orderBy("avg(rating)").take(10)

    for movie in topTen:
        print(movieNames[movie[0]], movie[1], movie[2])

    ssc.start()
    ssc.awaitTermination()
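
From what I have read so far, I suspect the fix is to convert each micro-batch inside foreachRDD() instead of passing the DStream to createDataFrame() directly. Something like this rough sketch (processBatch is just a name I made up, and I have not gotten it working yet):

def processBatch(time, rdd):
    # Each micro-batch arrives as a plain RDD, which createDataFrame() accepts.
    if rdd.isEmpty():
        return
    movieDataset = spark.createDataFrame(rdd)
    averageRatings = movieDataset.groupBy("movieID").avg("rating")
    counts = movieDataset.groupBy("movieID").count()
    averagesAndCounts = counts.join(averageRatings, "movieID")
    topTen = averagesAndCounts.orderBy("avg(rating)").take(10)
    for movie in topTen:
        print(movieNames[movie[0]], movie[1], movie[2])

movies.foreachRDD(processBatch)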

Can someone help me solve this issue?

Thank you in advance.
