import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Build (or reuse) the session with Hive support so temp views and the
// catalog APIs below are available.
// FIX: the original used "spark.sql.DataWarehouse.dir", which is not a
// recognized Spark configuration key and was silently ignored; the
// standard key for the warehouse location is "spark.sql.warehouse.dir".
val spark = SparkSession
  .builder()
  .appName("SparkSQL")
  .config("spark.sql.warehouse.dir", "s3://phy.datawarehouse/data/orcdb/")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

/** Reads an ORC dataset from the warehouse root and registers it as a
  * temporary view under `viewName`, returning the DataFrame. */
def loadOrcView(path: String, viewName: String) = {
  val df = spark.read.orc(path)
  df.createOrReplaceTempView(viewName)
  df
}

val EmpDF      = loadOrcView("s3://phy.datawarehouse/data/orcdb/Employee", "Employee")
val EmpDetDF   = loadOrcView("s3://phy.datawarehouse/data/orcdb/EmployeeDetails", "EmployeeDetails")
val SalesDF    = loadOrcView("s3://phy.datawarehouse/data/orcdb/Sales", "Sales")
val LocationDF = loadOrcView("s3://phy.datawarehouse/data/orcdb/Location", "Location")

// Show the first 10 rows of each table (comment previously said 20,
// but show(10) prints 10).
EmpDF.show(10)
EmpDetDF.show(10)
SalesDF.show(10)
LocationDF.show(10)

// Display the total number of rows in each DataFrame.
// NOTE(review): count() triggers a full scan of each dataset — fine for a
// one-off audit, but expensive on the stated 2.2B-row Sales table.
println("Total number of Emp: " + EmpDF.count())
println("Total number of Emp Details: " + EmpDetDF.count())
println("Total number of Sales: " + SalesDF.count())
println("Total number of Location: " + LocationDF.count())

// Filter Employee to USA / NA rows.
val resultsEmp = spark.sql(
  "SELECT * FROM Employee WHERE NAME_TYPE = 'USA' AND SUB_TYPE = 'NA'")
resultsEmp.show(10, false)

// Filter Location to USA rows.
val resultsLoc = spark.sql("SELECT * FROM Location WHERE LOC_TYPE = 'USA' ")
resultsLoc.show(10, false)

// Re-register the filtered result over the original "Employee" view so the
// downstream join sees only the filtered rows (intentional shadowing).
resultsEmp.createOrReplaceTempView("Employee")
// Re-register the filtered Locations over the original "Location" view so
// the join below sees only the filtered rows (intentional shadowing).
resultsLoc.createOrReplaceTempView("Location")

// Inspect catalog metadata: databases and the registered view names.
spark.catalog.listDatabases.show(false)
spark.catalog.listTables.show(false)

// Cache the large, reused DataFrames before the join.
// Cache Sales DataFrame (2.2 Billion Rows)
SalesDF.cache()
// Cache Employee Details DataFrame (220 Million Rows)
EmpDetDF.cache()

// FIX: the SELECT list previously referenced EMPDET.EMPDETAILS and
// LOC.LOC_ID, but the FROM clause declares those tables under the aliases
// ED and LO — Spark's analyzer would fail with "cannot resolve" on those
// columns. Qualifiers corrected to match the declared aliases.
val resultsDF = spark.sql(
  """SELECT DISTINCT EM.EMP_ID, ED.EMPDETAILS, LO.LOC_ID, SA.SALES_VALUE
    |FROM Sales SA
    |JOIN EmployeeDetails ED ON SA.EMPDET_ID = ED.EMPDET_ID
    |JOIN Employee EM ON EM.EMP_ID = ED.EMP_ID
    |JOIN Location LO ON LO.LOC_ID = SA.LOC_ID""".stripMargin)
resultsDF.show(10)