Created 02-19-2016 02:48 PM
Hello, I have a couple of questions regarding phoenix-spark on HBase.
I am on HDP 2.3.4, and therefore on Phoenix 4.4.0.2.3.4.0-3485 and Spark 1.5.2.
First, a question about reads: I am trying out this very nice example here, but I am getting the following error (from spark-shell, though I also got the same in Java):
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 14, sandbox.hortonworks.com): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericMutableRow cannot be cast to org.apache.spark.sql.Row
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
This seems to be an issue with this particular Spark + Phoenix combination on HDP 2.3.4 according to PHOENIX-2287, and it is fixed in Phoenix 4.5.3+.
Is there any other way around this, or do I have to wait until Hortonworks ship an upgrade?
Secondly, due to a decision made high up in my organization not to use Scala, I can only use Java, and it seems that this example from Phoenix (in particular the saveToPhoenix method):
sc.parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID", "COL1", "COL2"),
    zkUrl = Some("phoenix-server:2181")
  )
is not available from Java, according to this thread on SO. Is this true?
Anyway, I tried with Java by first creating this simple table in Phoenix:
CREATE TABLE EXAMPLE1 (id BIGINT NOT NULL PRIMARY KEY, COLUMN1 VARCHAR)
and then running the following Java code to write the DataFrame:
DataFrame writeDF = df
    .withColumnRenamed("Key", "id")
    .withColumnRenamed("somecolumn", "COLUMN1")
    .selectExpr(new String[]{"id", "COLUMN1"});
    // doesn't work even if I rename with the column-family prefix "0." using either of:
    // .withColumnRenamed("COLUMN1", "0.COLUMN1")
    // .withColumnRenamed("COLUMN1", "`0.COLUMN1`")

writeDF.write()
    .format("org.apache.phoenix.spark")
    .options(ImmutableMap.of(
        "table", "EXAMPLE1",
        "zkUrl", "sandbox:2181:/hbase-unsecure"))
    .mode(SaveMode.Overwrite)
    .save();
But I am getting this:
org.apache.spark.sql.AnalysisException: cannot resolve '0.COLUMN1' given input columns id, 0.COLUMN1;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:53)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
In any case, is there a way (or an example) to read and write a DataFrame via Phoenix from Java with these specific HDP / Phoenix versions?
Thank you in advance!
Created 02-29-2016 10:00 AM
OK, in the end I found a way to both read from and write to Phoenix in a Java Spark app:
// read
// using JDBC - which isn't the best way of doing this, as there is no push-down optimization...
DataFrame dfFromHbase = SPARK_MANAGED_RESOURCE.getSparkSqlContext().read().format("jdbc")
    .options(ImmutableMap.of(
        "driver", "org.apache.phoenix.jdbc.PhoenixDriver",
        "url", "jdbc:phoenix:sandbox.hortonworks.com:2181:/hbase-unsecure",
        "dbtable", tableName))
    .load();

// write
// no column family is specified - it uses whatever has been linked up in the Phoenix table
dfICreated.write().format("org.apache.phoenix.spark")
    .mode(SaveMode.Overwrite)
    .options(ImmutableMap.of(
        "zkUrl", "sandbox:2181:/hbase-unsecure",
        "table", tableName))
    .save();
These are for sandbox 2.3.4. I hope Hortonworks will upgrade to the latest Phoenix (4.6 or 4.7?) soon, since the phoenix-spark read path would provide query push-down, which I don't think the JDBC driver is doing at the moment...
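For completeness, here is a rough end-to-end sketch of how the two halves fit together in a standalone Java app. It is only a sketch against my sandbox setup: the EXAMPLE1 table, the ID / COLUMN1 column names, and the sandbox zkUrl / JDBC URL are assumptions taken from the snippets above and would need adjusting for another cluster.

import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class PhoenixReadWriteExample {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("phoenix-read-write-example"));
        SQLContext sqlContext = new SQLContext(jsc);

        // Read EXAMPLE1 through the Phoenix JDBC driver (works on HDP 2.3.4,
        // but without push-down optimization).
        DataFrame fromPhoenix = sqlContext.read().format("jdbc")
                .options(ImmutableMap.of(
                        "driver", "org.apache.phoenix.jdbc.PhoenixDriver",
                        "url", "jdbc:phoenix:sandbox.hortonworks.com:2181:/hbase-unsecure",
                        "dbtable", "EXAMPLE1"))
                .load();

        // Make sure the DataFrame column names match the Phoenix column names
        // (upper case, no column-family prefix) before writing.
        DataFrame toPhoenix = fromPhoenix.selectExpr("ID", "COLUMN1");

        // Write back via the phoenix-spark data source; columns map onto the
        // families already defined on the Phoenix table.
        toPhoenix.write().format("org.apache.phoenix.spark")
                .mode(SaveMode.Overwrite)
                .options(ImmutableMap.of(
                        "zkUrl", "sandbox:2181:/hbase-unsecure",
                        "table", "EXAMPLE1"))
                .save();

        jsc.stop();
    }
}

The read deliberately goes through the plain JDBC data source to dodge the ClassCastException above, while the write uses the phoenix-spark data source, which works fine on this version.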
Created 02-20-2016 10:10 PM
Have you seen this? https://phoenix.apache.org/phoenix_spark.html There is a PySpark example there, but alas no Java.
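That said, the PySpark load example on that page seems to translate fairly directly to Java through the generic DataFrame reader. A minimal sketch, where "TABLE1" and "localhost:2181" just mirror the values used on that page:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class PhoenixLoadExample {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("phoenix-load-example"));
        SQLContext sqlContext = new SQLContext(jsc);

        // Load a Phoenix table as a DataFrame via the phoenix-spark data source.
        DataFrame df = sqlContext.read()
                .format("org.apache.phoenix.spark")
                .option("table", "TABLE1")
                .option("zkUrl", "localhost:2181")
                .load();

        df.show();
        jsc.stop();
    }
}

Note that on HDP 2.3.4 this presumably still hits the PHOENIX-2287 ClassCastException described above, since it goes through the same phoenix-spark DataFrame read path.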