Phoenix Dynamic Columns in Spark Dataframe API


Expert Contributor

I'm trying to use Phoenix to fill an HBase table with dynamic content.

To that end, I did some experiments with dynamic columns (https://phoenix.apache.org/dynamic_columns.html) in the sqlline tool.

For example, I ran the following queries in succession:

CREATE TABLE t1 (a VARCHAR, b INTEGER PRIMARY KEY);

UPSERT INTO t1 VALUES ('test1', 1);

-- Insert a new row with a third column 'c'
UPSERT INTO t1 (a, b, c INTEGER) VALUES('test2', 2, 3);

SELECT * FROM t1;
+--------+----+
|   A    | B  |
+--------+----+
| test1  | 1  |
| test2  | 2  |
+--------+----+

-- Add the new column
ALTER TABLE t1 ADD IF NOT EXISTS c INTEGER;

SELECT * FROM t1;  
+--------+----+-------+
|   A    | B  |   C   |
+--------+----+-------+
| test1  | 1  | null  |
| test2  | 2  | 3     |
+--------+----+-------+

This is a good way to add data with a different shape to an existing table, e.g. when new columns are needed because of a changed data schema.

This works fine in the sqlline tool, but now I want to use the Phoenix API in my Spark application to save different DataFrames to my HBase table.

Saving a DataFrame that contains exactly the same columns as the table works fine (see the sketch below). But when I try to write another DataFrame that contains additional columns, I get an exception.
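
For reference, a minimal sketch of the round trip that works, assuming the phoenix-spark connector is on the classpath and the DataFrame's columns match the table columns (A, B, C) exactly ("server-url" is a placeholder, as in my write example below):

// Sketch only: write back a DataFrame whose columns match the table (A, B, C).
val sameColsDF = sqlContext.read
  .format("org.apache.phoenix.spark")
  .option("table", "T1")
  .option("zkUrl", "jdbc:phoenix:server-url:2181:/hbase-unsecure")
  .load()

sameColsDF.write
  .format("org.apache.phoenix.spark")
  .mode("overwrite")
  .option("table", "T1")
  .option("zkUrl", "jdbc:phoenix:server-url:2181:/hbase-unsecure")
  .save()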

Here are my commands for the failing case:

val readDF = sqlContext.read
  .format("org.apache.phoenix.spark")
  .option("table", "T1")
  .option("zkUrl", "jdbc:phoenix:zim1ext-vm.et-it.hs-offenburg.de:2181:/hbase-unsecure")
  .load()
readDF.printSchema

readDF: org.apache.spark.sql.DataFrame = [A: string, B: int, C: int]
root
 |-- A: string (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: integer (nullable = true)

val newDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ";")
  .load("/public/test4.csv")
newDF.printSchema

newDF: org.apache.spark.sql.DataFrame = [B: int, C: int, D: string]
root
 |-- B: integer (nullable = true)
 |-- C: integer (nullable = true)
 |-- D: string (nullable = true)      // This is a new column!!!

newDF.write.format("org.apache.phoenix.spark")
  .mode("overwrite")
  .option("table", "T1")
  .option("zkUrl", "jdbc:phoenix:server-url:2181:/hbase-unsecure")
  .save()

I thought Spark would be able to UPSERT the new column from the DataFrame, but I get this exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 31, server-url): java.sql.SQLException: Unable to resolve these column names:
D
Available columns with column families:
0.A,B,0.C
    at org.apache.phoenix.util.PhoenixRuntime.generateColumnInfo(PhoenixRuntime.java:475)
    at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getUpsertColumnMetadataList(PhoenixConfigurationUtil.java:252)
    at org.apache.phoenix.spark.DataFrameFunctions$$anonfun$1.apply(DataFrameFunctions.scala:48)
    at org.apache.phoenix.spark.DataFrameFunctions$$anonfun$1.apply(DataFrameFunctions.scala:44)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1144)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:994)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:985)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:985)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:985)
    at org.apache.phoenix.spark.DataFrameFunctions.saveToPhoenix(DataFrameFunctions.scala:58)
    at org.apache.phoenix.spark.DataFrameFunctions.saveToPhoenix(DataFrameFunctions.scala:27)
    at org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:47)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
    at $iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC.<init>(<console>:47)
    at $iwC.<init>(<console>:49)
    at <init>(<console>:51)
    at .<init>(<console>:55)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:38)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:991)
    at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:1197)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:1164)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:1157)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:101)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:502)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:175)
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Unable to resolve these column names:
D
Available columns with column families:
0.A,B,0.C
    at org.apache.phoenix.util.PhoenixRuntime.generateColumnInfo(PhoenixRuntime.java:475)
    at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getUpsertColumnMetadataList(PhoenixConfigurationUtil.java:252)
    at org.apache.phoenix.spark.DataFrameFunctions$$anonfun$1.apply(DataFrameFunctions.scala:48)
    at org.apache.phoenix.spark.DataFrameFunctions$$anonfun$1.apply(DataFrameFunctions.scala:44)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    ... 3 more

My question now is: Is there a way to add new columns to an HBase table using DataFrames with Spark's Phoenix API? Maybe something like a .option("dynamic-columns", "true") or similar?
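
A workaround I'm considering (untested sketch only; it gives up on dynamic columns and simply evolves the table schema first) is to run the ALTER TABLE over plain Phoenix JDBC before saving the DataFrame. The JDBC URL and the column name D are placeholders from my example:

// Untested workaround sketch: add the new column via Phoenix JDBC first,
// so that the normal phoenix-spark save can resolve all columns of newDF.
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:phoenix:server-url:2181:/hbase-unsecure")
try {
  conn.createStatement().execute("ALTER TABLE T1 ADD IF NOT EXISTS D VARCHAR")
} finally {
  conn.close()
}

newDF.write
  .format("org.apache.phoenix.spark")
  .mode("overwrite")
  .option("table", "T1")
  .option("zkUrl", "jdbc:phoenix:server-url:2181:/hbase-unsecure")
  .save()

That still requires knowing the new columns and their types up front, so a real dynamic-column option in the connector would be much nicer.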

I'm working on HDP 2.6 (Spark 1.6.3, HBase 1.1 and Phoenix 4.7). Thanks for your help!

2 Replies

Re: Phoenix Dynamic Columns in Spark Dataframe API

New Contributor

Were you able to fix this problem? I'm facing the same problem using Phoenix, Spark, and HBase. I'm trying to add dynamic columns using Spark. Is that possible?

Re: Phoenix Dynamic Columns in Spark Dataframe API

Cloudera Employee

@Daniel Müller, I am facing a similar issue. Did you find any solutions or workarounds?