I need to consume messages from a Kafka topic which are just lines of space-separated values, like "0.00 2.000 10.000 37.000 ...".
So I use the readStream as follows:
stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "myhost1:6667,myhost2:6667") \
    .option("subscribe", "pruebas") \
    .option("startingOffsets", "earliest") \
    .load()

df = stream_df.selectExpr("CAST(value AS STRING)")
Then I need to transform the contents of this single column into a dense vector for a machine learning model, so I use the following:
rdd = df.rdd.map(list).map(lambda lista: lista.split())
But I get the following:
>>> rdd = df.rdd.map(list).map(lambda lista: lista.split())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 84, in rdd
    jrdd = self._jdf.javaToPython()
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1), stackTrace)
pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
How do I do that?
I have tried this but it does not work:
query2 = df.rdd.map(list).map(lambda lista: lista.split()) \
    .writeStream \
    .outputMode('append') \
    .format("console") \
    .start()
Any idea on how to manipulate the contents of a dataframe's column in a better way under Structured Streaming?
Any help is highly appreciated.
Since it doesn't seem you can easily transform a dataframe into an RDD under Spark Structured Streaming, I found a way to manipulate the dataframe to fit my needs. I used the split function from the pyspark.sql.functions module to split the contents of the dataframe's column (a string containing the independent variables for my ML model) into several new columns, and then I used the VectorAssembler class from pyspark.ml to merge those new columns into a vector column.