Created 10-01-2017 08:33 PM
I am unable to execute this Job and have been trying for a few days now.
One of the tables has 2 billion rows, the 2nd has 220 million rows, the 3rd has 1.4 million rows, and the 4th has about 150K rows. I am trying to store the data in DataFrames, cache the large tables, do the ETL via Spark SQL 2.2, and then load the processed data to Redshift. I haven't figured out the Redshift piece yet, but this code has never returned any results to me. How should this be rewritten, and what should I optimize to make the ETL process work and load this data to Redshift?
So far the process has never completed; it runs for about an hour or more and then crashes every time.
Can you please help?
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val spark = SparkSession
  .builder()
  .appName("SparkSQL")
  .config("spark.sql.warehouse.dir", "s3://phy.datawarehouse/data/orcdb/")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

// Read each ORC file from S3 and register it as a temporary view
val EmpDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Employee")
EmpDF.createOrReplaceTempView("Employee")

val EmpDetDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/EmployeeDetails")
EmpDetDF.createOrReplaceTempView("EmployeeDetails")

val SalesDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Sales")
SalesDF.createOrReplaceTempView("Sales")

val LocationDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Location")
LocationDF.createOrReplaceTempView("Location")

// Show the first 10 rows of each table
EmpDF.show(10)
EmpDetDF.show(10)
SalesDF.show(10)
LocationDF.show(10)

// Display the total number of rows in each DataFrame
println("Total number of Emp: " + EmpDF.count())
println("Total number of Emp Details: " + EmpDetDF.count())
println("Total number of Sales: " + SalesDF.count())
println("Total number of Location: " + LocationDF.count())

// Filter the Employee dataset
val resultsEmp = spark.sql("SELECT * FROM Employee WHERE NAME_TYPE = 'USA' AND SUB_TYPE = 'NA'")
resultsEmp.show(10, false)

// Filter the Location dataset
val resultsLoc = spark.sql("SELECT * FROM Location WHERE LOC_TYPE = 'USA'")
resultsLoc.show(10, false)

// Re-register the filtered DataFrames as the views used in the join below
resultsEmp.createOrReplaceTempView("Employee")
resultsLoc.createOrReplaceTempView("Location")

// Access catalog metadata using spark.catalog
spark.catalog.listDatabases.show(false)
// Show table names; it should list our temporary views
spark.catalog.listTables.show(false)

// Cache the Sales DataFrame (~2 billion rows)
SalesDF.cache()
// Cache the Employee Details DataFrame (~220 million rows)
EmpDetDF.cache()

val resultsDF = spark.sql("""
  SELECT DISTINCT EM.EMP_ID, ED.EMPDETAILS, LO.LOC_ID, SA.SALES_VALUE
  FROM Sales SA
  JOIN EmployeeDetails ED ON SA.EMPDET_ID = ED.EMPDET_ID
  JOIN Employee EM ON EM.EMP_ID = ED.EMP_ID
  JOIN Location LO ON LO.LOC_ID = SA.LOC_ID""")

resultsDF.show(10)
Created 10-02-2017 05:46 AM
For the given scenario (and code), I believe the following amendments will help optimize it. Please have a look and revert with any concerns.
It looks like the main reason for the long execution time is the huge I/O reads (given the data volumes you specified for the query), so it is better to take one hop at a time and move to the next one only after the previous one has executed. The lazy evaluation can be broken up by placing actions (counts / aggregations) between the steps to force each hop to materialize (not recommended for small tables / operations).
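As a rough sketch only (reusing the DataFrames from your snippet, with the partition count and broadcast choices as placeholders you would need to tune; I am assuming the filtered Employee and Location tables are the small ones):

import spark.implicits._
import org.apache.spark.sql.functions.broadcast

// Hop 1: join the two big tables, pre-shuffled on the join key (partition count is a guess)
val salesByKey  = SalesDF.repartition(400, $"EMPDET_ID")
val empDetByKey = EmpDetDF.repartition(400, $"EMPDET_ID")

val salesWithDet = salesByKey.join(empDetByKey, Seq("EMPDET_ID"))
salesWithDet.cache()
// The count is an action that forces this hop to execute before moving on
println("Rows after Sales x EmployeeDetails: " + salesWithDet.count())

// Hop 2 and 3: the filtered Employee and Location tables should be small enough to broadcast,
// so these joins avoid shuffling the large intermediate result again
val withEmp = salesWithDet.join(broadcast(resultsEmp), Seq("EMP_ID"))
val withLoc = withEmp.join(broadcast(resultsLoc), Seq("LOC_ID"))

val resultsDF = withLoc
  .select($"EMP_ID", $"EMPDETAILS", $"LOC_ID", $"SALES_VALUE")
  .distinct()

resultsDF.show(10)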
Hope this helps !!
Created 10-02-2017 02:48 PM
Thanks for your response Raju!
I am very new to Spark and working on a data warehouse POC in the cloud. Yes, the files are huge. I am on AWS and using EMR for the Spark work; the S3 bucket has a couple of 16 GB files in ORC format. I am going to split them into multiple files and use the partitioning and bucketing options for faster I/O, along the lines of the sketch below.
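This is roughly what I have in mind for rewriting the Sales data (the partition/bucket columns, bucket count, and output path are only my guesses until I check the real cardinalities):

// Planned re-write of the Sales ORC data, partitioned and bucketed on the join keys.
// Note: bucketBy in Spark 2.2 only works with saveAsTable, not a plain .orc(path) write.
spark.read.orc("s3://phy.datawarehouse/data/orcdb/Sales")
  .write
  .mode("overwrite")
  .partitionBy("LOC_ID")
  .bucketBy(64, "EMPDET_ID")
  .sortBy("EMPDET_ID")
  .option("path", "s3://phy.datawarehouse/data/orcdb/Sales_bucketed")
  .format("orc")
  .saveAsTable("sales_bucketed")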
Can you explain your #2 and #4? If possible, can you rewrite my code so that I can better understand what you meant by getting a shuffle per Emp_Id, Loc_Id, EmpDet_ID? Also, using Spark SQL and DataFrames, how would I join two tables first, then bring in the 3rd one and then the 4th? Can you share an example of that in my code?
Appreciate your response and help.
Thanks!