How to do df.rdd or df.collect().foreach on streaming dataset?

This is the exception I am getting whenever I am trying to convert it.


val df_col ="ts.user.friends_count")
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;


All I am trying to do is replicate the following sql.dataframe operations in structured streaming.


df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))

which is running fine in Dataframes but not in structured streaming.