Running phoenix flashback queries / setting currentSCN properties for spark-shell

Expert Contributor

How can I specify and run a flashback (snapshot) query in spark-shell, given that phoenix-spark has already been set up?

Assuming spark-shell has already been set up with the correct jars to run this:

import org.apache.phoenix.spark._
val df_new = sc.parallelize(Seq((345, "some org"), (456, "ORG_ID_1"))).toDF("ID", "ORG_ID")
df_new.saveToPhoenix("MY_TABLE")
// I presume the snapshot time would be some parameter of phoenixTableAsDataFrame?
sqlContext.phoenixTableAsDataFrame("MY_TABLE", Array("ID", "ORG_ID")).show
+-------+---------+
|     ID|   ORG_ID|
+-------+---------+
|    456| ORG_ID_1|
|    345| some org|
+-------+---------+

Is it possible?

Thank you!

1 ACCEPTED SOLUTION

Expert Contributor

After looking at the source and this JIRA, I finally managed to do it in spark-shell:

import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
val conf = new Configuration
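// The L suffix is required: the value does not fit in an Int, so a plain literal fails to compile.
conf.setLong(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, 1490706922000L)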
sqlContext.phoenixTableAsDataFrame("TRADE.TRADE", Array("ID", "BROKER"), conf = conf).show
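
The SCN value above looks like a timestamp in epoch milliseconds, which is how HBase cell timestamps are expressed by default. As a minimal sketch under that assumption, the value can be derived from a readable UTC time with java.time instead of being hard-coded. The helper name scnMillis is hypothetical and not part of Phoenix:

```scala
import java.time.Instant

// Hypothetical helper (not a Phoenix API): converts an ISO-8601 UTC
// instant into epoch milliseconds, assumed to be the unit of the SCN.
def scnMillis(isoUtc: String): Long = Instant.parse(isoUtc).toEpochMilli

// 2017-03-28T13:15:22Z corresponds to the 1490706922000 used above.
val scn: Long = scnMillis("2017-03-28T13:15:22Z")
```

The resulting scn could then be passed to conf.setLong(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, scn) in place of the literal.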

2 REPLIES

Contributor

This feature behaves unexpectedly when the table has been migrated from another HBase cluster. In that case, the table's creation time can be much later than the row timestamps of all its data. A flashback query meant to select an earlier subset of the data then fails with the following error instead:

scala> df.count
2017-08-11 20:12:40,550 INFO  [main] mapreduce.PhoenixInputFormat: UseSelectColumns=true, selectColumnList.size()=3, selectColumnList=TIMESTR,DBID,OPTION 
2017-08-11 20:12:40,550 INFO  [main] mapreduce.PhoenixInputFormat: Select Statement: SELECT "TIMESTR","DBID","OPTION" FROM NS.USAGES
2017-08-11 20:12:40,558 ERROR [main] mapreduce.PhoenixInputFormat: Failed to get the query plan with error [ERROR 1012 (42M03): Table undefined. tableName=NS.USAGES]
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#13L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#16L])
      +- Project
         +- Scan ExistingRDD[TIMESTR#10,DBID#11,OPTION#12] 

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
...

This apparently means that Phoenix considers the table nonexistent at that point in time. I tried the same approach in sqlline and, sure enough, the table is missing from "!tables".

Any workaround?
