Created 11-14-2018 08:10 AM
I have a Phoenix table that I can access via SparkSQL (with the Phoenix Spark plugin). The table also has a Timestamp column.
I need to filter this Timestamp column by a user input such as 2018-11-14 01:02:03, so I want to filter my Dataset (which represents the loaded Phoenix table) with the where / filter methods.
My current Java code looks like this:
Timestamp t1 = new Timestamp(sdf.parse(dateFrom).getTime());
Timestamp t2 = new Timestamp(sdf.parse(dateTo).getTime());

Column c1 = new Column("TABLE_TS_COL").geq(t1);
Column c2 = new Column("TABLE_TS_COL").leq(t2);

Dataset<Row> dsResult = sqlContext.read()
    .format("org.apache.phoenix.spark")
    .option("table", tableName)
    .option("zkUrl", hbaseUrl)
    .load()
    .where("OTHER_COLUMN = " + inputId) // This works
    .where(c1)                          // Problem!
    .where(c2);                         // Problem!
But this leads to the following exception:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.phoenix.exception.PhoenixParserException: ERROR 604 (42P00): Syntax error. Mismatched input. Expecting "RPAREN", got "06" at line 1, column 474.
My Spark History UI shows the following select statement:
... 18/11/14 08:54:58 INFO PhoenixInputFormat: Select Statement: SELECT "OTHER_COLUMN", "TABLE_TS_COL" FROM HBASE_TEST3 WHERE ( "OTHER_COLUMN" = 0 AND "OTHER_COLUMN" IS NOT NULL AND "TABLE_TS_COL" IS NOT NULL AND "TABLE_TS_COL" >= 2018-09-24 06:49:01.0 AND "TABLE_TS_COL" <= 2018-09-24 06:49:01.0)
To me it looks as if the quotation marks around the timestamp literals are missing in the generated Phoenix query (although I am not sure about that).
How can I filter a Timestamp column by a user input in Java and SparkSQL?
Created 11-14-2018 10:45 AM
I found the following Java-based solution: using the Dataset.filter method with a FilterFunction: https://spark.apache.org/docs/2.3.0/api/java/index.html?org/apache/spark/sql/Dataset.html
So, my code now looks like this:
Dataset<Row> dsResult = sqlC.read()
    .format("org.apache.phoenix.spark")
    .option("table", tableName)
    .option("zkUrl", hbaseUrl)
    .load()
    .where("OTHER_COLUMN = " + inputId)
    .filter(row -> {
        long readTime = row.getTimestamp(row.fieldIndex("TABLE_TS_COL")).getTime();
        long tsFrom = new Timestamp(sdf.parse(dateFrom).getTime()).getTime();
        long tsTo = new Timestamp(sdf.parse(dateTo).getTime()).getTime();
        return readTime >= tsFrom && readTime <= tsTo;
    });
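As a side note, the two boundary values can be computed once on the driver instead of being re-parsed inside the lambda for every row. A minimal sketch of that variant, assuming the same sqlC, tableName, hbaseUrl, sdf, dateFrom, dateTo and inputId variables as above, and a surrounding method that handles the ParseException thrown by sdf.parse; the explicit FilterFunction<Row> cast is only there to make the chosen filter overload obvious:

import java.sql.Timestamp;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Parse the user input once; only the resulting long values are captured by the filter.
final long tsFrom = new Timestamp(sdf.parse(dateFrom).getTime()).getTime();
final long tsTo = new Timestamp(sdf.parse(dateTo).getTime()).getTime();

Dataset<Row> dsResult = sqlC.read()
    .format("org.apache.phoenix.spark")
    .option("table", tableName)
    .option("zkUrl", hbaseUrl)
    .load()
    .where("OTHER_COLUMN = " + inputId)
    .filter((FilterFunction<Row>) row -> {
        // Compare the stored timestamp against the precomputed bounds.
        long readTime = row.getTimestamp(row.fieldIndex("TABLE_TS_COL")).getTime();
        return readTime >= tsFrom && readTime <= tsTo;
    });

Keep in mind that a lambda-based filter like this runs on the Spark side after the rows have been read; the timestamp predicate is not pushed down into the generated Phoenix query.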