Created 05-29-2016 01:35 AM
Hi, I'm trying to understand physical plans in Spark, but I'm not understanding some parts because they seem different from traditional RDBMS plans. For example, the plan below is for a query over a Hive table. The query is this:
select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem where l_shipdate <= '1998-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus;
The physical plan:
== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
      +- ConvertToSafe
         +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedprice#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Final,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
            +- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
               +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_extendedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedprice#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
                  +- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
                     +- Filter (l_shipdate#37 <= 1998-09-16)
                        +- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], MetastoreRelation default, lineitem, None
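For reference, a plan like this can be printed with DataFrame.explain; a minimal sketch, assuming the Spark 1.6 shell's HiveContext ( sqlContext ) and the lineitem table already in the metastore:

val df = sqlContext.sql(
  """select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty
    |from lineitem
    |where l_shipdate <= '1998-09-16'
    |group by l_returnflag, l_linestatus
    |order by l_returnflag, l_linestatus""".stripMargin)

df.explain()     // prints the physical plan only
df.explain(true) // also prints the parsed, analyzed and optimized plans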
From what I understand, the plan:
1- First starts with a Hive table scan
2- Then it filters using the where condition
3- Then it projects to get the columns we want
4- Then TungstenAggregate, what is this?
5- Then TungstenExchange, what is this?
6- Then TungstenAggregate again, what is this?
7- Then ConvertToSafe, what is this?
8- Then it sorts the final result
But I'm not understanding steps 4, 5, 6 and 7. Do you know what they are? I've been looking for information about this so I can understand the plan, but I'm not finding anything concrete.
Created 05-29-2016 04:31 PM
It's actually not too hard. Tungsten is an engine in Spark that processes data at a lower level, using vectorized and otherwise optimized operations. TungstenAggregate, for example, allows an efficient hash-table-based aggregation.
So if you want to understand the aggregation in Tungsten:
TungstenAggregate -> TungstenExchange -> TungstenAggregate
it might help if you understand what a map -> combiner -> reducer is in MapReduce. It's very similar.
Essentially you do a group by l_returnflag, l_linestatus and then aggregate a bunch of columns.
So what Spark needs to do is:
- Read local partitions of data on each block ( RDD ) : HiveTableScan
- Filter locally and project ( Filter, Project )
- Locally aggregate the data on each RDD by your grouping keys ( first TungstenAggregate, you can see the mode=Partial tag ); in MapReduce this would be the combiner.
- Distribute the data by group key to a new set of RDDs so all values for a specific group key end up in the same target RDD; in MapReduce this would be called the shuffle : TungstenExchange
- Do the final aggregation of the pre-aggregated values on the target RDDs : second TungstenAggregate with mode=Final ( see the sketch after this list )
- Convert the rows from Tungsten's binary UnsafeRow format back into regular ( safe ) row objects for operators that cannot read the binary format ( ConvertToSafe )
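If it helps, here is a minimal sketch of the same partial -> shuffle -> final pattern using the plain RDD API. The data and names are invented for illustration; this is not the code Spark generates:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("partial-agg-sketch").setMaster("local[2]"))

// (returnflag, linestatus) -> quantity; invented stand-ins for lineitem rows
val rows = sc.parallelize(Seq(
  (("A", "F"), 17.0), (("N", "O"), 36.0), (("A", "F"), 8.0), (("R", "F"), 24.0)))

// reduceByKey first combines values inside each partition ( the mode=Partial
// step / MapReduce combiner ), then shuffles only the partial sums ( like
// TungstenExchange ), then merges them per key ( the mode=Final step ).
val sumQty = rows.reduceByKey(_ + _)

sumQty.collect().foreach { case ((flag, status), qty) =>
  println(s"$flag,$status -> sum_qty=$qty") }

sc.stop()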
Voila hope that helps
Created 05-30-2016 01:49 AM
Hi, thanks for your answer, it really helped, but I still have some little doubts, if you can help that would be great. From what I understand from your answer, the steps are:
1 - HiveTableScan to read the data stored in HDFS and create an RDD based on this data (does it create 1 or more RDDs?)
2 - Filter based on the shipdate
3 - Project
4 - The first TungstenAggregate aggregates the data in each RDD based on the aggregation keys (returnflag, linestatus) (in each RDD, so we have more than 1?)
5 - The TungstenExchange distributes the data based on the aggregation keys to a new set of RDDs, so that values with the same aggregation key end up in the same RDD? (so we really have more than 1 rdd lol...?)
6 - And the last TungstenAggregate that aggregates the pre-aggregation I didn't understand very well, can you explain better?
So my doubts about your answer are just about the last TungstenAggregate and about the number of RDDs created by Spark; also you say "filter locally" and "read partitions locally", what do you mean by locally?
Thanks again, really!
Created 05-30-2016 10:10 AM
"(so we really have more than 1 rdd lol...?)"
I was not precise here. We have one RDD ( one distributed dataset ) with x partitions. So when I said RDD, replace it with RDD partition.
"1- Hivescan to read data stored in hdfs and create a RDD based on this data (create 1 or more rdd?)"
Spark uses the underlying Hadoop InputFormats to read the files and creates a Spark RDD partition for each split ( normally a block ) of the file in HDFS. It will then try to place these RDD partitions on executors that are on the same physical machine as the file block. This is what I mean by locally: whenever you can read data on the local machine without having to send it to other machines.
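A quick way to see this from the spark-shell ( a sketch; the path is a placeholder and sc is the shell's SparkContext ):

val lines = sc.textFile("hdfs:///path/to/lineitem") // placeholder path
println(s"partitions = ${lines.partitions.length}") // roughly one per HDFS block/split
// where Spark would prefer to schedule each partition's task ( data-local hosts )
lines.partitions.take(3).foreach(p => println(sc.getPreferredLocs(lines, p.index)))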
"6 - Ant the last TungstenAggregate that aggregates the pre aggregation I didnt understand very well, can you explain better?
http://www.tutorialspoint.com/map_reduce/map_reduce_combiners.htm
Lots of great explanations around. It works exactly the same way as a combiner: instead of shipping all values around the network, you do a local aggregation on each node and then only distribute the intermediate results to do the final aggregation after the shuffle/exchange.
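To make the Partial/Final split concrete, here is a small sketch with aggregateByKey on invented data. Note how avg travels as a (sum, count) pair, just like the sum/count pairs ( e.g. sum#68, count#69L ) in the plan's partial output:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("avg-sketch").setMaster("local[2]"))
val rows = sc.parallelize(Seq((("A", "F"), 17.0), (("N", "O"), 36.0), (("A", "F"), 8.0)))

val avgQty = rows
  .aggregateByKey((0.0, 0L))(
    // runs inside each partition before the shuffle ( mode=Partial / combiner )
    { case ((sum, cnt), q) => (sum + q, cnt + 1) },
    // merges the shipped partial pairs after the shuffle ( mode=Final )
    { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) })
  .mapValues { case (sum, cnt) => sum / cnt }

avgQty.collect().foreach(println)
sc.stop()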
Created 05-30-2016 12:11 PM
Thanks for your help, really! Now I get it!