<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Unable to retrieve data using the Data Frames and Spark SQL in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Unable-to-retrieve-data-using-the-Data-Frames-and-Spark-SQL/m-p/203732#M165731</link>
    <description>&lt;P&gt;
	I have been trying to execute this job for a few days now without success.&lt;/P&gt;&lt;P&gt;
	One of the tables has 2 billion rows, the second has 220 million, the third has 1.4 million, and the fourth has about 150K. I am trying to store the data in DataFrames, cache the large tables, do the ETL via Spark SQL 2.2, and then load the processed data to Redshift. I haven't figured out the Redshift piece yet, but this code has never returned any results. How should it be rewritten, and what should I optimize to make the ETL process work and load this data to Redshift?&lt;/P&gt;&lt;P&gt;
	So far the process has never completed; it runs for about an hour or more and always crashes.&lt;/P&gt;&lt;P&gt;
	&lt;A href="https://community.cloudera.com/legacyfs/online/attachments/40595-spark-scala-employee.txt"&gt;spark-scala-employee.txt&lt;/A&gt;&lt;/P&gt;&lt;P&gt;
	Can you please help?&lt;/P&gt;&lt;PRE&gt;import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


val spark = SparkSession
      .builder()
      .appName("SparkSQL")
      .config("spark.sql.warehouse.dir", "s3://phy.datawarehouse/data/orcdb/")
      .enableHiveSupport()
      .getOrCreate()
      
import spark.implicits._


    //Read File From S3 and register it as Temporary View as Employee
        val EmpDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Employee")
        EmpDF.createOrReplaceTempView("Employee")
    
    //Read File From S3 and register it as Temporary View as Employee Details 
        val EmpDetDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/EmployeeDetails")
        EmpDetDF.createOrReplaceTempView("EmployeeDetails")
    
    //Read File From S3 and register it as Temporary View as Sales 
        val SalesDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Sales")
        SalesDF.createOrReplaceTempView("Sales")
    
    //Read File From S3 and register it as Temporary View as Location 
        val LocationDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Location")
        LocationDF.createOrReplaceTempView("Location")


    //Show the first 10 rows of each table
        EmpDF.show(10)
        EmpDetDF.show(10)
        SalesDF.show(10)
        LocationDF.show(10)
    
    //Display the total number of rows in each DataFrame
        println("Total number of Emp: " + EmpDF.count())
        println("Total number of Emp Details: " + EmpDetDF.count())
        println("Total number of Sales: " + SalesDF.count())
        println("Total number of Location: " + LocationDF.count())
		
		
  //Filter Datasets Employee
        val resultsEmp = spark.sql("SELECT * FROM Employee WHERE NAME_TYPE = 'USA' AND SUB_TYPE = 'NA'")
        resultsEmp.show(10, false)


    //Filter Dataset Location
        val resultsLoc = spark.sql("SELECT * FROM Location WHERE LOC_TYPE = 'USA' ")
        resultsLoc.show(10, false)
    
    // Replace the earlier Employee and Location temporary views with the filtered results
        resultsEmp.createOrReplaceTempView("Employee")
        resultsLoc.createOrReplaceTempView("Location")




    //accessing catalog metadata using spark.catalog
        spark.catalog.listDatabases.show(false)
     
    //show database tablenames; it should show our table name
        spark.catalog.listTables.show(false)
        
    //Cache Sales DataFrame (2 Billion Rows) 
        SalesDF.cache()    
        
    //Cache Employee Details Data Frame (220 Million Rows)
        EmpDetDF.cache()     
        
    val resultsDF = spark.sql("""SELECT DISTINCT EM.EMP_ID, ED.EMPDETAILS, LO.LOC_ID, SA.SALES_VALUE
                                 FROM Sales SA
                                 JOIN EmployeeDetails ED ON SA.EMPDET_ID = ED.EMPDET_ID
                                 JOIN Employee EM ON EM.EMP_ID = ED.EMP_ID
                                 JOIN Location LO ON LO.LOC_ID = SA.LOC_ID""")
        resultsDF.show(10)
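
    // A variant I could try instead of the SQL join above (a sketch only; it
    // assumes the column names below match the actual schemas): broadcast just
    // the small filtered tables (Employee is ~1.4M rows after filtering,
    // Location ~150K) so the 2-billion-row Sales table is not shuffled for
    // those joins. broadcast() comes from org.apache.spark.sql.functions,
    // which is already imported at the top.
        val joinedDF = SalesDF
          .join(EmpDetDF, Seq("EMPDET_ID"))            // shuffle join; EmployeeDetails (~220M rows) is too big to broadcast
          .join(broadcast(resultsEmp), Seq("EMP_ID"))  // broadcast the filtered Employee rows
          .join(broadcast(resultsLoc), Seq("LOC_ID"))  // broadcast the filtered Location rows
          .select("EMP_ID", "EMPDETAILS", "LOC_ID", "SALES_VALUE")
          .distinct()
        joinedDF.show(10)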
&lt;/PRE&gt;</description>
    <pubDate>Mon, 02 Oct 2017 03:33:50 GMT</pubDate>
    <dc:creator>vinnanji</dc:creator>
    <dc:date>2017-10-02T03:33:50Z</dc:date>
  </channel>
</rss>

