How does Spark load data from phoenix?


If I execute the following code:

spark.read.format("org.apache.phoenix.spark") \
            .option("table", "data_table") \
            .option("zkUrl", zkUrl) \
            .load().createOrReplaceTempView("table")
spark.sql("select * from table where date>='2018-07-02 00:00:00' and date<'2018-07-04 00:00:00'").createTempView("c")
spark.sql("select * from c").explain(True)

Then I get the following explain output:

== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `c`
== Analyzed Logical Plan ==
DATE: timestamp, ID: string, SESSIONID: string, IP: string, NAME: string, BYTES_SENT: int, DELTA: int, W_ID: string
Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
+- SubqueryAlias c
   +- Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
      +- Filter ((cast(date#48 as string) >= 2018-07-02 00:00:00) && (cast(date#48 as string) < 2018-07-04 00:00:00))
         +- SubqueryAlias table
            +- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
== Optimized Logical Plan ==
Filter ((isnotnull(date#48) && (cast(date#48 as string) >= 2018-07-02 00:00:00)) && (cast(date#48 as string) < 2018-07-04 00:00:00))
+- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
== Physical Plan ==
*(1) Filter (((cast(DATE#48 as string) >= 2018-07-02 00:00:00) && (cast(DATE#48 as string) < 2018-07-04 00:00:00)) && isnotnull(DATE#48))
+- *(1) Scan PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false) [DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PushedFilters: [IsNotNull(DATE)], ReadSchema: struct<DATE:timestamp,ID:string,SESSIONID:string,IP:string,NAME:string,BYTES_SENT:int,DELTA:int,W...

In this explain output, does

PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)

mean that Spark is sending a request to the Phoenix table? If so, that is understandable, since we are selecting all the data from the table, which requires a request to Phoenix. What confuses me is the following code:

spark.sql("select * from c where date>='2018-07-03 00:00:00' and date<'2018-07-04 00:00:00'").createTempView("d")
spark.sql("select * from d").explain(True)

which gives me the following explain output:

== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `d`
== Analyzed Logical Plan ==
DATE: timestamp, ID: string, SESSIONID: string, IP: string, NAME: string, BYTES_SENT: int, DELTA: int, W_ID: string
Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
+- SubqueryAlias d
   +- Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
      +- Filter ((cast(date#48 as string) >= 2018-07-03 00:00:00) && (cast(date#48 as string) < 2018-07-04 00:00:00))
         +- SubqueryAlias c
            +- Project [DATE#48, ID#49, SESSIONID#50, IP#51, NAME#52, BYTES_SENT#53, DELTA#54, W_ID#55]
               +- Filter ((cast(date#48 as string) >= 2018-07-02 00:00:00) && (cast(date#48 as string) < 2018-07-04 00:00:00))
                  +- SubqueryAlias table
                     +- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
== Optimized Logical Plan ==
Filter (((isnotnull(date#48) && (cast(date#48 as string) >= 2018-07-02 00:00:00)) && (cast(date#48 as string) < 2018-07-04 00:00:00)) && (cast(date#48 as string) >= 2018-07-03 00:00:00))
+- Relation[DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false)
== Physical Plan ==
*(1) Filter ((((cast(DATE#48 as string) >= 2018-07-02 00:00:00) && (cast(DATE#48 as string) < 2018-07-04 00:00:00)) && (cast(DATE#48 as string) >= 2018-07-03 00:00:00)) && isnotnull(DATE#48))
+- *(1) Scan PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false) [DATE#48,ID#49,SESSIONID#50,IP#51,NAME#52,BYTES_SENT#53,DELTA#54,W_ID#55] PushedFilters: [IsNotNull(DATE)], ReadSchema: struct<DATE:timestamp,ID:string,SESSIONID:string,IP:string,NAME:string,BYTES_SENT:int,DELTA:int,W...

Here again we can see

PhoenixRelation(data_table,10.10.5.20,10.10.5.21,10.10.5.22,10.10.5.23:2181,false) 

Is it again trying to connect to the Phoenix table? Did it not load the data the first time? What I thought was that it would load the data and store it in a DataFrame, which, if I am not mistaken, is an in-memory table; and if I then fetch data from that DataFrame, why is Spark trying to connect to Phoenix again? I have a problem that requires some calculation over a DataFrame, and I need to do it over roughly 1,000 loop iterations. If I load the data from a CSV file, the whole operation takes about 2 seconds, but when I fetch the data from the Phoenix table it takes a lot of time. I ran explain inside that loop too, and each time I get the same PhoenixRelation as above. I must be doing something wrong here, but I am unable to figure it out. Is this the expected behavior of the phoenix-spark connector?
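For illustration, what I expected was something like the following sketch: read from Phoenix once, materialize the result in memory, and have later queries hit the cache instead of Phoenix. This uses Spark's explicit cacheTable; whether it is the right fix here is an assumption on my part, and the table and zkUrl are the same as above:

df = spark.read.format("org.apache.phoenix.spark") \
    .option("table", "data_table") \
    .option("zkUrl", zkUrl) \
    .load()
df.createOrReplaceTempView("table")

# Mark the view for in-memory caching; count() is an action that forces
# the actual Phoenix scan so the cache gets populated.
spark.catalog.cacheTable("table")
spark.sql("select count(*) from table").show()

# Later queries should now show InMemoryTableScan in explain() instead
# of a fresh PhoenixRelation scan.
spark.sql("select * from table where date >= '2018-07-02 00:00:00'").explain(True)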

1 Reply

@Saurab Dahal

I think the problem is that your pushdown filters are missing the filter on the date column:

PushedFilters: [IsNotNull(DATE)]

Perhaps you could try to cast the string to a date and see if that helps:

spark.sql("select * from c where date>=cast('2018-07-03 00:00:00' as date) and date<cast('2018-07-04 00:00:00' as date)").createTempView("d")spark.sql("select * from d").explain(True)

Please try it and let me know.

HTH

