Phoenix / HBase problem with HDP 2.3.4 and Java

Super Collaborator

Hello, I have a couple of questions about phoenix-spark on HBase.

I am on HDP 2.3.4, and therefore on Phoenix 4.4.0.2.3.4.0-3485 and Spark 1.5.2.

First, regarding reads: I am trying out this very nice example here, but I get the following (this is from spark-shell, but I get the same in Java):

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 14, sandbox.hortonworks.com): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericMutableRow cannot be cast to org.apache.spark.sql.Row
        at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

This seems to be an issue with the particular Spark + Phoenix combination on HDP 2.3.4; according to PHOENIX-2287, it is fixed in Phoenix 4.5.3+.

Is there any other way to get around this, or do I have to wait until Hortonworks upgrades Phoenix?

Secondly, due to a decision made high up in my organization not to use Scala, I can only use Java, and it seems that this example from the Phoenix documentation (in particular the saveToPhoenix method):

sc.parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID","COL1","COL2"),
    zkUrl = Some("phoenix-server:2181")
  )

is not available to Java, according to this thread on SO. Is this true?

Anyway, I tried with Java, first creating this simple table in Phoenix:

CREATE TABLE EXAMPLE1 (id BIGINT NOT NULL PRIMARY KEY, COLUMN1 VARCHAR)

And then I ran the following Java code to write the DataFrame:

DataFrame writeDF = df.withColumnRenamed("Key", "id")
        .withColumnRenamed("somecolumn", "COLUMN1")
        .selectExpr(new String[]{"id", "COLUMN1"})
// doesn't work even if I rename with the prefix "0." using either of the following:
//        .withColumnRenamed("COLUMN1", "0.COLUMN1")
//        .withColumnRenamed("COLUMN1", "`0.COLUMN1`")
;

df.write()
        .format("org.apache.phoenix.spark")
        .options( ImmutableMap.of("table" , "EXAMPLE1",
                "zkUrl", "sandbox:2181:/hbase-unsecure"))
        .mode(SaveMode.Overwrite)
        .save();

But I am getting this:

org.apache.spark.sql.AnalysisException: cannot resolve '0.COLUMN1' given input columns id, 0.COLUMN1;
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:53)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)

In any case, is there a way, or an example, to read / write a DataFrame via Phoenix using Java for these specific versions of HDP / Phoenix?

Thank you in advance!

1 ACCEPTED SOLUTION

Super Collaborator

OK, in the end I found a way to both read from and write to Phoenix in a Java Spark app:

// read
// uses plain JDBC, which isn't the best way of doing this as there is no push-down optimization...
DataFrame dfFromHbase = SPARK_MANAGED_RESOURCE.getSparkSqlContext().read().format("jdbc")
        .options(ImmutableMap.of(
                "driver", "org.apache.phoenix.jdbc.PhoenixDriver",
                "url", "jdbc:phoenix:sandbox.hortonworks.com:2181:/hbase-unsecure",
                "dbtable", tableName)).load();

// write
// no column family is specified; it uses whatever is defined in the Phoenix table
dfICreated.write().format("org.apache.phoenix.spark")
        .mode(SaveMode.Overwrite)
        .options(ImmutableMap.of(
                "zkUrl", "sandbox:2181:/hbase-unsecure",
                "table", tableName)).save();

These are for sandbox 2.3.4. I hope Hortonworks will upgrade to the latest Phoenix (4.6 or 4.7?) soon, as the phoenix-spark read path would provide query push-down, which I don't think the JDBC driver is doing at the moment...
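
The read and write snippets above each hard-code the ZooKeeper quorum, port, and znode parent, and the two strings have to stay in sync. A minimal sketch of one way to build both option maps from the same values, using only the JDK instead of Guava's ImmutableMap. The class name PhoenixOptions and its method names are hypothetical (they are not part of Phoenix or Spark); the option keys and the URL layout are copied from the snippets above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Phoenix or Spark): keeps the connection
// strings used by the JDBC read and the phoenix-spark write in one place.
final class PhoenixOptions {
    private PhoenixOptions() {}

    // "<quorum>:<port>:<znodeParent>", the format used for the "zkUrl" option above.
    static String zkUrl(String quorum, int port, String znodeParent) {
        return quorum + ":" + port + ":" + znodeParent;
    }

    // "jdbc:phoenix:<quorum>:<port>:<znodeParent>", the format used for the "url" option above.
    static String jdbcUrl(String quorum, int port, String znodeParent) {
        return "jdbc:phoenix:" + zkUrl(quorum, port, znodeParent);
    }

    // Options for the read via format("jdbc").
    static Map<String, String> jdbcReadOptions(String quorum, int port,
                                               String znodeParent, String table) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("driver", "org.apache.phoenix.jdbc.PhoenixDriver");
        options.put("url", jdbcUrl(quorum, port, znodeParent));
        options.put("dbtable", table);
        return Collections.unmodifiableMap(options);
    }

    // Options for the write via format("org.apache.phoenix.spark").
    static Map<String, String> sparkWriteOptions(String quorum, int port,
                                                 String znodeParent, String table) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("zkUrl", zkUrl(quorum, port, znodeParent));
        options.put("table", table);
        return Collections.unmodifiableMap(options);
    }
}
```

Since both `.options(...)` calls above already receive a java.util.Map, the returned maps should be usable in place of the ImmutableMap.of(...) arguments, for example `.options(PhoenixOptions.jdbcReadOptions("sandbox.hortonworks.com", 2181, "/hbase-unsecure", tableName))`. I have only sketched this, not run it against the sandbox.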


2 REPLIES

Master Mentor

Have you seen this? https://phoenix.apache.org/phoenix_spark.html There's a PySpark example but, alas, no Java one.
