
How to do df.rdd or df.collect().foreach on streaming dataset?

New Contributor

This is the exception I get whenever I try to collect a streaming DataFrame or convert it to an RDD.

 

val df_col = df.select("ts.user.friends_count").collect.map(_.toSeq)
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

 

All I am trying to do is replicate the following batch DataFrame operation in Structured Streaming.

 

df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))

This runs fine on batch DataFrames, but not in Structured Streaming.

1 REPLY

New Contributor

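Structured Streaming does not support immediate actions such as collect, count, or foreach on a streaming Dataset. As the exception says, a query with a streaming source has to be started with writeStream.start(), and the per-row work has to happen inside the sink. Refer to the link below.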

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operatio...
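
To make that concrete, here is a minimal sketch of one way to get the collect().foreach behaviour from the question inside a streaming query, using writeStream.foreachBatch (available from Spark 2.4). Several things here are assumptions, not taken from the original post: the DroolsCaseClass definition and its field names are a hypothetical stand-in for the poster's droolsCaseClass, and the built-in "rate" test source replaces the real stream, whose source and schema are not shown.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical stand-in for the poster's droolsCaseClass; field names are assumptions.
case class DroolsCaseClass(id: Long, name: String)

object StreamingCollectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("streaming-collect-sketch")
      .master("local[2]")
      .getOrCreate()

    // Assumption: the built-in "rate" source (columns: timestamp, value) stands in
    // for the real stream. It is reshaped into a (Long, String) pair of columns.
    val df: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .selectExpr("value AS id", "CAST(value AS STRING) AS name")

    // Inside foreachBatch each micro-batch is an ordinary (non-streaming) DataFrame,
    // so collect() is allowed here. The function value is typed explicitly to avoid
    // the Scala/Java overload ambiguity of foreachBatch under Scala 2.12.
    val processBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
      batch.collect().foreach { row =>
        val fact = DroolsCaseClass(row.getLong(0), row.getString(1))
        println(s"batch $batchId: $fact")
      }
    }

    // The query only starts running once start() is called.
    val query = df.writeStream
      .foreachBatch(processBatch)
      .start()

    query.awaitTermination()
  }
}
```

Note that collect() still pulls each micro-batch onto the driver, so this only suits small batches. If the rows should be handled on the executors instead, writeStream.foreach with a ForeachWriter (open/process/close) is the other option the guide describes.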