10-07-2015
03:02 PM
To answer my own question, here are two links that may be useful:
https://github.com/julianpeeters/sbt-avrohugger
https://github.com/julianpeeters/avro-scala-macro-annotations
In short, you need to build a case class for every Avro structure and write an apply() method that converts a Spark SQL Row object to your case class. The links above help with the former. The latter has to be done by hand, but I think it should be possible to generate the code for the Row-to-object conversion as well.
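As a rough illustration of the hand-written part, here is a minimal sketch for the Report_Header structure from the question. It assumes the fields arrive in the Row in the same order as in the schema, since it reads them by position; the companion apply(row: Row) overload is just one way to lay this out, not something the linked projects generate.

```scala
import org.apache.spark.sql.Row

// Case class mirroring the Report_Header struct in the Avro schema.
case class Report_Header(
  Report_Name: String,
  Report_Date_Time: String,
  Aircraft_Type_Number: String,
  Aircraft_Serial_Number: String,
  Aircraft_Tail_Number: String,
  Version_Number: String,
  Tables_Part_Number: String)

object Report_Header {
  // Hand-written Row-to-object conversion.
  // Assumption: field positions match the order of the struct in the schema.
  def apply(row: Row): Report_Header = Report_Header(
    row.getString(0),
    row.getString(1),
    row.getString(2),
    row.getString(3),
    row.getString(4),
    row.getString(5),
    row.getString(6))
}
```

Every nested structure would need a similar case class and conversion, which is why generating this code looks attractive for a schema of this size.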
10-06-2015
11:47 AM
I have a similar situation, and am having trouble coding it in Scala due to my limited knowledge of Scala. In my case, the data is read from an Avro file. I used my debugger to find out the structure of the Avro file. It is pretty complicated as you can see below:

[Report_Header: struct<Report_Name:string,Report_Date_Time:string,Aircraft_Type_Number:string,Aircraft_Serial_Number:string,Aircraft_Tail_Number:string,Version_Number:string,Tables_Part_Number:string>, PFS: array<struct<PFS_Header:struct<Flight_Leg:string,Flight_Number:string,From:string,Start:string,To:string,End:string>,Flight_Deck_Effects:struct<Count:string,FDE:array<struct<Equation_ID:string,Message_Text:string,Status:string,Occurrences:string,Recurrences:string,Fault_Code:string,Flight_Phase:string,Logged_Date_Time:string,Associated_Fault_Messages:struct<Fault_Message:array<struct<Equation_ID:string,Message_Text:string,Status:string,Occurrences:string,Recurrences:string,Fault_Code:string,Flight_Phase:string,Logged_Date_Time:string,Parameter_Snapshot:string>>>>>>,Uncorrelated_Fault_Messages:struct<Count:string,Fault_Message:array<struct<Equation_ID:string,Message_Text:string,Status:string,Occurrences:string,Recurrences:string,Fault_Code:string,Flight_Phase:string,Logged_Date_Time:string,Parameter_Snapshot:string>>>,Service_Messages:struct<Count:string,Service_Message:array<struct<Equation_ID:string,Message_Text:string,Status:string,Occurrences:string,Recurrences:string,Fault_Code:string,Flight_Phase:string,Logged_Date_Time:string>>>,Aircraft_Servicing:struct<ENGINES:struct<LEFT_ENGINE:struct<Oil_Level:string,Oil_Filt_Bypass:string,Oil_Filt_Impend_Bypass:string,Fuel_Filt_Bypass:string,Fuel_Filt_Impend_Bypass:string>,RIGHT_ENGINE:struct<Oil_Level:string,Oil_Filt_Bypass:string,Oil_Filt_Impend_Bypass:string,Fuel_Filt_Bypass:string,Fuel_Filt_Impend_Bypass:string>>,APU:struct<HOURS:string,CYCLES:string,Oil_Level:string,Lube_Filt_Bypass:string,Generator_Filt_Bypass:string>,TIRE_PRESSURES:struct<Press:struct<LEFT_NLG:string,RIGHT_NLG:string,LOB_MLG:string,LIB_MLG:string,RIB_MLG:string,ROB_MLG:string>,Temp:struct<LEFT_NLG:string,RIGHT_NLG:string,LOB_MLG:string,LIB_MLG:string,RIB_MLG:string,ROB_MLG:string>>,BRAKES:struct<Wear:struct<LOB:string,LIB:string,RIB:string,ROB:string>,Cycles_Prediction:struct<LOB:string,LIB:string,RIB:string,ROB:string>>,HYDRAULICS:struct<Level:struct<SYS1:string,SYS2:string,SYS3:string>,Temperature:struct<SYS1:string,SYS2:string,SYS3:string>>,FUEL_Quantity:struct<LEFT:string,CENTER:string,RIGHT:string,TOTAL:string>,FIDEX:struct<SMOKE_DET_STATUS:struct<Fwd_Cargo_SD1:string,Fwd_Cargo_SD2:string,Fwd_Cargo_SD3:string,Fwd_Cargo_SD4:string,Aft_Cargo_SD1:string,Aft_Cargo_SD2:string,Aft_Cargo_SD3:string,Aft_Cargo_SD4:string,Eqpt_Bay_SD1:string,Eqpt_Bay_SD2:string,IFE_Bay_SD1:string,IFE_Bay_SD2:string,Fwd_Lavatory_SD:string,Lavatory_C_SD:string,Lavatory_D_SD:string,Lavatory_E_SD:string>>,WATER_WASTE:struct<Waste_Tank_Level:string,Potable_Water_Level:string>,CREW_O2:struct<Bottle_Press:string>>>>]

While I do not want the entire solution, I just would like some help getting started on this. Let's just take the first structure:

[Report_Header: struct<Report_Name:string,Report_Date_Time:string,Aircraft_Type_Number:string,Aircraft_Serial_Number:string,Aircraft_Tail_Number:string,Version_Number:string,Tables_Part_Number:string>]

If I were to just expand this into separate columns, what would my explode() function look like? Here is my attempt, but I am stuck with the actual implementation of the explode() function.

case class Report_Header(
  Report_Name: String,
  Report_Date_Time: String,
  Aircraft_Type_Number: String,
  Aircraft_Serial_Number: String,
  Aircraft_Tail_Number: String,
  Version_Number: String,
  Tables_Part_Number: String)

def testAvro(inputFile: String, outputFile: String, context: SparkContext): Unit = {
  val sqlContext = new SQLContext(context)
  val pfsDetailedReport = sqlContext.read
    .format("com.databricks.spark.avro")
    .load(inputFile)
  val explodedPfsDetailedReport = pfsDetailedReport.explode(pfsDetailedReport("Report_Header")) {
    // Stuck here - please help
    case Row(Report_header: Seq[Row @unchecked]) => header.map(??? => ???)
    // Example below taken from above
    // case Row(employee: Seq[Row @unchecked]) =>
    //   employee.map(employee => Employee(employee(0).asInstanceOf[String], employee(1).asInstanceOf[String], employee(2).asInstanceOf[String]))
  }
  explodedPfsDetailedReport.write
    .format("com.databricks.spark.avro")
    .save(outputFile)
}

Any help will be highly appreciated.