
Structured Streaming writestream append to file


Is it possible to append to a destination file when using writeStream in Spark 2?

Example:

I've got a Kafka topic and a stream running that consumes data as it is written to the topic. I want to perform some transformations and append to an existing CSV file (this can be local for now, but eventually I'd want this on HDFS).
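
For reference, a minimal sketch of how csv_select in the query below might be produced from the Kafka source (the broker address, topic name, and column layout are placeholders, not my actual setup):

# Read the topic as a streaming DataFrame (broker and topic are placeholders)
from pyspark.sql.functions import col, split

raw = spark \
 .readStream \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "localhost:9092") \
 .option("subscribe", "my_topic") \
 .load()

# Kafka values arrive as bytes; cast to string and split the CSV line into columns
csv_select = raw \
 .select(col("value").cast("string").alias("line")) \
 .select(
     split(col("line"), ",").getItem(0).alias("field1"),
     split(col("line"), ",").getItem(1).alias("field2")
 )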

query = csv_select \
 .writeStream \
 .format("csv") \
 .option("format", "append") \
 .option("path", "/destination_path/") \
 .option("checkpointLocation", "/checkpoint_path") \
 .outputMode("append") \
 .start()

This is the code I'm using, and when I produce new data to the topic, it outputs as a new CSV file in /destination_path. Any advice on how to append to an existing file rather than create a new one every time?

NOTE: I'm using CSV for simplicity here, but if you need to show an example in Parquet or some other format, that would be helpful too.

EDIT: The example I'm working from is here: https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1...

1 ACCEPTED SOLUTION

Re: Structured Streaming writestream append to file

Hi @Sean Byrne

I also had a similar question, but it's common in distributed systems to see many "part" file outputs. This is because you will typically have many partitions, across multiple nodes, writing to the same output directory (so the writers don't interfere with one another).

However, you can run a Spark job against this directory in order to create one single CSV file. Here's the code:

# Use PySpark to read in all "part" files from the streaming output directory
allfiles = spark.read.option("header", "false").csv("/destination_path/part-*.csv")

# Coalesce to one partition and write out a single CSV file
allfiles.coalesce(1).write.format("csv").option("header", "false").save("/destination_path/single_csv_file/")

Another option would be to use format("memory"), and then you could execute periodic in-memory queries against the Spark stream. These queries could save the in-memory table to a single CSV file (or another format).
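
A minimal sketch of that memory-sink approach, assuming the csv_select stream from your question (the query/table name "csv_stream" and the output path are placeholders):

# Write the stream to an in-memory table instead of files
query = csv_select \
 .writeStream \
 .format("memory") \
 .queryName("csv_stream") \
 .outputMode("append") \
 .start()

# Periodically snapshot the in-memory table and write it out as a single CSV file
snapshot = spark.sql("SELECT * FROM csv_stream")
snapshot.coalesce(1).write.mode("overwrite").option("header", "false").csv("/destination_path/single_csv_file/")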

If I come across any way to output to a single CSV file from Structured Streaming, I will be sure to post it. Hope this is helpful.
