Unable to create RDD with Structured Streaming and Kafka

Hi,

I need to consume messages from a Kafka topic; each message is just a line of space-separated values, like "0.00 2.000 10.000 37.000 ...".

So I use readStream as follows:

stream_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "myhost1:6667,myhost2:6667") \
  .option("subscribe", "pruebas") \
  .option("startingOffsets", "earliest") \
  .load()

df = stream_df.selectExpr("CAST(value AS STRING)")

Then I need to manipulate this single column so I can transform its contents into a dense Vector for a machine learning model, so I try the following:

rdd = df.rdd.map(list).map(lambda lista: lista[0].split())

But I get the following:

>>> rdd = df.rdd.map(list).map(lambda lista: lista[0].split())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 84, in rdd
    jrdd = self._jdf.javaToPython()
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

How do I do that?

I have tried this but it does not work:

query2 = df.rdd.map(list).map(lambda lista: lista[0].split()) \
    .writeStream \
    .outputMode('append') \
    .format("console") \
    .start()

Any idea on how to manipulate the contents of a dataframe's column in a better way under Structured Streaming?

Any help is highly appreciated.

1 REPLY

Since you can't convert a streaming dataframe into an RDD under Spark Structured Streaming, I found a way to manipulate the dataset to fit my needs. I used the split function from the pyspark.sql.functions module to split the contents of the dataframe's column (a string containing the independent variables for my ML model) into several new columns, and then I used the VectorAssembler class from pyspark.ml.feature to merge the new columns into a single vector column, as sketched below.
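For reference, here is a minimal sketch of that approach. The topic and brokers are taken from the question above; the number of values per message (4 here), the feature column names, and the console sink are assumptions for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName("kafka-to-vector").getOrCreate()

# Read from Kafka exactly as in the question.
stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "myhost1:6667,myhost2:6667") \
    .option("subscribe", "pruebas") \
    .option("startingOffsets", "earliest") \
    .load()

# Split the space-separated string value into an array column.
parts_df = stream_df.selectExpr("CAST(value AS STRING) AS value") \
    .select(split(col("value"), " ").alias("parts"))

# Promote each array element to its own double column.
# Assumes each message carries exactly 4 values (adjust as needed).
num_features = 4
feature_cols = ["f{}".format(i) for i in range(num_features)]
features_df = parts_df.select(
    *[col("parts").getItem(i).cast("double").alias(name)
      for i, name in enumerate(feature_cols)]
)

# Merge the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
vector_df = assembler.transform(features_df)

# Streaming queries must be started with writeStream.start().
query = vector_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()

Because split, getItem, and VectorAssembler are all column-level transformations, they run on a streaming dataframe without ever touching the RDD API, which is what the AnalysisException was complaining about.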
