Support Questions

Find answers, ask questions, and share your expertise

Running phoenix flashback queries / setting currentSCN properties for spark-shell

avatar
Super Collaborator

How to specify / run flashback / snapshot query in spark-shell given one has already setup phoenix-spark?

Assuming it has already been setup with the correct jars to run this:

import org.apache.phoenix.spark._
val df_new = sc.parallelize(Seq((345, "some org"), (456, "ORG_ID_1"))).toDF("ID", "ORG_ID")
df_new.saveToPhoenix("MY_TABLE") // I presume it will be some param. within phoenixTableAsDataFrame?
sqlContext.phoenixTableAsDataFrame("MY_TABLE", Array("ID", "ORG_ID")).show
+-------+---------+
|     ID|   ORG_ID|
+-------+---------+
|    456| ORD_ID_1|
|    345| some org|
+-------+---------+

Is it possible?

Thank you!

1 ACCEPTED SOLUTION

avatar
Super Collaborator

After looking at the source and this jira I have finally got round to do it in spark-shell:

import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
val conf = new Configuration
conf.setLong(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, 1490706922000)
sqlContext.phoenixTableAsDataFrame("TRADE.TRADE", Array("ID", "BROKER"), conf = conf).show

View solution in original post

2 REPLIES 2

avatar
Super Collaborator

After looking at the source and this jira I have finally got round to do it in spark-shell:

import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
val conf = new Configuration
conf.setLong(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, 1490706922000)
sqlContext.phoenixTableAsDataFrame("TRADE.TRADE", Array("ID", "BROKER"), conf = conf).show

avatar
Expert Contributor

This feature behaves unexpectedly when the table is migrated from another HBase cluster. In this case, the table creation time can be much later than the row timestamps of all its data. A flashback query meant to select an earlier subset of data will return the following failure instead:

scala> df.count
2017-08-11 20:12:40,550 INFO  [main] mapreduce.PhoenixInputFormat: UseSelectColumns=true, selectColumnList.size()=3, selectColumnList=TIMESTR,DBID,OPTION 
2017-08-11 20:12:40,550 INFO  [main] mapreduce.PhoenixInputFormat: Select Statement: SELECT "TIMESTR","DBID","OPTION" FROM NS.USAGES
2017-08-11 20:12:40,558 ERROR [main] mapreduce.PhoenixInputFormat: Failed to get the query plan with error [ERROR 1012 (42M03): Table undefined. tableName=NS.USAGES]
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#13L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#16L])
      +- Project
         +- Scan ExistingRDD[TIMESTR#10,DBID#11,OPTION#12] 

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
...

Which apparently means that Phoenix considers the table nonexistent at this point. I tested the same approach in sqlline and sure enough, the table is missing from "!tables"

Any workaround?