How to do df.rdd or df.collect().foreach on streaming dataset?

This is the exception I get whenever I try to collect the streaming DataFrame:

 

val df_col = df.select("ts.user.friends_count").collect.map(_.toSeq)
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

 

All I am trying to do is replicate the following batch DataFrame operation in Structured Streaming:

 

df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))

This runs fine on a batch DataFrame but not in Structured Streaming.

1 REPLY

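Structured Streaming does not support immediate actions such as collect, count, or foreach when they are called directly on a streaming Dataset. As the exception says, a query with a streaming source has to be started with writeStream.start(). Refer to the link below: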

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operatio...
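
One way to keep the collect-and-loop logic is to run it once per micro-batch through foreachBatch on the stream writer. Inside that callback each batch is an ordinary DataFrame, so collect() is allowed. The following is only a minimal sketch, assuming Spark 2.4 or later: the DroolsFact case class is a hypothetical stand-in for droolsCaseClass, and the built-in "rate" test source stands in for the real stream, since neither is shown in the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical stand-in for the droolsCaseClass used in the question.
case class DroolsFact(id: Long, name: String)

object CollectPerBatch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("collect-per-micro-batch")
      .master("local[2]")
      .getOrCreate()

    // Assumption: the built-in "rate" source replaces the real streaming input.
    // It emits (timestamp, value: Long); we shape it into (Long, String) columns.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(col("value"), col("value").cast("string").as("name"))

    // Each micro-batch arrives as a plain (non-streaming) DataFrame,
    // so collect() works here. The explicit function type avoids the
    // overload ambiguity some Scala 2.12 builds report for foreachBatch.
    val processBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.collect().foreach { row =>
        val fact = DroolsFact(row.getLong(0), row.getString(1))
        println(s"batch $batchId: $fact")
      }
    }

    // The query only runs once it is started through writeStream.
    val query = df.writeStream
      .foreachBatch(processBatch)
      .start()

    query.awaitTermination()
  }
}
```

Note that collect() still pulls every row of the batch to the driver, so this only suits small batches. For larger volumes, writeStream.foreach with a ForeachWriter processes rows on the executors instead.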