Catalyst optimization phases

Rising Star

Hi,

I'm studying the Catalyst optimizer and I have some doubts about a few of its phases.

In the first phase, as I understand it, an initial logical plan is created, but it is not definitive because it contains unresolved attributes. To resolve these attributes, Spark SQL uses a catalog that connects to the tables to check the name and data type of each attribute.

My doubt about this first phase is about the meaning of unresolved attributes and how this catalog works. For example, if the tables are stored in Hive, does the catalog connect to the Hive metastore to check the name and data type of each attribute? If so, isn't it a bit expensive to do this for every query we execute?

In the second phase, rules are applied to the logical plan created before; some of those rules are predicate pushdown and projection pruning. What I understand about predicate pushdown is that when we submit a Spark query over Hive tables that sit on a different machine, a lot of data can travel across the network, and that is not good. But I don't understand how Catalyst fixes this issue. Do you know?
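
To make the question more concrete, here is a rough sketch of the kind of query I mean (the HiveContext variable hc, the table cust, and its columns are just assumptions for the example):

# Rough sketch; hc is assumed to be a HiveContext and cust an existing Hive table.
df = hc.sql("SELECT name, state FROM cust WHERE state = 'AK'")
# explain(True) prints the parsed (unresolved) plan, the analyzed plan produced after
# the catalog lookup, the optimized plan, and the physical plan.
df.explain(True)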

1 ACCEPTED SOLUTION


Hi John,

I would recommend reading the paper "Spark SQL: Relational Data Processing in Spark", which describes the Catalyst optimizer phases you are asking about in more detail.

https://web.eecs.umich.edu/~prabal/teaching/resources/eecs582/armbrust15sparksql.pdf


2 REPLIES

Super Collaborator

The attributes are stored in the Hive metastore, and Spark queries it to get the table and file information. Pinging the Hive metastore isn't really an expensive operation. In fact, when you write an application and create a DataFrame from a Hive table, the metastore is consulted as soon as the DataFrame is created, not when an action is run on it. We only need to ping the metastore the first time we create the DataFrame, not for every subsequent query on it.
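
A minimal sketch of that behaviour, assuming a HiveContext named hc and an existing Hive table called cust (the table, column, and variable names are assumptions for the example):

# The metastore is consulted when the DataFrame is created; no Spark job runs here.
df = hc.table("cust")
df.printSchema()                 # column names and types already resolved from the metastore

# Later queries reuse the resolved schema; only the action below touches the data itself.
ak = df.filter(df.state == "AK")
ak.count()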

The second part of your question is about predicate pushdown and partition pruning. What that means is that data can be partitioned in Hive by a field. So let's say we have a file full of information about customers and we want to create a Hive table. Say we create the table and partition it by the state the customer is located in. Hive is going to put all customers from a particular state in a single directory, so the structure looks something like this:

.../database_name/cust
.../database_name/cust/state=AK
.../database_name/cust/state=AL
...
.../database_name/cust/state=WY

Hive will put all customers with a state of Alaska in the state=AK directory. Now that we know how the data is divided up, Spark can leverage that. So if you write a query like

df = hc.sql("Select * from cust where state=AK")

Spark will only read from the directory database_name/cust/state=AK. This saves a significant amount of time, as we only have to read a small subset of the data. Catalyst combines the semantics of the query (i.e., the SELECT statement with its WHERE clause) with the information in the metastore (the underlying data is partitioned by state) to tell the application to only read data in the database_name/cust/state=AK directory.
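
If you want to see the pruning happen, a rough sketch (same assumed hc, cust, and column names as above) is to compare the plans for a filter on the partition column versus a filter on an ordinary column:

# Filter on the partition column: Catalyst can prune down to the state=AK directory.
pruned = hc.sql("SELECT * FROM cust WHERE state = 'AK'")
pruned.explain(True)   # the table scan should only cover the state=AK partition

# Filter on a non-partition column (name is an assumed column): every partition is read.
full = hc.sql("SELECT * FROM cust WHERE name = 'Alice'")
full.explain(True)     # the scan covers all state=* partitions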
