Archives of Support Questions (Read Only)

This is an archived board kept for historical reference; information and links may no longer be available or relevant. To ask a new question, please post a new topic on the appropriate active board.

Phoenix / HBase problem with HDP 2.3.4 and Java

Super Collaborator

Hello, I have a couple of questions regarding phoenix-spark on HBase.

I am on HDP 2.3.4, and therefore on Phoenix 4.4.0.2.3.4.0-3485 and Spark 1.5.2.

First question, regarding reads: I am trying out the very nice example here, but I am getting the following (from spark-shell, though I got the same in Java):

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 14, sandbox.hortonworks.com): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericMutableRow cannot be cast to org.apache.spark.sql.Row
        at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

This seems to be an issue with this particular Spark + Phoenix combination on HDP 2.3.4 according to PHOENIX-2287, and it is fixed in Phoenix 4.5.3+.

Is there any other way to get around this, or do I have to wait until Hortonworks ships an upgrade?

Secondly, due to a decision made high up in my organization not to use Scala, I can only use Java, and it seems that this example from Phoenix (in particular the saveToPhoenix method):

sc.parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID","COL1","COL2"),
    zkUrl = Some("phoenix-server:2181")
  )

is not available from Java, according to this thread on Stack Overflow. Is this true?
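From what I can tell, saveToPhoenix is a Scala implicit extension method, so it is not directly callable from Java; the phoenix-spark integration also registers a standard Spark SQL data source, though, which should be reachable from Java through the generic DataFrame write API. A minimal sketch of what I mean (reusing the table and zkUrl from the Scala example above, so treat the names as illustrative):

import com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;

public class SaveToPhoenixFromJava {
    // The DataFrame's column names must match the Phoenix table's columns
    // (ID, COL1, COL2 in the Scala example above).
    static void save(DataFrame df) {
        df.write()
                .format("org.apache.phoenix.spark")
                .options(ImmutableMap.of(
                        "table", "OUTPUT_TEST_TABLE",
                        "zkUrl", "phoenix-server:2181"))
                .mode(SaveMode.Overwrite)
                .save();
    }
}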

Anyway, I tried it from Java by first creating this simple table in Phoenix:

CREATE TABLE EXAMPLE1 (id BIGINT NOT NULL PRIMARY KEY, COLUMN1 VARCHAR)
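For completeness, the same DDL can also be run from Java through the Phoenix JDBC driver; a sketch, assuming the sandbox ZooKeeper quorum used elsewhere in this thread:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateExample1 {
    public static void main(String[] args) throws Exception {
        // The Phoenix JDBC URL points at the ZooKeeper quorum, not a region server.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:phoenix:sandbox.hortonworks.com:2181:/hbase-unsecure");
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE IF NOT EXISTS EXAMPLE1 "
                    + "(id BIGINT NOT NULL PRIMARY KEY, COLUMN1 VARCHAR)");
        }
    }
}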

and then running the following Java code to write the DataFrame:

DataFrame writeDF = df.withColumnRenamed("Key", "id")
        .withColumnRenamed("somecolumn", "COLUMN1")
        .selectExpr(new String[]{"id", "COLUMN1"});
// Doesn't work even if I rename with the "0." family prefix, using either of:
//        .withColumnRenamed("COLUMN1", "0.COLUMN1")
//        .withColumnRenamed("COLUMN1", "`0.COLUMN1`")

writeDF.write()
        .format("org.apache.phoenix.spark")
        .options( ImmutableMap.of("table" , "EXAMPLE1",
                "zkUrl", "sandbox:2181:/hbase-unsecure"))
        .mode(SaveMode.Overwrite)
        .save();

But I am getting this (Spark seems to treat the dot in 0.COLUMN1 as a struct-field accessor, which would explain why the renamed column cannot be resolved):

org.apache.spark.sql.AnalysisException: cannot resolve '0.COLUMN1' given input columns id, 0.COLUMN1;
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:53)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)


In any case, is there a way (or an example) to read and write a DataFrame via Phoenix using Java, for these specific versions of HDP / Phoenix?

Thank you in advance!

1 ACCEPTED SOLUTION

Super Collaborator

OK, in the end I found a way to both read and write Phoenix from a Java Spark app:

// read
// Using JDBC - which isn't the best way of doing this, as there is no push-down optimization...
DataFrame dfFromHbase = SPARK_MANAGED_RESOURCE.getSparkSqlContext().read().format("jdbc")
        .options(ImmutableMap.of(
                "driver", "org.apache.phoenix.jdbc.PhoenixDriver",
                "url", "jdbc:phoenix:sandbox.hortonworks.com:2181:/hbase-unsecure",
                "dbtable", tableName))
        .load();

// write
// No column family is specified - it uses whatever is mapped in the Phoenix table definition
dfICreated.write().format("org.apache.phoenix.spark")
        .mode(SaveMode.Overwrite)
        .options(ImmutableMap.of(
                "zkUrl", "sandbox:2181:/hbase-unsecure",
                "table", tableName))
        .save();

These are for the 2.3.4 sandbox. I hope Hortonworks upgrades to the latest Phoenix (4.6 or 4.7?) soon, as the phoenix-spark read path would provide query push-down, which I don't think the JDBC route is doing at the moment...
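For anyone wiring this up from scratch, here is a self-contained sketch of the same approach (SPARK_MANAGED_RESOURCE above is our own wrapper; the SparkConf setup and table name below are illustrative assumptions):

import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class PhoenixReadWrite {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("phoenix-read-write");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Read via the plain Spark JDBC source (no push-down into Phoenix).
        DataFrame df = sqlContext.read().format("jdbc")
                .options(ImmutableMap.of(
                        "driver", "org.apache.phoenix.jdbc.PhoenixDriver",
                        "url", "jdbc:phoenix:sandbox.hortonworks.com:2181:/hbase-unsecure",
                        "dbtable", "EXAMPLE1"))
                .load();

        // Write back via the phoenix-spark data source; the DataFrame's column
        // names must match the Phoenix table's columns (here ID and COLUMN1).
        df.write().format("org.apache.phoenix.spark")
                .mode(SaveMode.Overwrite)
                .options(ImmutableMap.of(
                        "zkUrl", "sandbox.hortonworks.com:2181:/hbase-unsecure",
                        "table", "EXAMPLE1"))
                .save();

        sc.stop();
    }
}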


2 REPLIES

Master Mentor

Have you seen this? https://phoenix.apache.org/phoenix_spark.html There's a PySpark example, but alas no Java.
