Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.
Announcements
This board is archived and read-only for historical reference. To ask a new question, please post a new topic on the appropriate active board.

Filter a Phoenix Timestamp Column in SparkSQL (Java)

Expert Contributor

I have a Phoenix table that I can access via SparkSQL (with the Phoenix Spark plugin). The table also has a Timestamp column.

I have to filter this Timestamp column by a user input such as 2018-11-14 01:02:03, so I want to filter my Dataset (which represents the read Phoenix table) with the where / filter methods.

My current Java code looks like this:

// Requires: java.sql.Timestamp, org.apache.spark.sql.Column
Timestamp t1 = new Timestamp(sdf.parse(dateFrom).getTime());
Timestamp t2 = new Timestamp(sdf.parse(dateTo).getTime());

Column c1 = new Column("TABLE_TS_COL").geq(t1);
Column c2 = new Column("TABLE_TS_COL").leq(t2);

Dataset<Row> dsResult = sqlContext.read()
  .format("org.apache.phoenix.spark")
  .option("table", tableName)
  .option("zkUrl", hbaseUrl)
  .load()

  .where("OTHER_COLUMN = " + inputId) // This works
  .where(c1)  // Problem!
  .where(c2); // Problem!

But this leads to the following exception:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.phoenix.exception.PhoenixParserException: ERROR 604 (42P00): Syntax error. Mismatched input. Expecting "RPAREN", got "06" at line 1, column 474.

My Spark History UI shows the following select statement:

...
18/11/14 08:54:58 INFO PhoenixInputFormat: Select Statement: SELECT "OTHER_COLUMN", "TABLE_TS_COL" FROM HBASE_TEST3 WHERE ( "OTHER_COLUMN" = 0 AND "OTHER_COLUMN" IS NOT NULL AND "TABLE_TS_COL" IS NOT NULL AND "TABLE_TS_COL" >= 2018-09-24 06:49:01.0 AND "TABLE_TS_COL" <= 2018-09-24 06:49:01.0)

It looks to me as if the quotation marks around the timestamp values are missing (though I am not sure about that).

How can I filter a Timestamp column by a user input in Java and SparkSQL?
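If the plugin really does drop the quotes when compiling the pushed-down predicate, one possible workaround (a sketch, not verified against the Phoenix Spark plugin) is to pass the whole condition as a SQL string, so the timestamp literals stay quoted. TO_TIMESTAMP is a Phoenix built-in function; the helper buildTsPredicate and its parse-to-validate step are hypothetical:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class Predicate {
    // Hypothetical helper: build a predicate string with quoted timestamp
    // literals, suitable for Dataset.where(String). Whether the plugin
    // pushes this string down to Phoenix unchanged is an assumption.
    static String buildTsPredicate(String column, String dateFrom, String dateTo)
            throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Parse the user input first, so malformed input fails here
        // instead of being spliced into SQL.
        sdf.parse(dateFrom);
        sdf.parse(dateTo);
        return column + " >= TO_TIMESTAMP('" + dateFrom + "')"
             + " AND " + column + " <= TO_TIMESTAMP('" + dateTo + "')";
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(buildTsPredicate("TABLE_TS_COL",
                "2018-09-24 06:49:01", "2018-09-24 06:49:01"));
    }
}
```

The resulting string would then be used as `.where(buildTsPredicate("TABLE_TS_COL", dateFrom, dateTo))` in place of the two Column-based where calls.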

1 ACCEPTED SOLUTION

Expert Contributor

I found the following Java-based solution: use the Dataset.filter method with a FilterFunction: https://spark.apache.org/docs/2.3.0/api/java/index.html?org/apache/spark/sql/Dataset.html

So, my code now looks like this:

Dataset<Row> dsResult = sqlC.read()
  .format("org.apache.phoenix.spark")
  .option("table", tableName)
  .option("zkUrl", hbaseUrl)
  .load()

  .where("OTHER_COLUMN = " + inputId)
  // Cast the lambda to FilterFunction<Row> so the compiler picks the
  // Java-friendly overload of filter; FilterFunction.call may throw
  // checked exceptions, so sdf.parse needs no try/catch here.
  .filter((FilterFunction<Row>) row -> {
    long readTime = row.getTimestamp(row.fieldIndex("TABLE_TS_COL")).getTime();
    long tsFrom = new Timestamp(sdf.parse(dateFrom).getTime()).getTime();
    long tsTo = new Timestamp(sdf.parse(dateTo).getTime()).getTime();

    return readTime >= tsFrom && readTime <= tsTo;
  });
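Since the lambda compares epoch milliseconds, the range check itself can be sanity-checked without any Spark dependency. A minimal stand-alone sketch, where inRange is a hypothetical helper mirroring the lambda body:

```java
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class RangeCheck {
    // Same test the filter lambda performs: a row passes when its
    // timestamp lies in the inclusive range [from, to].
    static boolean inRange(Timestamp value, Timestamp from, Timestamp to) {
        long t = value.getTime();
        return t >= from.getTime() && t <= to.getTime();
    }

    public static void main(String[] args) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Timestamp from = new Timestamp(sdf.parse("2018-11-14 01:00:00").getTime());
        Timestamp to   = new Timestamp(sdf.parse("2018-11-14 02:00:00").getTime());
        Timestamp row  = new Timestamp(sdf.parse("2018-11-14 01:02:03").getTime());
        System.out.println(inRange(row, from, to)); // true: inside the window
    }
}
```

Note that parsing dateFrom and dateTo once, outside the lambda, would also avoid re-parsing them for every row.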

