How does Spark load data from Phoenix?

If I execute the following code:

spark.read.format("org.apache.phoenix.spark") \
            .option("table", "data_table") \
            .option("zkUrl", zkUrl) \
            .load().createOrReplaceTempView("table")
spark.sql("select * from table where date>='2018-07-02 00:00:00' and date<'2018-07-04 00:00:00'").createTempView("c")
spark.sql("select * from c").explain(True)

Then I get the following explain output:

== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `c`
== Analyzed Logical Plan ==
DATE: timestamp, ID: string, SESSIONID: string, IP: string, NAME: string, BYTES_SENT: int, DELTA: int, W_ID: string
Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
+- SubqueryAlias c
   +- Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
      +- Filter ((cast(date#48 as string) >= 2018-07-02 00:00:00) && (cast(date#48 as string) < 2018-07-04 00:00:00))
         +- SubqueryAlias table
            +- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
== Optimized Logical Plan ==
Filter ((isnotnull(date#48) && (cast(date#48 as string) >= 2018-07-02 00:00:00)) && (cast(date#48 as string) < 2018-07-04 00:00:00))
+- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
== Physical Plan ==
*(1) Filter (((cast(DATE#48 as string) >= 2018-07-02 00:00:00) && (cast(DATE#48 as string) < 2018-07-04 00:00:00)) && isnotnull(DATE#48))
+- *(1) Scan PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false) [DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PushedFilters: [IsNotNull(DATE)], ReadSchema: struct<DATE:timestamp,ID:string,SESSIONID:string,IP:string,NAME:string,BYTES_SENT:int,DELTA:int,W...

In this explain output, does

PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)

mean that Spark is sending a request to the Phoenix table? If so, that is understandable, since we are selecting all the data from the table, which has to hit Phoenix. What confuses me is the following code:

spark.sql("select * from c where date>='2018-07-03 00:00:00' and date<'2018-07-04 00:00:00'").createTempView("d")
spark.sql("select * from d").explain(True)

which gives me this explain output:

== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `d`
== Analyzed Logical Plan ==
DATE: timestamp, ID: string, SESSIONID: string, IP: string, NAME: string, BYTES_SENT: int, DELTA: int, W_ID: string
Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
+- SubqueryAlias d
   +- Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
      +- Filter ((cast(date#48 as string) >= 2018-07-03 00:00:00) && (cast(date#48 as string) < 2018-07-04 00:00:00))
         +- SubqueryAlias c
            +- Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
               +- Filter ((cast(date#48 as string) >= 2018-07-02 00:00:00) && (cast(date#48 as string) < 2018-07-04 00:00:00))
                  +- SubqueryAlias table
                     +- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
== Optimized Logical Plan ==
Filter (((isnotnull(date#48) && (cast(date#48 as string) >= 2018-07-02 00:00:00)) && (cast(date#48 as string) < 2018-07-04 00:00:00)) && (cast(date#48 as string) >= 2018-07-03 00:00:00))
+- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
== Physical Plan ==
*(1) Filter ((((cast(DATE#48 as string) >= 2018-07-02 00:00:00) && (cast(DATE#48 as string) < 2018-07-04 00:00:00)) && (cast(DATE#48 as string) >= 2018-07-03 00:00:00)) && isnotnull(DATE#48))
+- *(1) Scan PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false) [DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PushedFilters: [IsNotNull(DATE)], ReadSchema: struct<DATE:timestamp,ID:string,SESSIONID:string,IP:string,NAME:string,BYTES_SENT:int,DELTA:int,W...

Here again we can see

PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)

Is it again trying to connect to the Phoenix table? Did it not load the data the first time? My understanding was that Spark would load the data and store it in a DataFrame, which, if I am not mistaken, is an in-memory table; so if I then query another DataFrame derived from it, why is Spark connecting to Phoenix again?

I have a computation that runs over a DataFrame in a loop of about 1,000 iterations. If I load the data from a CSV file, the whole operation takes about 2 seconds, but when I fetch the data from the Phoenix table it takes much longer. I ran explain inside that loop too, and every iteration shows the same PhoenixRelation as above. I must be doing something wrong here, but I am unable to figure it out. Is this the expected behavior of the phoenix-spark connector?
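
One way to get the in-memory behavior described above is to cache the DataFrame explicitly; DataFrames are lazily evaluated, so without caching, every action re-plans a scan against the source. A minimal sketch, assuming the table fits in executor memory (the phoenix_df name and the cache()/count() calls are illustrative additions, not from the original code):

phoenix_df = spark.read.format("org.apache.phoenix.spark") \
    .option("table", "data_table") \
    .option("zkUrl", zkUrl) \
    .load() \
    .cache()        # mark the result for caching; without this, every action rescans Phoenix

phoenix_df.count()  # force materialization so later queries hit the cache
phoenix_df.createOrReplaceTempView("table")

# Subsequent plans should show an InMemoryTableScan instead of a fresh Phoenix scan
spark.sql("select * from table where date >= '2018-07-02 00:00:00'").explain(True)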

1 REPLY

@Saurab Dahal

I think the problem is that your pushed-down filters are missing the filter on the date column:

PushedFilters: [IsNotNull(DATE)]

Perhaps you could try casting the string literals to dates and see if that helps:

spark.sql("select * from c where date>=cast('2018-07-03 00:00:00' as date) and date<cast('2018-07-04 00:00:00' as date)").createTempView("d")spark.sql("select * from d").explain(True)

Please try and let me know

HTH
