Support Questions

Find answers, ask questions, and share your expertise

How do I run multiple pivots on a Spark DataFrame?

Rising Star

For example, I have a Spark DataFrame with three columns: 'Domain', 'ReturnCode', and 'RequestType'.

Example Starting DataFrame

www.google.com,200,GET
www.google.com,300,GET
www.espn.com,200,POST

I would like to pivot on Domain and get aggregate counts for the various ReturnCodes and RequestTypes. Do I have to pivot each table separately and then join them back together or is it possible to do it in one step?

Desired DataFrame

Domain,200,300,GET,POST
www.google.com,1,1,2,0
www.espn.com,1,0,0,1

Example of Pivot Code with Join

val dfa = df.groupBy("Domain").pivot("ReturnCode").count()
val dfb = df.groupBy("Domain").pivot("RequestType").count()
dfa.join(dfb, Seq("Domain")).show()
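Note: as far as I can tell, a count pivot leaves null for combinations that never occur, so getting the 0s shown in the desired DataFrame above would presumably also need a fill on the joined result, e.g.:

// Replace the nulls left by the count pivots with the 0s shown above
dfa.join(dfb, Seq("Domain")).na.fill(0).show()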
2 REPLIES

Expert Contributor

Since pivoting can only be done on one column, one way to do it in a single pass is to combine the two columns into a new column and use that new column as the pivot column. The output is close to what you are expecting. Hope this helps.

The Input:

Domain,ReturnCode,RequestType
www.google.com,200,GET
www.google.com,300,GET
www.espn.com,200,POST

The code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions.udf

object pivotDF {
  // Define the application Name
  val AppName: String = "pivotDF"

  // Set the logging level
  Logger.getLogger("org.apache").setLevel(Level.ERROR)

  // Define a udf to concatenate two passed in string values
  val concat = udf( (first: String, second: String) => { first + " " + second } )

  def main (args: Array[String]) {

    // define the input parameters
    val input_file = "/Users/gangadharkadam/myapps/pivot/src/main/resources/domains.csv"

    // Create the Spark configuration and the spark context
    println("Initializing the Spark Context...")

    val conf = new SparkConf().setAppName(AppName).setMaster("local")

    // Define the Spark Context
    val sc = new SparkContext(conf)

    // Define the SQL context
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    //Load and parse the Engine Information data into Spark DataFrames
    val domainDF = sqlContext

      //Define the format
      .read.format("com.databricks.spark.csv")

      //use the first line as header
      .option("header", "true")
      
      //Automatically infer the data types and schema
      .option("inferSchema", "true")
      
      //load the file
      .load(input_file)
    
    // pivot on the concatenated column and count occurrences per domain
    domainDF.withColumn("combColumn", concat($"ReturnCode", $"RequestType"))
      .groupBy("domain").pivot("combColumn").count().show()
  }
}

The output:

domain,200 GET,200 POST,300 GET
www.espn.com,0,1,0
www.google.com,1,0,1
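As a side note, the same one-pass pivot can be sketched without a custom udf by using Spark's built-in concat_ws function (assuming the same domainDF as above):

import org.apache.spark.sql.functions.concat_ws

// concat_ws joins the two columns with a space separator (casting them to strings),
// so the pivot columns come out as "200 GET", "200 POST", and "300 GET" as above
domainDF.withColumn("combColumn", concat_ws(" ", $"ReturnCode", $"RequestType"))
  .groupBy("domain").pivot("combColumn").count().show()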

Rising Star

Thanks @Gangadhar Kadam, I understand that this is a possible workaround, but if I want to keep the two columns separate, is that possible in one step, or do I need to do the pivot twice and join the tables?