Member since: 03-20-2016
Posts: 56
Kudos Received: 18
Solutions: 0
06-04-2016
02:23 AM
Hi, I'm studying the Catalyst optimizer, but I have some doubts about some of its phases. In the first phase, as I understand it, an initial logical plan is created, but it is not definitive because it contains unresolved attributes. To resolve these attributes, Spark SQL uses a catalog that connects to the tables to check the name and data type of each attribute. My doubt in this first phase is about the meaning of unresolved attributes and about how this catalog works. For example, if the tables are stored in Hive, does the catalog connect to the Hive metastore to check the name and data type of the attribute? If so, isn't it a bit expensive to do this for every query we execute?

In the second phase, rules are applied to the logical plan created before; some of those rules are predicate pushdown and projection pruning. What I understand about predicate pushdown is that when we submit a query in Spark over Hive tables that are on a different machine, a lot of data can travel across the network, and this is not good. But I'm not understanding how Catalyst works to fix this issue. Do you know?
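In case it helps to show what I mean, this is roughly how I have been inspecting the plans. It is just a minimal sketch in the Spark 1.6 spark-shell, assuming a lineitem table is registered in the Hive metastore and that the sqlContext provided by the shell has Hive support; explain(true) prints the parsed plan (where the unresolved attributes appear), then the analyzed, optimized and physical plans, so the resolution step and the pushed-down filter should be visible there:

// Run in spark-shell; the shell provides sqlContext (a HiveContext when built with Hive),
// which is what looks up table and column metadata in the metastore.
val df = sqlContext.sql(
  "SELECT l_returnflag, l_quantity FROM lineitem WHERE l_shipdate <= '1998-09-16'")

// Extended explain: parsed plan (unresolved attributes), analyzed plan (attributes resolved
// against the catalog), optimized plan (e.g. filter pushed down), and physical plan.
df.explain(true)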
Labels: Apache Spark
05-30-2016
12:11 PM
Thanks for your help, really! Now I get it!
05-30-2016
01:49 AM
Hi, thanks for your answer, it really helped, but I still have a few small doubts, if you can help. From what I understand of your answer, the steps are:

1 - HiveTableScan to read the data stored in HDFS and create an RDD based on it (does it create 1 RDD or more?)
2 - Filter based on the shipdate
3 - Project the needed columns
4 - The first TungstenAggregate aggregates the data in each RDD based on the aggregation keys (returnflag, linestatus) (in each RDD, so we have more than one?)
5 - The TungstenExchange distributes the data based on the aggregation keys to a new set of RDDs, so that all values with the same aggregation key end up in the same RDD? (so we really do have more than one RDD?)
6 - The last TungstenAggregate, which aggregates the pre-aggregated results, I didn't understand very well; can you explain it better?

So my doubts about your answer are just about the last TungstenAggregate, about the number of RDDs created by Spark, and also about what you mean by "filter locally" and "read partitions locally". Thanks again, really!
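To check whether I have the right mental model for steps 4-6, I put together a rough RDD-level analogy with made-up data. I know this is not what Spark SQL literally executes; it is only meant to show the same pre-aggregate / shuffle / merge idea:

// Run in spark-shell (sc is the SparkContext the shell provides).
// Hypothetical tiny dataset of ((l_returnflag, l_linestatus), (l_quantity, 1)) pairs,
// i.e. already filtered and projected.
val rows = sc.parallelize(Seq(
  (("N", "O"), (17.0, 1L)),
  (("N", "O"), (36.0, 1L)),
  (("R", "F"), (8.0, 1L))
))

// reduceByKey first combines values inside each partition (like the Partial TungstenAggregate),
// then shuffles by key (like the TungstenExchange), and finally merges the pre-aggregated
// values arriving from different partitions (like the Final TungstenAggregate).
val sumAndCount = rows.reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }

// avg(l_quantity) per group can then be derived from the (sum, count) pairs.
val avgQty = sumAndCount.mapValues { case (s, c) => s / c }
avgQty.collect().foreach(println)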
05-29-2016
01:35 AM
1 Kudo
Hi, I'm trying to understand physical plans in Spark, but I don't understand some parts because they seem different from a traditional RDBMS. For example, the plan below is for a query over a Hive table. The query is this:

select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-09-16'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
The physical plan:

== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedprice#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Final,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
+- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_extendedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedprice#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
+- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
+- Filter (l_shipdate#37 <= 1998-09-16)
+- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], MetastoreRelation default, lineitem, None

From what I understand, the plan is:

1 - First it starts with a Hive table scan
2 - Then it filters using the where condition
3 - Then it projects to get the columns we want
4 - Then TungstenAggregate; what is this?
5 - Then TungstenExchange; what is this?
6 - Then TungstenAggregate again; what is this?
7 - Then ConvertToSafe; what is this?
8 - Then it sorts the final result

But I'm not understanding steps 4, 5, 6 and 7. Do you know what they are? I've been looking for information about this so I can understand the plan, but I'm not finding anything concrete.
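In case it matters, this is roughly how I am generating the plan above (a small sketch in the Spark 1.6 spark-shell, with the query slimmed down to a few of the aggregates; it assumes the TPC-H lineitem table is registered in the Hive metastore and that the shell's sqlContext has Hive support):

// sqlContext is provided by spark-shell; with Hive support it can read the lineitem table.
val df = sqlContext.sql("""
  select l_returnflag, l_linestatus,
         sum(l_quantity) as sum_qty,
         avg(l_quantity) as avg_qty,
         count(*) as count_order
  from lineitem
  where l_shipdate <= '1998-09-16'
  group by l_returnflag, l_linestatus
  order by l_returnflag, l_linestatus
""")

df.explain()       // prints the physical plan, like the one pasted above
df.explain(true)   // also prints the parsed, analyzed and optimized logical plans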
Labels: Apache Spark
05-26-2016
01:11 AM
Hi, thanks for your answer, but I'm not understanding. I think the answer that I accepted fixed the issue, because when I start the shell with spark-shell --master spark://masterhost:7077, on port 8080 I see:

Cores in use: 4 Total, 4 Used
Memory in use: 4.0 GB Total, 2.0 GB Used
Applications: 1 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE

So it seems that it is already working when I start spark-shell that way, right? Or are you suggesting that it should be spark-shell --master "local" spark://masterhost:7077?
05-25-2016
11:23 AM
I only saw your comment now, but I think it's working fine now; it seems that I was setting more memory than was available.
05-25-2016
11:22 AM
I decreased the memory in spark-env.sh and now it seems to be working, thanks!
05-25-2016
11:18 AM
This warning appears when the query starts to execute in stage 0, and then the error appears.
05-25-2016
11:14 AM
Thanks, but now I'm getting this error when I try to execute a query:

16/05/25 12:15:15 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@5547fcb1)

And this warning:

16/05/25 12:15:05 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Do you know why?
05-25-2016
10:34 AM
Hi, I'm executing the job in the shell. To start the shell I use the command "spark-shell". So I need to use spark-shell --master?