Created 07-16-2018 10:40 AM
If I execute the following code
spark.read.format("org.apache.phoenix.spark") \
    .option("table", "data_table") \
    .option("zkUrl", zkUrl) \
    .load().createOrReplaceTempView("table")

spark.sql("select * from table where date>='2018-07-02 00:00:00' and date<'2018-07-04 00:00:00'").createTempView("c")

spark.sql("select * from c").explain(True)
then I get the following explanation:
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `c`

== Analyzed Logical Plan ==
DATE: timestamp, ID: string, SESSIONID: string, IP: string, NAME: string, BYTES_SENT: int, DELTA: int, W_ID: string
Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
+- SubqueryAlias c
   +- Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
      +- Filter ((cast(date#48 as string) >= 2018-07-02 00:00:00) && (cast(date#48 as string) < 2018-07-04 00:00:00))
         +- SubqueryAlias table
            +- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)

== Optimized Logical Plan ==
Filter ((isnotnull(date#48) && (cast(date#48 as string) >= 2018-07-02 00:00:00)) && (cast(date#48 as string) < 2018-07-04 00:00:00))
+- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)

== Physical Plan ==
*(1) Filter (((cast(DATE#48 as string) >= 2018-07-02 00:00:00) && (cast(DATE#48 as string) < 2018-07-04 00:00:00)) && isnotnull(DATE#48))
+- *(1) Scan PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false) [DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PushedFilters: [IsNotNull(DATE)], ReadSchema: struct<DATE:timestamp,ID:string,SESSIONID:string,IP:string,NAME:string,BYTES_SENT:int,DELTA:int,W...
In this explanation, does
PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
mean that it is sending a request to the Phoenix table? If that is the case, it is understandable, since we are selecting all the data from the table, which does require a request to Phoenix. What confuses me is the following code:
spark.sql("select * from c where date>='2018-07-03 00:00:00' and date<'2018-07-04 00:00:00'").createTempView("d")

spark.sql("select * from d").explain(True)
which gives me the following explanation:
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `d`

== Analyzed Logical Plan ==
DATE: timestamp, ID: string, SESSIONID: string, IP: string, NAME: string, BYTES_SENT: int, DELTA: int, W_ID: string
Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
+- SubqueryAlias d
   +- Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
      +- Filter ((cast(date#48 as string) >= 2018-07-03 00:00:00) && (cast(date#48 as string) < 2018-07-04 00:00:00))
         +- SubqueryAlias c
            +- Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
               +- Filter ((cast(date#48 as string) >= 2018-07-02 00:00:00) && (cast(date#48 as string) < 2018-07-04 00:00:00))
                  +- SubqueryAlias table
                     +- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)

== Optimized Logical Plan ==
Filter (((isnotnull(date#48) && (cast(date#48 as string) >= 2018-07-02 00:00:00)) && (cast(date#48 as string) < 2018-07-04 00:00:00)) && (cast(date#48 as string) >= 2018-07-03 00:00:00))
+- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)

== Physical Plan ==
*(1) Filter ((((cast(DATE#48 as string) >= 2018-07-02 00:00:00) && (cast(DATE#48 as string) < 2018-07-04 00:00:00)) && (cast(DATE#48 as string) >= 2018-07-03 00:00:00)) && isnotnull(DATE#48))
+- *(1) Scan PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false) [DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PushedFilters: [IsNotNull(DATE)], ReadSchema: struct<DATE:timestamp,ID:string,SESSIONID:string,IP:string,NAME:string,BYTES_SENT:int,DELTA:int,W...
Here again we can see
PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
Is it again trying to connect to the Phoenix table? Did it not load the data the first time? I thought it would load the data and store it in a DataFrame, which, if I am not mistaken, is an in-memory table; if I am fetching data from another DataFrame, why is Spark trying to connect to Phoenix at all? My actual problem requires some calculation over a DataFrame, repeated for about 1,000 loop iterations. If I load the data from a CSV file, the whole operation takes about 2 seconds, but when I fetch the data from the Phoenix table it takes much longer. I ran explain on that loop too, and each time I get the same PhoenixRelation as above. I must be doing something wrong here, but I am unable to figure it out. Is this the expected behavior of phoenix-spark?
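To make that expectation concrete, the pattern I am after is something like the sketch below: scan Phoenix once, pin the result in Spark's memory, and run the loop against the cached copy. This is only a sketch, reusing the data_table and zkUrl values from the code above, with an explicit cache() added:

# Sketch: read from Phoenix once and keep the result in memory, so
# the per-loop queries hit the cache instead of Phoenix.
df = spark.read.format("org.apache.phoenix.spark") \
    .option("table", "data_table") \
    .option("zkUrl", zkUrl) \
    .load() \
    .cache()

df.count()  # an action to materialize the cache (one Phoenix scan)

df.createOrReplaceTempView("table")

# Later queries against the view should now read the cached data; the
# plan should show InMemoryRelation instead of a fresh Phoenix scan.
spark.sql("select * from table where date >= '2018-07-02 00:00:00' "
          "and date < '2018-07-04 00:00:00'").explain(True)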
Created 07-16-2018 12:03 PM
I think the problem is that your pushdown filters are missing the filter for the date column:
PushedFilters: [IsNotNull(DATE)]
Perhaps you could try casting the string literals to dates and see if that helps:
spark.sql("select * from c where date>=cast('2018-07-03 00:00:00' as date) and date<cast('2018-07-04 00:00:00' as date)").createTempView("d")

spark.sql("select * from d").explain(True)
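The same idea can be written with the DataFrame API. A sketch, assuming df is the DataFrame returned by the original load(); since DATE is a timestamp column (per the analyzed plan), parsing the literals with to_timestamp keeps the comparison typed and avoids the cast(date#48 as string) seen in the plans above:

from pyspark.sql.functions import col, lit, to_timestamp

# Sketch: compare the timestamp column against timestamp literals so
# Spark does not have to cast the column to a string first; the range
# predicates then stand a chance of appearing in PushedFilters.
filtered = df.filter(
    (col("date") >= to_timestamp(lit("2018-07-03 00:00:00"))) &
    (col("date") < to_timestamp(lit("2018-07-04 00:00:00")))
)
filtered.explain(True)  # check whether the date range shows up under PushedFilters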
Please try and let me know
HTH