Hi,
I need to consume messages from a Kafka topic; each message is just a line of space-separated values like "0.00 2.000 10.000 37.000 ...".
So I use readStream as follows:
stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "myhost1:6667,myhost2:6667") \
    .option("subscribe", "pruebas") \
    .option("startingOffsets", "earliest") \
    .load()
df = stream_df.selectExpr("CAST(value AS STRING)")
Then I need to transform the contents of this single column into a dense Vector for a machine learning model, so I tried the following:
rdd = df.rdd.map(list).map(lambda lista: lista[0].split())
But I get the following error:
>>> rdd = df.rdd.map(list).map(lambda lista: lista[0].split())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 84, in rdd
jrdd = self._jdf.javaToPython()
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
How can I do this transformation? From the error it seems that a streaming DataFrame cannot be converted to an RDD at all, and everything has to be expressed as DataFrame operations executed through writeStream.start().
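Would something like this be the right direction? This is just an untested sketch using pyspark.sql.functions.split; "values", split_df and query are names I made up:

from pyspark.sql import functions as F

# Split the space-separated string into an array<string> column,
# staying inside the DataFrame API instead of dropping to an RDD
split_df = df.select(F.split(F.col("value"), " ").alias("values"))

# Streaming queries must be started with writeStream.start()
query = split_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()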
I have also tried this, but it does not work either:
query2 = df.rdd.map(list).map(lambda lista: lista[0].split()) \
    .writeStream \
    .outputMode('append') \
    .format("console") \
    .start()
Any idea on how to manipulate the contents of a DataFrame column in a better way under Structured Streaming?
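Since I ultimately need a dense Vector, I was also wondering whether a UDF on top of the split column from the sketch above would do it. Again untested, assuming pyspark.ml.linalg is available; to_vector and vector_df are just names I made up:

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

# Hypothetical UDF that parses the array of strings into a DenseVector
to_vector = udf(lambda xs: Vectors.dense([float(x) for x in xs]), VectorUDT())

vector_df = split_df.select(to_vector("values").alias("features"))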
Any help is highly appreciated.