Support Questions


Explode function in Data Frames


We have a nested Parquet file with the sample structure below:

 

department_id  String

department_name String

Employees Array<Struct<first_name String, last_name String, email String>>

 

We want to flatten the above structure using the explode API of DataFrames. The samples we found in the documentation and on GitHub only show exploding a String by splitting it, but here we have an Array structure. We could not find any examples for this on the web either, or maybe I am missing something.

 

Could anyone please help with using the explode method on a nested array structure?

 

Thanks in advance..

 

I referred to the API and GitHub links below:

 

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.s...

 

 

Thanks !

Siva

1 ACCEPTED SOLUTION

Contributor

Hey Siva-

 

This is Chris Fregly from Databricks.  I just talked to my co-worker, Michael Armbrust (Spark SQL, Catalyst, DataFrame guru), and we came up with the code sample below.  Hopefully, this is what you're looking for.

 

Michael admits that this is a bit verbose, so he may implement a more concise `explodeArray()` method on DataFrame at some point.

 

 

case class Employee(firstName: String, lastName: String, email: String)
case class Department(id: String, name: String)
case class DepartmentWithEmployees(department: Department, employees: Seq[Employee])

val employee1 = new Employee("michael", "armbrust", "abc123@prodigy.net")
val employee2 = new Employee("chris", "fregly", "def456@compuserve.net")

val department1 = new Department("123456", "Engineering")
val department2 = new Department("123456", "Psychology")
val departmentWithEmployees1 = new DepartmentWithEmployees(department1, Seq(employee1, employee2))
val departmentWithEmployees2 = new DepartmentWithEmployees(department2, Seq(employee1, employee2))

val departmentWithEmployeesRDD = sc.parallelize(Seq(departmentWithEmployees1, departmentWithEmployees2))
departmentWithEmployeesRDD.toDF().saveAsParquetFile("dwe.parquet")

val departmentWithEmployeesDF = sqlContext.parquetFile("dwe.parquet")

// This would be replaced by explodeArray()
val explodedDepartmentWithEmployeesDF =
  departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees")) {
    case Row(employee: Seq[Row]) =>
      employee.map(employee =>
        Employee(employee(0).asInstanceOf[String],
                 employee(1).asInstanceOf[String],
                 employee(2).asInstanceOf[String]))
  }
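
For comparison, newer Spark releases also ship a column-level explode in org.apache.spark.sql.functions, which shortens this pattern. A minimal sketch (not from the reply above), assuming a Spark version where that function is available and the same schema as in the example:

import org.apache.spark.sql.functions.{col, explode}

// Each element of the employees array becomes its own row ("employee");
// the struct fields can then be selected like ordinary nested columns.
val explodedDF = departmentWithEmployeesDF.select(
  col("department"), explode(col("employees")).as("employee"))

val flattenedDF = explodedDF.select(
  col("department.id"), col("department.name"),
  col("employee.firstName"), col("employee.lastName"), col("employee.email"))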

 


6 REPLIES



Hi Chris and Michael,

Thanks for your quick response and solution. Let me try it in our data model; I will update this thread with the outcome. Many thanks again. explodeArray would be a great option; nevertheless, this solution is really cool. Thanks again for your time.

 

Thanks !

Siva

New Contributor

I have a similar situation, and am having trouble coding it in Scala due to my limited knowledge of Scala.

 

In my case, the data is read from an Avro file. I used my debugger to find out the structure of the Avro file.

It is pretty complicated as you can see below:

 

[Report_Header: struct<Report_Name:string,Report_Date_Time:string,Aircraft_Type_Number:string,Aircraft_Serial_Number:string,Aircraft_Tail_Number:string,Version_Number:string,Tables_Part_Number:string>, PFS: array<struct<PFS_Header:struct<Flight_Leg:string,Flight_Number:string,From:string,Start:string,To:string,End:string>,Flight_Deck_Effects:struct<Count:string,FDE:array<struct<Equation_ID:string,Message_Text:string,Status:string,Occurrences:string,Recurrences:string,Fault_Code:string,Flight_Phase:string,Logged_Date_Time:string,Associated_Fault_Messages:struct<Fault_Message:array<struct<Equation_ID:string,Message_Text:string,Status:string,Occurrences:string,Recurrences:string,Fault_Code:string,Flight_Phase:string,Logged_Date_Time:string,Parameter_Snapshot:string>>>>>>,Uncorrelated_Fault_Messages:struct<Count:string,Fault_Message:array<struct<Equation_ID:string,Message_Text:string,Status:string,Occurrences:string,Recurrences:string,Fault_Code:string,Flight_Phase:string,Logged_Date_Time:string,Parameter_Snapshot:string>>>,Service_Messages:struct<Count:string,Service_Message:array<struct<Equation_ID:string,Message_Text:string,Status:string,Occurrences:string,Recurrences:string,Fault_Code:string,Flight_Phase:string,Logged_Date_Time:string>>>,Aircraft_Servicing:struct<ENGINES:struct<LEFT_ENGINE:struct<Oil_Level:string,Oil_Filt_Bypass:string,Oil_Filt_Impend_Bypass:string,Fuel_Filt_Bypass:string,Fuel_Filt_Impend_Bypass:string>,RIGHT_ENGINE:struct<Oil_Level:string,Oil_Filt_Bypass:string,Oil_Filt_Impend_Bypass:string,Fuel_Filt_Bypass:string,Fuel_Filt_Impend_Bypass:string>>,APU:struct<HOURS:string,CYCLES:string,Oil_Level:string,Lube_Filt_Bypass:string,Generator_Filt_Bypass:string>,TIRE_PRESSURES:struct<Press:struct<LEFT_NLG:string,RIGHT_NLG:string,LOB_MLG:string,LIB_MLG:string,RIB_MLG:string,ROB_MLG:string>,Temp:struct<LEFT_NLG:string,RIGHT_NLG:string,LOB_MLG:string,LIB_MLG:string,RIB_MLG:string,ROB_MLG:string>>,BRAKES:struct<Wear:struct<LOB:string,LIB:string,RIB:string,ROB:string>,Cycles_Prediction:struct<LOB:string,LIB:string,RIB:string,ROB:string>>,HYDRAULICS:struct<Level:struct<SYS1:string,SYS2:string,SYS3:string>,Temperature:struct<SYS1:string,SYS2:string,SYS3:string>>,FUEL_Quantity:struct<LEFT:string,CENTER:string,RIGHT:string,TOTAL:string>,FIDEX:struct<SMOKE_DET_STATUS:struct<Fwd_Cargo_SD1:string,Fwd_Cargo_SD2:string,Fwd_Cargo_SD3:string,Fwd_Cargo_SD4:string,Aft_Cargo_SD1:string,Aft_Cargo_SD2:string,Aft_Cargo_SD3:string,Aft_Cargo_SD4:string,Eqpt_Bay_SD1:string,Eqpt_Bay_SD2:string,IFE_Bay_SD1:string,IFE_Bay_SD2:string,Fwd_Lavatory_SD:string,Lavatory_C_SD:string,Lavatory_D_SD:string,Lavatory_E_SD:string>>,WATER_WASTE:struct<Waste_Tank_Level:string,Potable_Water_Level:string>,CREW_O2:struct<Bottle_Press:string>>>>]

 

I do not want the entire solution; I would just like some help getting started. Let's take just the first structure:

 

[Report_Header: struct<Report_Name:string,Report_Date_Time:string,Aircraft_Type_Number:string,Aircraft_Serial_Number:string,Aircraft_Tail_Number:string,Version_Number:string,Tables_Part_Number:string>]

 

If I were to just expand this into separate columns, what would my explode() function look like?

 

Here is my attempt, but I am stuck on the actual implementation of the explode() function.

 

  case class Report_Header( Report_Name: String,
                            Report_Date_Time: String,
                            Aircraft_Type_Number: String,
                            Aircraft_Serial_Number: String,
                            Aircraft_Tail_Number: String,
                            Version_Number: String,
                            Tables_Part_Number: String)

  def testAvro(inputFile: String, outputFile: String, context: SparkContext): Unit = {
    val sqlContext = new SQLContext(context)

    val pfsDetailedReport = sqlContext.read
      .format("com.databricks.spark.avro")
      .load(inputFile)

    val explodedPfsDetailedReport = pfsDetailedReport.explode(pfsDetailedReport("Report_Header")) {

      // Stuck here - please help

      case Row(header: Seq[Row @unchecked]) =>
        header.map(??? => ???)

     

      // Example below taken from above

      // case Row(employee: Seq[Row @unchecked]) =>
      //  employee.map(employee => Employee(employee(0).asInstanceOf[String], employee(1).asInstanceOf[String], employee(2).asInstanceOf[String]))

    }
    explodedPfsDetailedReport.write
      .format("com.databricks.spark.avro")
      .save(outputFile)
  }

 

 Any help will be highly appreciated.
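
One possible starting point (just a sketch, not a tested answer): since Report_Header is a single struct rather than an array, it may not need explode() at all; its fields can be pulled into separate columns with dotted column names. Assuming the pfsDetailedReport DataFrame loaded in the attempt above:

// No explode() needed for a plain struct column: select its fields directly.
val flattenedHeader = pfsDetailedReport.select(
  pfsDetailedReport("Report_Header.Report_Name").as("Report_Name"),
  pfsDetailedReport("Report_Header.Report_Date_Time").as("Report_Date_Time"),
  pfsDetailedReport("Report_Header.Aircraft_Type_Number").as("Aircraft_Type_Number"),
  pfsDetailedReport("Report_Header.Aircraft_Serial_Number").as("Aircraft_Serial_Number"),
  pfsDetailedReport("Report_Header.Aircraft_Tail_Number").as("Aircraft_Tail_Number"),
  pfsDetailedReport("Report_Header.Version_Number").as("Version_Number"),
  pfsDetailedReport("Report_Header.Tables_Part_Number").as("Tables_Part_Number"))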

New Contributor

To answer my own question, here are a couple of links that may be useful.

 

https://github.com/julianpeeters/sbt-avrohugger

https://github.com/julianpeeters/avro-scala-macro-annotations

 

In short, you need to build a case class for every Avro structure and write an apply() method to convert a Scala Row object to your case class. The links above will help with the former. The latter has to be done by hand, but I think it should be possible to generate code for the Row-to-object conversion as well.
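
For illustration, a rough sketch of the hand-written Row-to-case-class conversion described above (the class and field names are just examples, not output of the linked tools):

import org.apache.spark.sql.Row

case class ReportHeader(reportName: String, reportDateTime: String)

object ReportHeader {
  // Hand-written converter from a struct Row to the case class; getAs by
  // field name relies on the Row carrying its schema, as rows read from
  // Avro or Parquet do.
  def apply(row: Row): ReportHeader =
    ReportHeader(
      row.getAs[String]("Report_Name"),
      row.getAs[String]("Report_Date_Time"))
}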

New Contributor

This info is very helpful, but I've got a twist that I can't seem to figure out.

 

I got this working for a single level of depth, but I'm struggling (as a Scala noob) with multiple levels. Below is the schema of my DataFrame (modelled after the HL7 FHIR DSTU2 specification, read from a Parquet file). I am trying to explode out the individual values in the "given" field of the "name" struct array (so, a nested array). After the initial explode of the name array, though, the field I exploded into (called "nar") is not an array of struct, it's simply an array of String, which I think is what trips up the explode() method. I've tried a number of different approaches, but haven't found the right combination. I need to be able to test each individual "given" name (and other values in the struct) against those values in other records (name matching). Do you think this is possible?

 

case class Nar(nar: Seq[String])
case class Gname(gname: String)

val xdf = patdf.explode($"name") { case Row(name: Seq[Row]) =>
  name.map(name => Nar(name(name.fieldIndex("given")).asInstanceOf[Seq[String]]))
}

val ydf = xdf.explode($"nar") { case Row(nar: Seq[Row]) =>
  nar.map(nar => Gname(nar(0).asInstanceOf[String]))
}

ydf.select($"gname").foreach(println)

 

...

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:32)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

...

 

root
   |-- PatientCareProvider: array (nullable = true)
   |    |-- element: string (containsNull = true)
   |-- UniqueId: string (nullable = true)
   |-- active: boolean (nullable = true)
   |-- address: array (nullable = true)
   |    |-- element: struct (containsNull = true)
   |    |    |-- addressLine: array (nullable = true)
   |    |    |    |-- element: string (containsNull = true)
   |    |    |-- city: string (nullable = true)
   |    |    |-- country: string (nullable = true)
   |    |    |-- period: struct (nullable = true)
   |    |    |    |-- startTime: struct (nullable = true)
   |    |    |    |    |-- value: long (nullable = true)
   |    |    |-- postalCode: string (nullable = true)
   |    |    |-- region: string (nullable = true)
   |    |    |-- text: string (nullable = true)
   |    |    |-- use: string (nullable = true)
   |-- birthDate: struct (nullable = true)
   |    |-- value: long (nullable = true)
   |-- gender: string (nullable = true)
   |-- link: array (nullable = true)
   |    |-- element: struct (containsNull = true)
   |    |    |-- linkType: string (nullable = true)
   |    |    |-- other: struct (nullable = true)
   |    |    |    |-- display: string (nullable = true)
   |    |    |    |-- reference: string (nullable = true)
   |-- managingOrganization: struct (nullable = true)
   |    |-- display: string (nullable = true)
   |    |-- reference: string (nullable = true)
   |-- name: array (nullable = true)
   |    |-- element: struct (containsNull = true)
   |    |    |-- family: array (nullable = true)
   |    |    |    |-- element: string (containsNull = true)
   |    |    |-- given: array (nullable = true)
   |    |    |    |-- element: string (containsNull = true)
   |    |    |-- period: struct (nullable = true)
   |    |    |    |-- startTime: struct (nullable = true)
   |    |    |    |    |-- value: long (nullable = true)
   |    |    |-- prefix: array (nullable = true)
   |    |    |    |-- element: string (containsNull = true)
   |    |    |-- suffix: array (nullable = true)
   |    |    |    |-- element: string (containsNull = true)
   |    |    |-- text: string (nullable = true)
   |    |    |-- use: string (nullable = true)
   |-- resourceId: string (nullable = true)
   |-- telecom: array (nullable = true)
   |    |-- element: struct (containsNull = true)
   |    |    |-- period: struct (nullable = true)
   |    |    |    |-- startTime: struct (nullable = true)
   |    |    |    |    |-- value: long (nullable = true)
   |    |    |-- system: string (nullable = true)
   |    |    |-- use: string (nullable = true)
   |    |    |-- value: string (nullable = true)

 

Then with "given" exploded as "nar":

   |-- nar: array (nullable = true)
   |    |-- element: string (containsNull = true)

 

Then with "given" exploded as "gname":

   |-- gname: string (nullable = true)

New Contributor

I think I figured this out.  In this case, I needed to use the other flavor of explode in the second operation:

 

val ydf = xdf.explode("nar", "gname") { nar: Seq[String] => nar }

 

Always happens, as soon as you ask the question publicly...
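
For completeness, the two steps combined look roughly like this (a sketch under the same assumptions as the snippets above):

case class Nar(nar: Seq[String])

// First explode: one row per element of the "name" struct array, keeping
// only its "given" array of strings.
val xdf = patdf.explode($"name") { case Row(names: Seq[Row @unchecked]) =>
  names.map(n => Nar(n.getAs[Seq[String]]("given")))
}

// Second explode: "nar" is an array of String, not of Row, so the
// column-name flavor of explode applies.
val ydf = xdf.explode("nar", "gname") { nar: Seq[String] => nar }

ydf.select($"gname").foreach(println)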