How to delete empty columns in df when writing to parquet?

Rising Star

I am trying to read data from Kafka and write it in Parquet format via Spark Streaming. The problem is that the data coming from Kafka have a variable structure.

For example, app one has columns A, B, C and app two has columns B, C, D, so the DataFrame I read from Kafka has all four columns A, B, C, D. When I write the DataFrame to Parquet files partitioned by app name, the Parquet files for app one also contain column D, even though column D is empty and actually holds no data. How can I filter out the empty columns when writing the DataFrame to Parquet?

Thanks!

2 REPLIES


I couldn't come up with anything better than manually scanning the DataFrame to check if all values in a column are NULL.

Something like:

import org.apache.spark.sql.DataFrame

// Returns the names of all empty (all-NULL) columns of the DataFrame.
// Note: this launches one count() job per column.
def getEmptyColNames(df: DataFrame): Seq[String] = {
  df.cache()

  val colNames: Seq[String] = df.columns
  colNames.filter { (colName: String) =>
    df.filter(df(colName).isNotNull).count() == 0
  }
}

// Drops all empty columns of DataFrame
def dropEmptyCols(df: DataFrame): DataFrame = {
  val emptyColNames: Seq[String] = getEmptyColNames(df)
  
  if (emptyColNames.isEmpty) df
  else df.drop(emptyColNames: _*)
}

val dfOriginal: DataFrame = ??? // placeholder: the DataFrame read from Kafka
val dfNonEmptyCols: DataFrame = dropEmptyCols(dfOriginal)
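
Applied to the original question, dropEmptyCols can be run on each app's subset before writing, instead of writing the combined DataFrame with partitionBy (which keeps every column in every partition). A rough sketch, assuming a hypothetical string column app_name that holds the app name and a placeholder output path:

// Sketch: write one Parquet directory per app, dropping the columns that are
// empty within that app's rows. "app_name" and the base path are placeholders.
def writePerApp(df: DataFrame, appCol: String, basePath: String): Unit = {
  val apps: Array[String] =
    df.select(appCol).distinct().collect().map(_.getString(0))

  apps.foreach { app =>
    val subset = dropEmptyCols(df.filter(df(appCol) === app))
    subset.write.mode("overwrite").parquet(s"$basePath/$appCol=$app")
  }
}

writePerApp(dfOriginal, "app_name", "out/parquet")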

@Junfeng Chen Were you able to find a more efficient / smarter way?
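
In case it helps: within the same scan-for-NULLs idea, the one-count()-job-per-column loop can be replaced by a single aggregation that counts the non-null values of every column at once, so the data is only scanned one extra time. A rough sketch:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, count}

// count() ignores NULLs, so a column is empty exactly when its non-null count is 0.
def getEmptyColNamesSinglePass(df: DataFrame): Seq[String] = {
  val counts: Row = df
    .select(df.columns.map(c => count(col(c)).alias(c)): _*)
    .first()
  df.columns.filter(c => counts.getAs[Long](c) == 0L)
}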

New Contributor

I am facing the same issue. Did you find any solution?