10-02-2017 02:48 PM
Thanks for your response, Raju! I am very new to Spark and am trying a POC for data warehouse work in the cloud. Yes, the files are huge. I am on AWS, using EMR for the Spark work; the S3 bucket has a couple of 16 GB files in ORC format. I am going to split them into multiple files and use the partitioning and bucketing options for faster I/O. Can you explain your #2 and #4? If possible, can you rewrite my code so that I can better understand what you meant by getting a shuffle per Emp_Id, Loc_Id, and EmpDet_ID? Also, using Spark SQL and DataFrames, how do I join two tables first, then add the third, and then the fourth? Can you share an example of that in my code? I appreciate your response and help. Thanks!
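While I wait, let me check my own understanding. Here is roughly what I am planning for the rewrite, using the DataFrames from my code below (a rough, untested sketch; the bucket count, table name, and join keys are placeholder guesses, not tuned values):
import org.apache.spark.sql.SaveMode
// Rewrite the large ORC data bucketed on the join key so that later joins
// on EMPDET_ID can avoid a full shuffle (bucketBy requires saveAsTable).
spark.read.orc("s3://phy.datawarehouse/data/orcdb/Sales")
  .write
  .mode(SaveMode.Overwrite)
  .format("orc")
  .bucketBy(64, "EMPDET_ID")
  .sortBy("EMPDET_ID")
  .saveAsTable("sales_bucketed")
// Join one table at a time with the DataFrame API instead of one big SQL
// statement, starting from the largest table.
val step1 = SalesDF.join(EmpDetDF, Seq("EMPDET_ID"))
val step2 = step1.join(EmpDF, Seq("EMP_ID"))
val step3 = step2.join(LocationDF, Seq("LOC_ID"))
Is that the kind of thing you meant?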
10-01-2017 08:33 PM
I am unable to execute this job and have been trying for a few days now.
One of the tables has 2 billion rows, the second has 220 million rows, the third has 1.4 million rows, and the fourth has about 150K rows. I am trying to store the data in DataFrames, cache the large tables, do the ETL via Spark SQL 2.2, and then load the processed data into Redshift. I haven't figured out the Redshift piece yet, but this code has never returned any results. How should it be rewritten, and what should I optimize to make the ETL process work and load this data into Redshift?
The process has never completed; it runs for about an hour or more and always crashes.
spark-scala-employee.txt
Can you please help?
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val spark = SparkSession
.builder()
.appName("SparkSQL")
.config("spark.sql.warehouse.dir", "s3://phy.datawarehouse/data/orcdb/")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
// Read the file from S3 and register it as the temporary view Employee
val EmpDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Employee")
EmpDF.createOrReplaceTempView("Employee")
// Read the file from S3 and register it as the temporary view EmployeeDetails
val EmpDetDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/EmployeeDetails")
EmpDetDF.createOrReplaceTempView("EmployeeDetails")
// Read the file from S3 and register it as the temporary view Sales
val SalesDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Sales")
SalesDF.createOrReplaceTempView("Sales")
// Read the file from S3 and register it as the temporary view Location
val LocationDF = spark.read.orc("s3://phy.datawarehouse/data/orcdb/Location")
LocationDF.createOrReplaceTempView("Location")
// Show the first 10 rows of each table
EmpDF.show(10)
EmpDetDF.show(10)
SalesDF.show(10)
LocationDF.show(10)
// Display the total number of rows in each DataFrame
println("Total number of Emp: " + EmpDF.count())
println("Total number of Emp Details: " + EmpDetDF.count())
println("Total number of Sales: " + SalesDF.count())
println("Total number of Location: " + LocationDF.count())
// Filter the Employee dataset
val resultsEmp = spark.sql("SELECT * FROM Employee WHERE NAME_TYPE = 'USA' AND SUB_TYPE = 'NA'")
resultsEmp.show(10, false)
// Filter the Location dataset
val resultsLoc = spark.sql("SELECT * FROM Location WHERE LOC_TYPE = 'USA' ")
resultsLoc.show(10, false)
// Replace the Employee and Location views with the filtered DataFrames
resultsEmp.createOrReplaceTempView("Employee")
resultsLoc.createOrReplaceTempView("Location")
// Access catalog metadata using spark.catalog
spark.catalog.listDatabases.show(false)
// List the tables in the current database; our temp views should appear
spark.catalog.listTables.show(false)
//Cache Sales DataFrame (2.2 Billion Rows)
SalesDF.cache()
//Cache Employee Details Data Frame (220 Million Rows)
EmpDetDF.cache()
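// Note: cache() is lazy; nothing is materialized until the first action
// below. Caching tables this large can exhaust executor memory and spill
// heavily to disk, so it may be worth trying the job without these caches.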
val resultsDF = spark.sql("""SELECT DISTINCT EM.EMP_ID, ED.EMPDETAILS, LO.LOC_ID, SA.SALES_VALUE
FROM Sales SA
JOIN EmployeeDetails ED ON SA.EMPDET_ID = ED.EMPDET_ID
JOIN Employee EM ON EM.EMP_ID = ED.EMP_ID
JOIN Location LO ON LO.LOC_ID = SA.LOC_ID""")
resultsDF.show(10)
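For the optimization and the Redshift load I haven't figured out yet, this is the direction I am considering (a rough, untested sketch: it assumes the spark-redshift connector is on the classpath and that the filtered Employee and Location tables are small enough to broadcast; the JDBC URL, target table, and temp directory are placeholders):
import org.apache.spark.sql.functions.broadcast
// Join step by step, broadcasting the two small filtered tables so that
// only the two large tables (Sales, EmployeeDetails) get shuffled.
val joinedDF = SalesDF
  .join(EmpDetDF, Seq("EMPDET_ID"))
  .join(broadcast(resultsEmp), Seq("EMP_ID"))
  .join(broadcast(resultsLoc), Seq("LOC_ID"))
  .select("EMP_ID", "EMPDETAILS", "LOC_ID", "SALES_VALUE")
  .distinct()
// Write the result to Redshift, staging the data through S3.
joinedDF.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<host>:5439/<database>?user=<user>&password=<password>")
  .option("dbtable", "employee_sales")
  .option("tempdir", "s3://phy.datawarehouse/tmp/redshift/")
  .option("forward_spark_s3_credentials", "true")
  .mode("error")
  .save()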
Labels: Apache Spark