
Unable to create RDD with Structured Streaming and Kafka


Hi,

I need to consume messages from a Kafka topic; each message is just a line of space-separated values like "0.00 2.000 10.000 37.000 ...".

So I use readStream as follows:

stream_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "myhost1:6667,myhost2:6667") \
  .option("subscribe", "pruebas") \
  .option("startingOffsets", "earliest") \
  .load()

df = stream_df.selectExpr("CAST(value AS STRING)")

Then I need to transform the contents of this single column into a dense Vector for a machine learning model, so I use the following:

rdd = df.rdd.map(list).map(lambda lista: lista[0].split())

But I get the following:

>>> rdd = df.rdd.map(list).map(lambda lista: lista[0].split())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 84, in rdd
    jrdd = self._jdf.javaToPython()
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

How do I do that?

I have tried this but it does not work:

query2 = df.rdd.map(list).map(lambda lista: lista[0].split()) \
    .writeStream \
    .outputMode('append') \
    .format("console") \
    .start()

Any idea on how to manipulate the contents of a dataframe's column in a better way under Structured Streaming?

Any help is highly appreciated.


Re: Unable to create RDD with Structured Streaming and Kafka

Since it doesn't seem you can easily transform a dataframe into an RDD in Spark Structured Streaming, I found a way to manipulate the dataset to fit my needs. I used the split function from the pyspark.sql.functions module to split the contents of the dataframe's column (a string containing the independent variables for my ML model) into several new columns, and then used the VectorAssembler class from pyspark.ml to merge those new columns into a single vector column.
