Created 12-28-2018 12:40 AM
My goal is to create a Cube of 4 Dimensions and 1 Measure, which means I have 2^4 = 16 groupBys to compute in total.
In my code you can see the 4 Dimensions (Gender, Age, TotalChildren, ProductCategoryName) and the Measure TotalCost.
I filter all my columns to drop any row that is null. After that I compute every groupBy one by one, and I use coalesce(1) so that each result is written out as a single CSV file.
The whole process takes about 10 minutes, which I think is too much. Is there any way to speed it up, maybe by computing some of the groupBys from others? Also, my data is about 5 GB, so if it is read once for each of the 16 groupBys, that means reading 80 GB in total.
Thank you very much
Here is my code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object ComputeCube {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("SparkProject2018")
      .getOrCreate()

    import spark.implicits._

    val filePath = "src/main/resources/dataspark.txt"

    val df = spark.read.options(Map("inferSchema" -> "true", "delimiter" -> "|", "header" -> "true"))
      .csv(filePath)
      .select("Gender", "BirthDate", "TotalCost", "TotalChildren", "ProductCategoryName")

    val df2 = df
      .filter("Gender is not null")
      .filter("BirthDate is not null")
      .filter("TotalChildren is not null")
      .filter("ProductCategoryName is not null")

    val currentDate = udf { (dob: java.sql.Date) =>
      import java.time.{LocalDate, Period}
      Period.between(dob.toLocalDate, LocalDate.now).getYears
    }

    val df3 = df2.withColumn("Age", currentDate($"BirthDate"))

    val groupByAll = df3.groupBy("Gender", "Age", "TotalChildren", "ProductCategoryName").avg("TotalCost")
    val groupByGenderAndAgeAndTotalChildren = df3.groupBy("Gender", "Age", "TotalChildren").avg("TotalCost")
    val groupByGenderAndAgeAndProductCategoryName = df3.groupBy("Gender", "Age", "ProductCategoryName").avg("TotalCost")
    val groupByGenderAndTotalChildrenAndProductCategoryName = df3.groupBy("Gender", "TotalChildren", "ProductCategoryName").avg("TotalCost")
    val groupByAgeAndTotalChildrenAndProductCategoryName = df3.groupBy("Age", "TotalChildren", "ProductCategoryName").avg("TotalCost")
    val groupByGenderAndAge = df3.groupBy("Gender", "Age").avg("TotalCost")
    val groupByGenderAndTotalChildren = df3.groupBy("Gender", "TotalChildren").avg("TotalCost")
    val groupByGenderAndProductCategoryName = df3.groupBy("Gender", "ProductCategoryName").avg("TotalCost")
    val groupByAgeAndTotalChildren = df3.groupBy("Age", "TotalChildren").avg("TotalCost")
    val groupByAgeAndProductCategoryName = df3.groupBy("Age", "ProductCategoryName").avg("TotalCost")
    val groupByTotalChildrenAndProductCategoryName = df3.groupBy("TotalChildren", "ProductCategoryName").avg("TotalCost")
    val groupByGender = df3.groupBy("Gender").avg("TotalCost")
    val groupByAge = df3.groupBy("Age").avg("TotalCost")
    val groupByTotalChildren = df3.groupBy("TotalChildren").avg("TotalCost")
    val groupByProductCategoryName = df3.groupBy("ProductCategoryName").avg("TotalCost")
    val groupByNone = df3.groupBy().avg("TotalCost")

    groupByAll.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/All.csv")
    groupByGenderAndAgeAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Gender_Age_TotalChildren.csv")
    groupByGenderAndAgeAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Gender_Age_ProductCategoryName.csv")
    groupByGenderAndTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Gender_TotalChildren_ProductCategoryName.csv")
    groupByAgeAndTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Age_TotalChildren_ProductCategoryName.csv")
    groupByGenderAndAge.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Gender_Age.csv")
    groupByGenderAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Gender_TotalChildren.csv")
    groupByGenderAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Gender_ProductCategoryName.csv")
    groupByAgeAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Age_TotalChildren.csv")
    groupByAgeAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Age_ProductCategoryName.csv")
    groupByTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/TotalChildren_ProductCategoryName.csv")
    groupByGender.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Gender.csv")
    groupByAge.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/Age.csv")
    groupByTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/TotalChildren.csv")
    groupByProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/ProductCategoryName.csv")
    groupByNone.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
      .mode("overwrite").save("src/main/resources/None.csv")
  }
}
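As a side note, the Spark DataFrame API also ships a built-in cube operator that computes every grouping-set combination of the listed columns in a single job, with the rolled-up dimensions appearing as nulls. A minimal sketch against the df3 DataFrame above (not part of the original code; avgTotalCost and groupingSet are just illustrative names) might look like:

import org.apache.spark.sql.functions.{avg, grouping_id}

// One job instead of 16 separate ones: all combinations of the four dimensions.
// grouping_id() labels which dimensions are rolled up in each output row.
val cubed = df3
  .cube("Gender", "Age", "TotalChildren", "ProductCategoryName")
  .agg(avg("TotalCost").as("avgTotalCost"), grouping_id().as("groupingSet"))

Whether this fits depends on the required output layout, since the single result would still have to be split if 16 separate CSV files are needed.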
Created 12-28-2018 03:43 AM
Since you are using the df3 DataFrame for all the groupBy clauses, cache it. Spark will then not recompute the data from the file for all 16 jobs but will instead reuse the cached DataFrame (df3), which significantly improves performance.
val df3 = df2.withColumn("Age", currentDate($"BirthDate"))
df3.cache() // caching the DataFrame into memory
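One detail worth keeping in mind: cache() is lazy, so nothing is actually stored until the first action runs. A small sketch of forcing the cache to be populated once before the 16 writes (the count() here is only there to trigger a pass over the data):

val df3 = df2.withColumn("Age", currentDate($"BirthDate"))
df3.cache()  // marks df3 for caching; data is stored when the first action runs
df3.count()  // optional: one pass over the input to populate the cache up front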
Created 12-28-2018 09:31 AM
Thank you a lot! It worked like a charm! Shame on me for not thinking of it.
However, don't you find it bad practice to compute the groupBys separately? I mean, shouldn't I reuse the groupBy of the 4 dimensions to compute the groupBys of the three dimensions?
Thank you
Created 12-28-2018 07:23 PM
In the case of calculating an average, you cannot simply reuse the finer group-by results, because an average of averages is not the same as the average over the underlying rows.
Example:
Hive table data:
select * from i;
+---------+------+--------+-------+--+
| gender  | age  | total  | name  |
+---------+------+--------+-------+--+
| M       | 10   | 12     | st    |
| F       | 10   | 8      | st    |
| M       | 12   | 15     | st    |
+---------+------+--------+-------+--+
Calculate avg on one field:
select avg(total) from i group by name;
+---------------------+--+
|         _c0         |
+---------------------+--+
| 11.666666666666666  |
+---------------------+--+
Calculate avg using subquery:
select avg(tt) from (
  select name, avg(total) tt from i group by gender, name
) t
group by name;
+--------+--+
|  _c0   |
+--------+--+
| 10.75  |
+--------+--+
Because the inner query groups by 2 dimensions and the outer query then groups by 1 dimension, the averages differ: the outer query averages over 2 pre-aggregated rows instead of the original 3 rows, so rows that represent different numbers of source records get equal weight.
The correct subquery would be:
select sum(tt)/sum(cnt) from (
  select name, sum(total) tt, count(*) cnt, avg(total) from i group by gender, name  -- add cnt column, sum, avg
) t
group by name;
+---------------------+--+
|         _c0         |
+---------------------+--+
| 11.666666666666666  |
+---------------------+--+
You can try this approach (add a count column, then divide sum(total) by sum(count) to get the correct average results) and cache the most appropriate DataFrame; the Spark optimizer will then choose the most efficient plan to run these tasks.
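As a rough Scala sketch of the same idea applied to the code from the question (assuming the df3 DataFrame from above; base, sumCost, cnt and avgTotalCost are just illustrative names), you could aggregate sum and count once at the finest granularity and derive the coarser averages from that small intermediate result instead of rescanning the 5 GB input:

import org.apache.spark.sql.functions.{count, sum}

// One pass over df3 at the finest granularity, keeping sum and count (not just avg)
val base = df3
  .groupBy("Gender", "Age", "TotalChildren", "ProductCategoryName")
  .agg(sum("TotalCost").as("sumCost"), count("*").as("cnt"))
  .cache()

// Any coarser average can now be derived from `base` alone,
// e.g. Gender + Age: sum of sums divided by sum of counts
val genderAgeAvg = base
  .groupBy("Gender", "Age")
  .agg((sum("sumCost") / sum("cnt")).as("avgTotalCost"))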