
Getting NullPointer Exception when a join query is executed on a Phoenix Database

Frequent Visitor

Hi,
I'm attempting to run a simple join query between two tables in an Apache Phoenix database on a Spark cluster. However, when I try to read from the resulting DataFrame after executing the query, I encounter a NullPointerException.
Query:

(SELECT CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID, CONNTEST.EMPLOYEEFILTERJOIN.NAME,
        CONNTEST.EMPLOYEEFILTERJOIN.SALARY, CONNTEST.DEPARTMENTFILTERJOIN.DEPT_ID,
        CONNTEST.DEPARTMENTFILTERJOIN.DETP_NAME
 FROM CONNTEST.EMPLOYEEFILTERJOIN, CONNTEST.DEPARTMENTFILTERJOIN
 WHERE CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID = CONNTEST.DEPARTMENTFILTERJOIN.EMP_ID) query_alias

Below is the code I'm executing:

import java.util.Properties

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("PhoenixWriteIssueSimulator")
  .master("local[*]")
  .config("spark.sql.catalogImplementation", "in-memory")
  .getOrCreate()

val jdbcProperties = new Properties()
jdbcProperties.setProperty("driver", "org.apache.phoenix.jdbc.PhoenixDriver")

// phoenixJdbcUrl holds our Phoenix JDBC URL; phoenixSourceQuery holds the
// parenthesized join query (with its alias) shown above.
val df = spark.read
  .jdbc(
    url = phoenixJdbcUrl,
    table = phoenixSourceQuery,
    properties = jdbcProperties
  )

df.show(20)

df.show(20) fails with the stack trace below:

25/08/01 15:34:51 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
	at org.apache.phoenix.util.SchemaUtil.getSchemaNameFromFullName(SchemaUtil.java:713)
	at org.apache.phoenix.schema.TableNotFoundException.<init>(TableNotFoundException.java:44)
	at org.apache.phoenix.compile.FromCompiler$MultiTableColumnResolver.resolveTable(FromCompiler.java:1013)
	at org.apache.phoenix.compile.JoinCompiler$ColumnRefParseNodeVisitor.visit(JoinCompiler.java:1399)
	at org.apache.phoenix.compile.JoinCompiler$ColumnRefParseNodeVisitor.visit(JoinCompiler.java:1371)
	at org.apache.phoenix.parse.ColumnParseNode.accept(ColumnParseNode.java:56)
	at org.apache.phoenix.util.ParseNodeUtil.applyParseNodeVisitor(ParseNodeUtil.java:67)
	at org.apache.phoenix.compile.JoinCompiler.compile(JoinCompiler.java:172)
	at org.apache.phoenix.compile.QueryCompiler.compileSelect(QueryCompiler.java:248)
	at org.apache.phoenix.compile.QueryCompiler.compile(QueryCompiler.java:178)
	at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableSelectStatement.compilePlan(PhoenixStatement.java:543)
	at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableSelectStatement.compilePlan(PhoenixStatement.java:506)
	at org.apache.phoenix.jdbc.PhoenixStatement$1.call(PhoenixStatement.java:311)
	at org.apache.phoenix.jdbc.PhoenixStatement$1.call(PhoenixStatement.java:300)
	at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
	at org.apache.phoenix.jdbc.PhoenixStatement.executeQuery(PhoenixStatement.java:299)
	at org.apache.phoenix.jdbc.PhoenixStatement.executeQuery(PhoenixStatement.java:292)
	at org.apache.phoenix.jdbc.PhoenixPreparedStatement.executeQuery(PhoenixPreparedStatement.java:190)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$14.apply(Executor.scala:459)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

I have also checked the schema by calling df.printSchema(); the output is below:

root
 |-- CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID: integer (nullable = true)
 |-- CONNTEST.EMPLOYEEFILTERJOIN.NAME: string (nullable = true)
 |-- CONNTEST.EMPLOYEEFILTERJOIN.SALARY: integer (nullable = true)
 |-- CONNTEST.DEPARTMENTFILTERJOIN.DEPT_ID: integer (nullable = true)
 |-- CONNTEST.DEPARTMENTFILTERJOIN.DETP_NAME: string (nullable = true)
After examining the Phoenix-Client-Hbase*.jar (phoenix-client-hbase-2.2-5.1.1.7.1.7.0-551.jar), which is the official Phoenix JDBC driver, I found that when a query containing a join condition is executed, the driver calls the JoinCompiler.compile method. There I noticed that the SELECT statement arrives with each of its select columns enclosed in double quotes.
However, when the JoinCompiler$ColumnRefParseNodeVisitor.visit method processes these column nodes, both their table name and schema name are null.
For example, the column name appears as ""CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID"" (with the doubled quotes at the start and end), i.e. the entire fully qualified name is treated as a single quoted identifier, so there is no separate schema or table part left to resolve. This causes a NullPointerException for these columns.
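To make the quoting concrete, here is a minimal sketch of what I believe happens to the column labels Spark reads back from the Phoenix ResultSet metadata (the URL is a placeholder; JdbcDialects is Spark's dialect registry):

import org.apache.spark.sql.jdbc.JdbcDialects

// As far as I can tell, no dialect is registered for Phoenix URLs, so Spark
// falls back to the default dialect, whose quoteIdentifier wraps the whole
// label in double quotes without splitting it on dots.
val dialect = JdbcDialects.get("jdbc:phoenix:zk-host:2181") // placeholder URL
val quoted = dialect.quoteIdentifier("CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID")
// quoted is "CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID" as one quoted identifier,
// leaving Phoenix with no separate schema or table name to resolve.
println(quoted)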
I suspected that the issue might be due to using fully qualified column names (schema.table.column) in the query, so I tried running this query:
(SELECT emp.EMP_ID, emp.NAME, emp.SALARY, dep.DEPT_ID, dep.DETP_NAME FROM CONNTEST.EMPLOYEEFILTERJOIN emp, CONNTEST.DEPARTMENTFILTERJOIN dep WHERE emp.EMP_ID = dep.EMP_ID) query_alias
However, the problem persists even with this query.
Could someone help me identify the root cause of this issue and clarify whether it might be a bug in the Phoenix JDBC driver?

Community Manager

@Arun_Tejasvi, Welcome to our community! To help you get the best possible answer, I have tagged our Spark expert @haridjh, who may be able to assist you further.

Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.



Regards,

Vidya Sargur,
Community Manager



Contributor

Hi Arun,

  • Has this worked with earlier versions of Cloudera? Which version are you using in this case?
  • Phoenix supports views that can encapsulate joins, which avoids complex parsing issues with subqueries. Have you tried this approach?
CREATE VIEW CONNTEST.JOIN_VIEW AS
SELECT 
  emp.EMP_ID, 
  emp.NAME, 
  emp.SALARY, 
  dep.DEPT_ID, 
  dep.DETP_NAME
FROM CONNTEST.EMPLOYEEFILTERJOIN emp
JOIN CONNTEST.DEPARTMENTFILTERJOIN dep
ON emp.EMP_ID = dep.EMP_ID;

val df = spark.read
  .jdbc(
    url = phoenixJdbcUrl,
    table = "CONNTEST.JOIN_VIEW",
    properties = jdbcProperties
  )
df.show()
  • Is loading the two tables separately and joining them in Spark a viable option for you? (See the sketch after this list.)
  • Spark also assigns an alias to the subquery clause - https://spark.apache.org/docs/3.5.1/sql-data-sources-jdbc.html. I am not sure the original approach will work as intended, but if you’re able to share examples where it has worked, we would be happy to take a closer look.
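For the "join in Spark" option, here is a minimal sketch of what that could look like, reusing the table names and variables from your post (untested on my side):

val emp = spark.read.jdbc(phoenixJdbcUrl, "CONNTEST.EMPLOYEEFILTERJOIN", jdbcProperties)
val dep = spark.read.jdbc(phoenixJdbcUrl, "CONNTEST.DEPARTMENTFILTERJOIN", jdbcProperties)

// The join runs in Spark itself, so no join query is ever pushed to Phoenix.
val joined = emp.join(dep, emp("EMP_ID") === dep("EMP_ID"))
  .select(emp("EMP_ID"), emp("NAME"), emp("SALARY"), dep("DEPT_ID"), dep("DETP_NAME"))
joined.show(20)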

Hope these are good pointers for you to investigate further.

avatar
Frequent Visitor

Hi @Anshul_Gupta,
Please find the answers below,

  • This is the first time we are testing this use case. Currently, we are using CDP version 7.1 with Spark 3.3.2.
  • Phoenix only supports creating views over a single table using simple SELECT * queries. Complex queries, including those with joins, are not supported. This limitation is documented under the limitations section of the Phoenix documentation - https://phoenix.apache.org/views.html
  • Loading the two tables separately and performing the join in Spark could lead to performance issues, especially if the datasets are large.
  • As I mentioned earlier, this is the first time we're testing this approach. The issue only occurs when the query is executed through Spark; when run directly in sqlline, the same query executes successfully. (A side-by-side sketch of the difference, as I understand it, follows this list.)
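To make that last point concrete, here is the query as sqlline runs it next to my reconstruction of what Spark's JDBC source generates (the second statement is inferred from the stack trace and the quoted labels above, not captured from a log):

// What sqlline executes directly (works):
val directSql =
  """SELECT CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID, CONNTEST.EMPLOYEEFILTERJOIN.NAME,
    |       CONNTEST.EMPLOYEEFILTERJOIN.SALARY, CONNTEST.DEPARTMENTFILTERJOIN.DEPT_ID,
    |       CONNTEST.DEPARTMENTFILTERJOIN.DETP_NAME
    |FROM CONNTEST.EMPLOYEEFILTERJOIN, CONNTEST.DEPARTMENTFILTERJOIN
    |WHERE CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID = CONNTEST.DEPARTMENTFILTERJOIN.EMP_ID""".stripMargin

// My reconstruction of what Spark sends for df.show(): the query is wrapped
// as a subquery and every column label from the metadata is re-quoted as a
// single identifier, which is where Phoenix trips.
val sparkGeneratedSql =
  s"""SELECT "CONNTEST.EMPLOYEEFILTERJOIN.EMP_ID", "CONNTEST.EMPLOYEEFILTERJOIN.NAME",
     |       "CONNTEST.EMPLOYEEFILTERJOIN.SALARY", "CONNTEST.DEPARTMENTFILTERJOIN.DEPT_ID",
     |       "CONNTEST.DEPARTMENTFILTERJOIN.DETP_NAME"
     |FROM ($directSql) query_alias""".stripMargin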

Please let me know if you need any additional information. I'm happy to help.

Contributor

Have you tried the Phoenix–Spark connector, following this documentation: https://docs.cloudera.com/cdp-private-cloud-base/7.1.9/phoenix-access-data/topics/phoenix-understand...?

If not, please give it a try and let us know if, for any reason, it does not meet your use cases.
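For reference, a minimal sketch of reading one of the tables through the connector (the format name and options follow the linked Cloudera docs, but please verify them against your exact CDP/connector version; the ZooKeeper quorum is a placeholder):

val emp = spark.read
  .format("phoenix")
  .option("table", "CONNTEST.EMPLOYEEFILTERJOIN")
  .option("zkUrl", "zk-host-1,zk-host-2,zk-host-3:2181") // placeholder quorum
  .load()

// The connector reads tables rather than arbitrary join queries, so the join
// between the two tables would still be expressed on the Spark side.
emp.show(20)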

Community Manager

@willx @ayushi Hi! By any chance do you have some insights here? Thanks!


Regards,

Diana Torres,
Senior Community Moderator

