Phoenix Spark Connector with HDP 2.6.4: Pushdown on a timestamp column creates exception

New Contributor

Hi.

I'm using the Phoenix Spark connector on the HDP 2.6.4 Sandbox. I have a timestamp column that I use to filter the Phoenix data on load. This is necessary because the data arrive as a stream, so I can't reload the whole dataset for every batch. But when I filter on the timestamp column, an exception is thrown. This appears to be related to PHOENIX-3664, except that I'm using Scala instead of Python. Below is code that reproduces the exception (run from a JAR), followed by the stack trace. I'm wondering if anyone has suggestions for workarounds, or any knowledge of when the bugfix will appear in HDP.

Thanks,
Tavis

    import java.sql.{Connection, DriverManager}

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, current_timestamp, rand}

    val spark = SparkSession
      .builder
      .appName("HDP Phoenix Test")
      .getOrCreate()
    import spark.implicits._

    // Build a small test DataFrame with an integer key, a random double, and a timestamp column.
    val dfToSave = spark.sparkContext.parallelize(1 to 50).toDF("PersonID")
      .withColumn("var1", rand())
      .withColumn("modifieddate", current_timestamp())

    // (Re)create the target Phoenix table over JDBC so the test starts from a clean state.
    val jdbcHost = "localhost"
    var connection: Connection = null
    try {
      Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
      connection = DriverManager.getConnection("jdbc:phoenix:" + jdbcHost + ":2181:/hbase-unsecure")

      connection.prepareStatement("DROP TABLE IF EXISTS deleteme").execute()
      connection.commit()

      val createSQL = "CREATE TABLE deleteme " +
                      "  (PersonID INTEGER NOT NULL PRIMARY KEY, " +
                      "   VAR1 DOUBLE, modifieddate TIMESTAMP)"
      connection.prepareStatement(createSQL).execute()
      connection.commit()
    } finally {
      if (connection != null) connection.close()
    }

    // Write the DataFrame to Phoenix through the Spark connector.
    dfToSave.write
      .format("org.apache.phoenix.spark")
      .mode("overwrite")
      .option("table", "deleteme")
      .option("zkUrl", "localhost:2181:/hbase-unsecure")
      .save()

    val executionTimestamp = new java.sql.Timestamp(java.util.Calendar.getInstance.getTimeInMillis)

    // Read the table back and filter on the timestamp column;
    // this filter pushdown is what triggers the parser exception.
    val dfToLoad = spark.read
      .format("org.apache.phoenix.spark")
      .option("table", "deleteme")
      .option("zkUrl", "localhost:2181:/hbase-unsecure")
      .load()
      .filter(col("modifieddate") <= executionTimestamp)

    dfToLoad.show()

Here's the exception stack trace:

ERROR PhoenixInputFormat: Failed to get the query plan with error [ERROR 604 (42P00): Syntax error. Mismatched input. Expecting "RPAREN", got "15" at line 1, column 131.]
Exception in thread "main" java.lang.RuntimeException: org.apache.phoenix.exception.PhoenixParserException: ERROR 604 (42P00): Syntax error. Mismatched input. Expecting "RPAREN", got "15" at line 1, column 131.
        at org.apache.phoenix.mapreduce.PhoenixInputFormat.getQueryPlan(PhoenixInputFormat.java:195)
        at org.apache.phoenix.mapreduce.PhoenixInputFormat.getSplits(PhoenixInputFormat.java:87)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:125)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.phoenix.spark.PhoenixRDD.getPartitions(PhoenixRDD.scala:55)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:314)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
        at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:637)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:596)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:605)
        at com.russellreynolds.datascience.hdptesting.App$.main(App.scala:418)
        at com.russellreynolds.datascience.hdptesting.App.main(App.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.phoenix.exception.PhoenixParserException: ERROR 604 (42P00): Syntax error. Mismatched input. Expecting "RPAREN", got "15" at line 1, column 131.
        at org.apache.phoenix.exception.PhoenixParserException.newException(PhoenixParserException.java:33)
        at org.apache.phoenix.parse.SQLParser.parseStatement(SQLParser.java:111)
        at org.apache.phoenix.jdbc.PhoenixStatement$PhoenixStatementParser.parseStatement(PhoenixStatement.java:1280)
        at org.apache.phoenix.jdbc.PhoenixStatement.parseStatement(PhoenixStatement.java:1363)
        at org.apache.phoenix.jdbc.PhoenixStatement.compileQuery(PhoenixStatement.java:1373)
        at org.apache.phoenix.jdbc.PhoenixStatement.optimizeQuery(PhoenixStatement.java:1368)
        at org.apache.phoenix.mapreduce.PhoenixInputFormat.getQueryPlan(PhoenixInputFormat.java:183)
        ... 76 more
Caused by: MismatchedTokenException(106!=129)
        at org.apache.phoenix.parse.PhoenixSQLParser.recoverFromMismatchedToken(PhoenixSQLParser.java:360)
        at org.apache.phoenix.shaded.org.antlr.runtime.BaseRecognizer.match(BaseRecognizer.java:115)
        at org.apache.phoenix.parse.PhoenixSQLParser.not_expression(PhoenixSQLParser.java:6879)
        at org.apache.phoenix.parse.PhoenixSQLParser.and_expression(PhoenixSQLParser.java:6694)
        at org.apache.phoenix.parse.PhoenixSQLParser.or_expression(PhoenixSQLParser.java:6631)
        at org.apache.phoenix.parse.PhoenixSQLParser.expression(PhoenixSQLParser.java:6596)
        at org.apache.phoenix.parse.PhoenixSQLParser.single_select(PhoenixSQLParser.java:4632)
        at org.apache.phoenix.parse.PhoenixSQLParser.unioned_selects(PhoenixSQLParser.java:4714)
        at org.apache.phoenix.parse.PhoenixSQLParser.select_node(PhoenixSQLParser.java:4780)
        at org.apache.phoenix.parse.PhoenixSQLParser.oneStatement(PhoenixSQLParser.java:789)
        at org.apache.phoenix.parse.PhoenixSQLParser.statement(PhoenixSQLParser.java:508)
        at org.apache.phoenix.parse.SQLParser.parseStatement(SQLParser.java:108)
        ... 81 more

Re: Phoenix Spark Connector with HDP 2.6.4: Pushdown on a timestamp column creates exception

PHOENIX-3540 (the issue that was actually fixed; PHOENIX-3664 wasn't, as it was closed as a duplicate) has been in HDP since 2.6.0, if not earlier.

It seems like there might be an issue with the generated SQL. Can you reduce your application to a minimal program that shows this? Perhaps turn on some more verbose logging to see the query being executed?
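For example, something like the following could be added before the read. This is only a rough sketch using the log4j 1.x API that Spark 2.x on HDP ships with; whether the generated SELECT actually shows up at DEBUG depends on your Phoenix build.

    import org.apache.log4j.{Level, Logger}

    // Raise the log level for the Phoenix packages so that PhoenixInputFormat's
    // query-plan logging (if present in this build) shows the SELECT statement
    // the connector generates before handing it to the parser.
    Logger.getLogger("org.apache.phoenix.mapreduce").setLevel(Level.DEBUG)
    Logger.getLogger("org.apache.phoenix.spark").setLevel(Level.DEBUG)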

Re: Phoenix Spark Connector with HDP 2.6.4: Pushdown on a timestamp column creates exception

New Contributor

I think the program is fairly minimal. I have to (1) create a table with a timestamp column and (2) read and filter that table. The rest of the code just ensures that the database is in a consistent state. (There is one extra column with a random number but that seems pretty minor.) What would you have me take out?
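In the meantime, one workaround I'm considering is to skip Spark's filter pushdown entirely and hand the timestamp comparison to Phoenix as a WHERE-clause string. A rough sketch, assuming the phoenix-spark phoenixTableAsDataFrame helper and its optional predicate argument (I haven't verified this against the 2.6.4 jars), and reusing spark and executionTimestamp from the code above:

    import java.text.SimpleDateFormat

    import org.apache.phoenix.spark._

    // Format the cutoff so Phoenix's TO_TIMESTAMP can parse it (format assumed here).
    val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    val cutoff = tsFormat.format(executionTimestamp)

    // Pass the timestamp comparison to Phoenix as a predicate string instead of
    // letting Spark push a java.sql.Timestamp literal into the generated SQL.
    val dfToLoad = spark.sqlContext.phoenixTableAsDataFrame(
      "DELETEME",
      Seq("PERSONID", "VAR1", "MODIFIEDDATE"),
      predicate = Some(s"MODIFIEDDATE <= TO_TIMESTAMP('$cutoff', 'yyyy-MM-dd HH:mm:ss.SSS')"),
      zkUrl = Some("localhost:2181:/hbase-unsecure"))

    dfToLoad.show()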
