Apache Spark Structured Streaming generates too many Parquet files on HDFS.


Hi 

I am using Spark Structured Streaming to read text log files from an S3 bucket and store them in Parquet format at an HDFS location.

I noticed that this job generates too many small files.

 

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, TimestampType}
import java.util.Calendar
import sys.process._

val checkPointDir = "/tmp/rt/checkpoint/"

val spark = SparkSession.builder.
  config("fs.s3a.awsAccessKeyId", "zzz").
  config("fs.s3a.awsSecretAccessKey", "aaabbb").
  config("spark.sql.streaming.checkpointLocation", checkPointDir).
  config("spark.cleaner.referenceTracking.blocking", true).
  config("spark.cleaner.referenceTracking.blocking.shuffle", true).
  config("spark.cleaner.referenceTracking.cleanCheckpoints", true).
  getOrCreate()
import spark.implicits._

val serveSchema = new StructType().add("log_type",StringType).add("time_stamp",StringType).add("host_name",StringType)

val serveDF = spark.readStream.option("delimiter", "\t").format("com.databricks.spark.csv").schema(serveSchema).load("s3a://eee/logs/vvv*.log")

 

serveDF.withColumn("minute",substring(col("time_stamp"),15,2)).writeStream.format("parquet").option("path", "/tmp/serve/").outputMode("Append").start

 

I tried the options below, but none of them helped.

1. trigger(Trigger.ProcessingTime("1 second")) : got a "not found: value Trigger" compile error (see the sketch after this list).

e.g. serveDF.withColumn("minute",substring(col("time_stamp"),15,2)).writeStream.format("parquet").option("path", "/tmp/serve/").outputMode("Append").trigger(Trigger.ProcessingTime("1 second")).start

 

2. repartition(2) / coalesce(2) : did not produce the expected result.

e.g. serveDF.withColumn("minute",substring(col("time_stamp"),15,2)).repartition(2).writeStream.format("parquet").option("path", "/tmp/serve/").outputMode("Append").start

 

3. config("spark.sql.files.maxRecordsPerFile", 15000000) : did not produce the expected result.
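 

For reference, Trigger lives in org.apache.spark.sql.streaming, so attempt 1 probably failed only because of a missing import. Below is a minimal sketch of that attempt with the import added, combining a longer micro-batch interval with coalesce(2); the 60-second interval is just an illustrative value, and the path and columns are the ones from the code above.

import org.apache.spark.sql.streaming.Trigger  // without this import the compiler reports "not found: value Trigger"

// Coalesce to 2 output partitions and trigger a micro-batch every 60 seconds
// instead of as fast as possible, so each batch writes fewer, larger files.
serveDF.
  withColumn("minute", substring(col("time_stamp"), 15, 2)).
  coalesce(2).
  writeStream.
  format("parquet").
  option("path", "/tmp/serve/").
  outputMode("append").
  trigger(Trigger.ProcessingTime("60 seconds")).
  start()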