Unable to retrieve data using the Data Frames and Spark SQL

New Contributor

I am unable to execute this Job and have been trying for a few days now.

One of the tables has 2 billion rows, the 2nd one has 220 million rows, the 3rd one has 1.4 million rows, and the 4th one has about 150K rows. I am trying to store the data in DataFrames, cache the large tables, do the ETL via Spark SQL 2.2, and then load the processed data to Redshift. I haven't figured out the Redshift piece yet, but this code has never returned any results to me. How should it be rewritten, and what should I optimize to make the ETL process work and load this data to Redshift?

So far the process has never completed; it runs for about an hour or more and always crashes.

spark-scala-employee.txt

Can you please help?

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


val spark = SparkSession
      .builder()
      .appName("SparkSQL")
      .config("spark.sql.DataWarehouse.dir", "s3://phy.datawarehouse/data/orcdb/")
      .enableHiveSupport()
      .getOrCreate()
      
import spark.implicits._


    //Read File From S3 and register it as Temporary View as Employee
        val EmpDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Employee")
        EmpDF.createOrReplaceTempView("Employee")
    
    //Read File From S3 and register it as Temporary View as Employee Details 
        val EmpDetDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/EmployeeDetails")
        EmpDetDF.createOrReplaceTempView("EmployeeDetails")
    
    //Read File From S3 and register it as Temporary View as Sales 
        val SalesDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Sales")
        SalesDF.createOrReplaceTempView("Sales")
    
    //Read File From S3 and register it as Temporary View as Location 
        val LocationDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Location")
        LocationDF.createOrReplaceTempView("Location")


    //Show the first 10 rows of each table
        EmpDF.show(10)
        EmpDetDF.show(10)
        SalesDF.show(10)
        LocationDF.show(10)
    
    //Display the total number of rows in each DataFrame
        println("Total number of Emp: " + EmpDF.count())
        println("Total number of Emp Details: " + EmpDetDF.count())
        println("Total number of Sales: " + SalesDF.count())
        println("Total number of Location: " + LocationDF.count())
		
		
  //Filter Datasets Employee
        val resultsEmp = spark.sql("SELECT * FROM Employee WHERE NAME_TYPE = 'USA' AND SUB_TYPE = 'NA'")
        resultsEmp.show(10, false)


    //Filter Datasets Location
        val resultsLoc = spark.sql("SELECT * FROM Location WHERE LOC_TYPE = 'USA' ")
        resultsLoc.show(10, false)
    
    // Re-register the filtered results, replacing the original temporary views
        resultsEmp.createOrReplaceTempView("Employee")
        resultsLoc.createOrReplaceTempView("Location")




    //accessing catalog metadata using spark.catalog
        spark.catalog.listDatabases.show(false)
     
    //show database tablenames; it should show our table name
        spark.catalog.listTables.show(false)
        
    //Cache Sales DataFrame (2.2 Billion Rows) 
        SalesDF.cache()    
        
    //Cache Employee Details Data Frame (220 Million Rows)
        EmpDetDF.cache()     
        
    val resultsDF = spark.sql("""SELECT DISTINCT EM.EMP_ID, ED.EMPDETAILS, LO.LOC_ID, SA.SALES_VALUE
                                 FROM Sales SA
                                 JOIN EmployeeDetails ED ON SA.EMPDET_ID = ED.EMPDET_ID
                                 JOIN Employee EM ON EM.EMP_ID = ED.EMP_ID
                                 JOIN Location LO ON LO.LOC_ID = SA.LOC_ID""")
        resultsDF.show(10)
2 Replies

Super Collaborator

Hi @Venkatesh Innanji,

For the given scenario (and code), I believe the following amendments will help optimize the job. Please have a look and reply back with any concerns.

  1. Remove the unwanted columns from the initial selection (specify the columns instead of *). You end up carrying far fewer columns through the pipeline, which makes the shuffles much quicker.
  2. In general, the huge shuffle is where most of the time and resources go. In this query the tables get shuffled by the join keys (EMP_ID, LOC_ID, EMPDET_ID), so repartitioning the bigger tables on their join columns gives quicker execution times: rows with the same key hash to the same partitions, which reduces the data shuffled between executors (points 1 and 2 are shown in the first sketch after this list).
  3. Please don't cache a DataFrame unless you reuse it multiple times in the same shape as you join it. Otherwise caching only adds overhead from the extra reads and writes, and that excessive IO is a primary reason for long execution times.
  4. Instead of one big hop, go for multiple smaller ones. Joining step by step, with an aggregate or distinct in between where needed, ensures the stages execute one after another and reduces the data carried into the next SQL. In this case, join Employee and EmployeeDetails first, compute the distinct rows if needed, then bring in Location, and leave the huge Sales table for the last, most expensive join: compute the smaller data sets first and leave the largest to the end, since the intermediate DataFrame may be shuffled every time it is joined with another one (see the second sketch below).
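
To make points 1 and 2 concrete, here is a rough sketch against your view and column names, assuming your existing SparkSession and imports. It is only an outline; the 400-partition count is a placeholder you would tune for your cluster size.

    // Point 1: keep only the columns the final query actually needs (no SELECT *)
    val salesSlim  = SalesDF.select("EMPDET_ID", "LOC_ID", "SALES_VALUE")
    val empDetSlim = EmpDetDF.select("EMPDET_ID", "EMP_ID", "EMPDETAILS")
    val empSlim    = EmpDF.filter($"NAME_TYPE" === "USA" && $"SUB_TYPE" === "NA")
                          .select("EMP_ID")
    val locSlim    = LocationDF.filter($"LOC_TYPE" === "USA").select("LOC_ID")

    // Point 2: repartition the two big tables on the join key so matching rows
    // hash to the same partitions and less data moves between executors
    val salesByEmpDet = salesSlim.repartition(400, $"EMPDET_ID")   // 400 is only a placeholder
    val empDetByKey   = empDetSlim.repartition(400, $"EMPDET_ID")  // tune for your cluster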

It looks like the main reason for the long execution is the huge IO reads (given the data volumes you specified), so it is better to take one hop at a time and only then move on to the next one. The lazy evaluation can be streamlined by keeping aggregate functions between the steps (not recommended for small tables/operations).
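
For point 4, here is a rough staged version of your final query, reusing the slimmed and repartitioned DataFrames from the sketch above. Again this is only an outline, not a tested rewrite; since LOC_ID lives only on Sales, Location is attached to Sales as a broadcast join rather than being joined before it.

    // Stage 1: join the two smaller employee tables first; the result stays small
    val empStage = empDetByKey.join(empSlim, Seq("EMP_ID"))
                              .select("EMPDET_ID", "EMP_ID", "EMPDETAILS")

    // Stage 2: attach the tiny filtered Location table to Sales as a broadcast join;
    // the USA filter also shrinks Sales before the expensive join
    val salesStage = salesByEmpDet.join(broadcast(locSlim), Seq("LOC_ID"))

    // Stage 3: only now do the one big shuffle join on the repartitioned key
    val resultsDF = salesStage.join(empStage, Seq("EMPDET_ID"))
                              .select("EMP_ID", "EMPDETAILS", "LOC_ID", "SALES_VALUE")
                              .distinct()

    resultsDF.show(10, false)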

Hope this helps !!

New Contributor

Thanks for your response, Raju!

I am very new to Spark and am trying a POC for data warehouse work in the cloud. Yes, the files are huge. I am on AWS and using EMR for the Spark work; the S3 bucket has a couple of 16 GB files in ORC format. I am going to split them into multiple files and use the partitioning and bucketing options for faster I/O, roughly along the lines of the sketch below.
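
For example, something like this is what I was thinking of trying for the Sales extract. The bucket count, output path, and table name are just my guesses and not tested, and I understand bucketBy needs saveAsTable rather than a plain path write.

    // Rough idea: write the big Sales extract back out bucketed (and sorted) on the
    // join key so later joins can read fewer, better-organised files
    SalesDF.write
      .bucketBy(64, "EMPDET_ID")                                          // guessed bucket count
      .sortBy("EMPDET_ID")
      .format("orc")
      .option("path", "s3://phy.datawarehouse/data/orcdb/Sales_bucketed") // hypothetical path
      .saveAsTable("sales_bucketed")                                      // hypothetical table name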

Can you explain your #2 and #4 a bit more? If possible, can you rewrite my code so I can better understand what you meant by the shuffle per EMP_ID, LOC_ID, EMPDET_ID? Also, using Spark SQL and DataFrames, how would I join two tables first, then the 3rd one, and then the 4th one? Can you share an example of that in my code?

Appreciate your response and help.

Thanks!