New Contributor
Posts: 1
Registered: ‎01-11-2018

How to do df.rdd or df.collect().foreach on streaming dataset?

This is the exception I get whenever I try to convert the streaming Dataset with df.rdd or df.collect:

 

val df_col = df.select("ts.user.friends_count").collect.map(_.toSeq)
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

 

All I am trying to do is replicate the following DataFrame operation in Structured Streaming:

 

df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))

This runs fine on a regular DataFrame, but not in Structured Streaming.
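
For context, the exception message itself points at the likely direction: a streaming Dataset cannot be collected on the driver, so the per-row work has to go into a sink that is started with writeStream.start(). Below is a minimal sketch of that idea using ForeachWriter. It assumes df is a streaming DataFrame whose first two columns are a Long and a String. DroolsFact, DroolsWriter and startQuery are hypothetical names made up for illustration (DroolsFact stands in for droolsCaseClass), and the println is a placeholder for whatever the rule engine call would be.

```scala
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical stand-in for droolsCaseClass(Long, String) from the post.
case class DroolsFact(friendsCount: Long, name: String)

// Per-row sink: process() is called on the executors for every row of
// every micro-batch, taking the place of df.collect().foreach(...).
class DroolsWriter extends ForeachWriter[Row] {
  // Return true to accept all partitions and batches.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  override def process(row: Row): Unit = {
    // Assumes column 0 is a Long and column 1 is a String.
    val fact = DroolsFact(row.getLong(0), row.getString(1))
    println(fact) // placeholder for the real per-row action
  }

  // Nothing to release in this sketch.
  override def close(errorOrNull: Throwable): Unit = ()
}

object StreamingForeachSketch {
  // df must be a streaming DataFrame (built with spark.readStream).
  def startQuery(df: DataFrame): StreamingQuery =
    df.writeStream
      .foreach(new DroolsWriter)
      .start()
}
```

A caller would typically keep the returned StreamingQuery and call awaitTermination() on it. Note that process() runs on the executors rather than the driver, so anything it touches has to be serializable or created inside open(). On Spark 2.4 or later, writeStream.foreachBatch may be a closer match to the original code, since it hands each micro-batch over as an ordinary DataFrame.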
