Created on 08-01-2025 03:40 AM - edited 08-01-2025 03:44 AM
Hi,
I'm attempting to run a simple join query between two tables in an Apache Phoenix database on a Spark cluster. However, when I try to read from the resulting DataFrame after executing the query, I encounter a NullPointerException.
Query:

(SELECT CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID,
        CONNTEST.EMPLOYEEFILTERJOIN.NAME,
        CONNTEST.EMPLOYEEFILTERJOIN.SALARY,
        CONNTEST.DEPARTMENTFILTERJOIN.DEPT_ID,
        CONNTEST.DEPARTMENTFILTERJOIN.DETP_NAME
 FROM CONNTEST.EMPLOYEEFILTERJOIN, CONNTEST.DEPARTMENTFILTERJOIN
 WHERE CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID = CONNTEST.DEPARTMENTFILTERJOIN.EMP_ID) query_alias
Below is the code I'm executing:
import java.util.Properties

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("PhoenixWriteIssueSimulator")
  .master("local[*]")
  .config("spark.sql.catalogImplementation", "in-memory")
  .getOrCreate()

val jdbcProperties = new Properties()
jdbcProperties.setProperty("driver", "org.apache.phoenix.jdbc.PhoenixDriver")

// phoenixJdbcUrl and phoenixSourceQuery hold the JDBC URL and the query above.
val df = spark.read
  .jdbc(phoenixJdbcUrl, phoenixSourceQuery, jdbcProperties)

// Equivalent call with named arguments:
// val df = spark.read
//   .jdbc(url = phoenixJdbcUrl, table = phoenixSourceQuery, properties = jdbcProperties)

df.show(20)
df.show(20) fails with the stack trace below:
25/08/01 15:34:51 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.phoenix.util.SchemaUtil.getSchemaNameFromFullName(SchemaUtil.java:713)
    at org.apache.phoenix.schema.TableNotFoundException.<init>(TableNotFoundException.java:44)
    at org.apache.phoenix.compile.FromCompiler$MultiTableColumnResolver.resolveTable(FromCompiler.java:1013)
    at org.apache.phoenix.compile.JoinCompiler$ColumnRefParseNodeVisitor.visit(JoinCompiler.java:1399)
    at org.apache.phoenix.compile.JoinCompiler$ColumnRefParseNodeVisitor.visit(JoinCompiler.java:1371)
    at org.apache.phoenix.parse.ColumnParseNode.accept(ColumnParseNode.java:56)
    at org.apache.phoenix.util.ParseNodeUtil.applyParseNodeVisitor(ParseNodeUtil.java:67)
    at org.apache.phoenix.compile.JoinCompiler.compile(JoinCompiler.java:172)
    at org.apache.phoenix.compile.QueryCompiler.compileSelect(QueryCompiler.java:248)
    at org.apache.phoenix.compile.QueryCompiler.compile(QueryCompiler.java:178)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableSelectStatement.compilePlan(PhoenixStatement.java:543)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableSelectStatement.compilePlan(PhoenixStatement.java:506)
    at org.apache.phoenix.jdbc.PhoenixStatement$1.call(PhoenixStatement.java:311)
    at org.apache.phoenix.jdbc.PhoenixStatement$1.call(PhoenixStatement.java:300)
    at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
    at org.apache.phoenix.jdbc.PhoenixStatement.executeQuery(PhoenixStatement.java:299)
    at org.apache.phoenix.jdbc.PhoenixStatement.executeQuery(PhoenixStatement.java:292)
    at org.apache.phoenix.jdbc.PhoenixPreparedStatement.executeQuery(PhoenixPreparedStatement.java:190)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$14.apply(Executor.scala:459)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
I have also checked the schema by printing df.printSchema(); below is the output:
root
 |-- CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID: integer (nullable = true)
 |-- CONNTEST.EMPLOYEEFILTERJOIN.NAME: string (nullable = true)
 |-- CONNTEST.EMPLOYEEFILTERJOIN.SALARY: integer (nullable = true)
 |-- CONNTEST.DEPARTMENTFILTERJOIN.DEPT_ID: integer (nullable = true)
 |-- CONNTEST.DEPARTMENTFILTERJOIN.DETP_NAME: string (nullable = true)
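For what it's worth, printSchema() succeeds because Spark infers the schema with a "SELECT * FROM (query) WHERE 1=0" probe; the failure only shows up at run time, when Spark quotes each inferred field name back into its own projection. Since those field names contain dots, the statement Phoenix receives presumably looks like this sketch (assumed from Spark's default JDBC dialect, not captured from logs):

// A sketch (assumed, not captured from logs) of the statement Spark's JDBC
// source generates at run time. The double-quoted field names still contain
// dots, which Phoenix appears to parse as SCHEMA.TABLE.COLUMN references
// while compiling the join, matching the TableNotFoundException -> NPE above.
val generatedSql =
  """SELECT "CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID", "CONNTEST.EMPLOYEEFILTERJOIN.NAME", ...
    |FROM (SELECT ... WHERE ...) query_alias""".stripMargin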
Created 08-04-2025 10:15 PM
@Arun_Tejasvi, Welcome to our community! To help you get the best possible answer, I have tagged one of our Spark experts, @haridjh, who may be able to assist you further.
Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.
Regards,
Vidya Sargur,

Created 08-06-2025 03:05 PM
Hi Arun,
The stack trace shows the NullPointerException being raised while Phoenix compiles the join in your subquery. One way to sidestep that is to define the join as a Phoenix view and have Spark read the view instead:
CREATE VIEW CONNTEST.JOIN_VIEW AS
SELECT
emp.EMP_ID,
emp.NAME,
emp.SALARY,
dep.DEPT_ID,
dep.DETP_NAME
FROM CONNTEST.EMPLOYEEFILTERJOIN emp
JOIN CONNTEST.DEPARTMENTFILTERJOIN dep
ON emp.EMP_ID = dep.EMP_ID;
val df = spark.read
  .jdbc(
    url = phoenixJdbcUrl,
    table = "CONNTEST.JOIN_VIEW",
    properties = jdbcProperties
  )
df.show()
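If creating a view is not an option, another variant worth testing is to alias every column in the subquery so the DataFrame schema gets plain field names. This is a sketch that assumes the failure is tied to the dotted names Spark quotes back into its generated projection:

// Hypothetical rewrite of the original source query: aliasing each column
// gives the DataFrame plain field names (EMP_ID, NAME, ...), so the
// projection Spark generates no longer contains dotted identifiers.
val aliasedSourceQuery =
  """(SELECT emp.EMP_ID    AS EMP_ID,
    |        emp.NAME      AS NAME,
    |        emp.SALARY    AS SALARY,
    |        dep.DEPT_ID   AS DEPT_ID,
    |        dep.DETP_NAME AS DETP_NAME
    | FROM CONNTEST.EMPLOYEEFILTERJOIN emp
    | JOIN CONNTEST.DEPARTMENTFILTERJOIN dep
    |   ON emp.EMP_ID = dep.EMP_ID) query_alias""".stripMargin

val aliasedDf = spark.read.jdbc(phoenixJdbcUrl, aliasedSourceQuery, jdbcProperties)
aliasedDf.show(20)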
Hope these are good pointers for you to investigate further.
Created 08-06-2025 10:06 PM
Hi @Anshul_Gupta,
Please find the answers below,
Please let me know if you need any additional information. I'm happy to help.
Created 08-08-2025 02:48 PM
Have you tried the Phoenix-Spark connector, following this documentation: https://docs.cloudera.com/cdp-private-cloud-base/7.1.9/phoenix-access-data/topics/phoenix-understand...?
If not, please give it a try and let us know if, for any reason, it does not meet your use cases.
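For reference, a minimal read with the connector looks roughly like the sketch below. The exact option names vary by connector release ("zkUrl" in older phoenix-spark builds, "jdbcUrl" in newer phoenix5-spark ones), and the ZooKeeper address is a placeholder, so treat this as an outline rather than a drop-in snippet:

// Minimal Phoenix-Spark connector read (a sketch; assumes the connector jar
// is on the classpath and that "zkUrl" is the right option for your release).
// The connector reads a table or view rather than arbitrary SQL, so either
// point it at CONNTEST.JOIN_VIEW from the earlier reply or join in Spark:
val employees = spark.read
  .format("phoenix")
  .option("table", "CONNTEST.EMPLOYEEFILTERJOIN")
  .option("zkUrl", "zookeeper-host:2181") // placeholder quorum address
  .load()

val departments = spark.read
  .format("phoenix")
  .option("table", "CONNTEST.DEPARTMENTFILTERJOIN")
  .option("zkUrl", "zookeeper-host:2181")
  .load()

// Let Spark perform the join instead of pushing it down to Phoenix.
val joined = employees.join(departments, Seq("EMP_ID"))
joined.show(20)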
Created 08-08-2025 03:54 PM
@willx @ayushi Hi! By any chance do you have some insights here? Thanks!
Regards,
Diana Torres,