Structured Streaming writestream append to file

Is it possible to append to a destination file when using writeStream in Spark 2?

Example:

I've got a Kafka topic and a stream that consumes data as it is written to the topic. I want to perform some transformations and append the results to an existing CSV file (this can be local for now, but eventually I'd want it to be on HDFS).

query = csv_select \
 .writeStream \
 .format("csv") \
 .option("format", "append") \
 .option("path", "/destination_path/") \
 .option("checkpointLocation", "/checkpoint_path") \
 .outputMode("append") \
 .start()

This is the code I'm using. When I produce new data to the topic, it is written as a new CSV file in /destination_path. Any advice on how to append to an existing file rather than create a new one every time?

NOTE: I'm using csv for simplicity here, but if you need to show an example in parquet or some other format that would be helpful too.

EDIT: The example I'm working from is here: https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1...

1 ACCEPTED SOLUTION

Hi @Sean Byrne

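I had a similar question. It's common in distributed systems to see many "part" files in the output. You will typically have many partitions, spread across multiple nodes, and each one writes its own file into the same output directory so that the writers don't interfere with one another. The streaming file sink works the same way: it adds new files for each batch rather than appending to an existing one.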

However, you can run a separate Spark job against this directory to combine the output. Note that Spark still writes a directory, but with coalesce(1) it contains a single CSV part file. Here's the code:

# Use PySpark to read in all "part" files
allfiles = spark.read.option("header", "false").csv("/destination_path/part-*.csv")

# Coalesce to one partition so the output directory holds a single CSV part file
allfiles.coalesce(1).write.format("csv").option("header", "false").save("/destination_path/single_csv_file/")
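If the output is on a local filesystem and small enough, one alternative (plain Python, not Spark) is to concatenate the part files onto an existing CSV yourself. This is only a sketch: `merge_part_files` and the file names are hypothetical, and it assumes the part files have no header row and that every row ends with a newline.

```python
import glob
import os
import shutil


def merge_part_files(source_dir: str, target_file: str) -> int:
    """Append every part-*.csv in source_dir to target_file.

    Returns the number of part files that were appended.
    """
    part_files = sorted(glob.glob(os.path.join(source_dir, "part-*.csv")))
    # Append mode keeps whatever rows are already in target_file.
    with open(target_file, "ab") as target:
        for part in part_files:
            with open(part, "rb") as source:
                shutil.copyfileobj(source, target)
    return len(part_files)
```

Calling it a second time appends the same part files again, so you would need to move or otherwise track the files that have already been merged. It is also safest to run it on files the stream has finished writing.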

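Another option would be to use the memory sink, format("memory"), which keeps the stream's output in an in-memory table. You could then run periodic queries against that table and have them save it to a single CSV (or other format). Keep in mind that the memory sink holds all of its output on the driver, so it only suits small data volumes.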

If I come across a way to output a single CSV directly from Structured Streaming, I will be sure to post it. Hope this is helpful.
