Running phoenix flashback queries / setting currentSCN properties for spark-shell

Solved


Expert Contributor

How do I specify and run a flashback / snapshot query in spark-shell, given that phoenix-spark has already been set up?

Assuming the environment has already been set up with the correct jars to run this:

import org.apache.phoenix.spark._
val df_new = sc.parallelize(Seq((345, "some org"), (456, "ORG_ID_1"))).toDF("ID", "ORG_ID")
df_new.saveToPhoenix("MY_TABLE") // I presume it will be some param. within phoenixTableAsDataFrame?
sqlContext.phoenixTableAsDataFrame("MY_TABLE", Array("ID", "ORG_ID")).show
+-------+---------+
|     ID|   ORG_ID|
+-------+---------+
|    456| ORG_ID_1|
|    345| some org|
+-------+---------+

Is it possible?

Thank you!

1 ACCEPTED SOLUTION


Re: Running phoenix flashback queries / setting currentSCN properties for spark-shell

Expert Contributor

After looking at the source and this JIRA, I finally got around to doing it in spark-shell:

import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
val conf = new Configuration
conf.setLong(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, 1490706922000L)
sqlContext.phoenixTableAsDataFrame("TRADE.TRADE", Array("ID", "BROKER"), conf = conf).show
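As a side note, the SCN value is just an HBase-style timestamp, i.e. milliseconds since the Unix epoch by default, so you can derive it from a wall-clock time rather than hard-coding the literal. A small sketch using only the JDK (the instant below corresponds to the literal in the snippet above):

```scala
import java.time.Instant

// CurrentSCN is an epoch-millisecond timestamp; Phoenix returns rows as of
// this point in time. Derive it from a readable UTC instant:
val scn: Long = Instant.parse("2017-03-28T13:15:22Z").toEpochMilli
// scn == 1490706922000L, the value passed to conf.setLong above
```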
2 REPLIES

Re: Running phoenix flashback queries / setting currentSCN properties for spark-shell

Contributor

This feature behaves unexpectedly when the table has been migrated from another HBase cluster. In that case the table's creation time can be much later than the row timestamps of all its data, and a flashback query meant to select an earlier subset of the data returns the following failure instead:

scala> df.count
2017-08-11 20:12:40,550 INFO  [main] mapreduce.PhoenixInputFormat: UseSelectColumns=true, selectColumnList.size()=3, selectColumnList=TIMESTR,DBID,OPTION 
2017-08-11 20:12:40,550 INFO  [main] mapreduce.PhoenixInputFormat: Select Statement: SELECT "TIMESTR","DBID","OPTION" FROM NS.USAGES
2017-08-11 20:12:40,558 ERROR [main] mapreduce.PhoenixInputFormat: Failed to get the query plan with error [ERROR 1012 (42M03): Table undefined. tableName=NS.USAGES]
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#13L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#16L])
      +- Project
         +- Scan ExistingRDD[TIMESTR#10,DBID#11,OPTION#12] 

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
...

This apparently means that Phoenix considers the table nonexistent at that point in time. I tested the same approach in sqlline and, sure enough, the table is missing from "!tables".

Any workaround?
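One approach that may help (an untested sketch, not a confirmed fix): Phoenix timestamps DDL with the connection's CurrentSCN, so when recreating the migrated table's metadata you could open the JDBC connection with the standard "CurrentSCN" property set to a timestamp earlier than the oldest migrated row. The table would then "exist" at the point in time you want to flash back to. The host, table name, and SCN value below are hypothetical:

```scala
import java.util.Properties

// Sketch only: build connection properties with CurrentSCN set to a point
// before the oldest migrated row, so DDL run over this connection is
// timestamped in the past. "CurrentSCN" is the Phoenix JDBC property name.
val props = new Properties()
props.setProperty("CurrentSCN", "1400000000000") // hypothetical: before the oldest data

// On a live cluster you would then open the connection and run the DDL, e.g.:
// val conn = java.sql.DriverManager.getConnection("jdbc:phoenix:zk-host:2181", props)
// conn.createStatement().execute("CREATE TABLE NS.USAGES (...)")
```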
