Filter a Phoenix Timestamp Column in SparkSQL (Java)

Expert Contributor

I have a Phoenix table that I can access via SparkSQL (using the Phoenix Spark plugin). The table also has a Timestamp column.

I have to filter this Timestamp column by user input such as 2018-11-14 01:02:03, so I want to filter my Dataset (which represents the loaded Phoenix table) with the where / filter methods.
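For reference, sdf, dateFrom, dateTo, and inputId are set up along these lines (the format string and values here are only placeholders matching the input shown above):

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // java.text.SimpleDateFormat
String dateFrom = "2018-11-14 01:02:03"; // user input, lower bound
String dateTo   = "2018-11-14 01:02:03"; // user input, upper bound
int inputId = 0;                         // value compared against OTHER_COLUMN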

My current Java code looks like the following:

Timestamp t1 = new Timestamp(sdf.parse(dateFrom).getTime());
Timestamp t2 = new Timestamp(sdf.parse(dateTo).getTime());

Column c1 = new Column("TABLE_TS_COL").geq(t1);
Column c2 = new Column("TABLE_TS_COL").leq(t2);

Dataset<Row> dsResult = sqlContext.read()
  .format("org.apache.phoenix.spark")
  .option("table", tableName)
  .option("zkUrl", hbaseUrl)
  .load()
  .where("OTHER_COLUMN = " + inputId) // This works
  .where(c1)  // Problem!
  .where(c2); // Problem!

But this leads to the following exception:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.phoenix.exception.PhoenixParserException: ERROR 604 (42P00): Syntax error. Mismatched input. Expecting "RPAREN", got "06" at line 1, column 474.

My Spark History UI shows the following select statement:

...
18/11/14 08:54:58 INFO PhoenixInputFormat: Select Statement: SELECT "OTHER_COLUMN", "TABLE_TS_COL" FROM HBASE_TEST3 WHERE ( "OTHER_COLUMN" = 0 AND "OTHER_COLUMN" IS NOT NULL AND "TABLE_TS_COL" IS NOT NULL AND "TABLE_TS_COL" >= 2018-09-24 06:49:01.0 AND "TABLE_TS_COL" <= 2018-09-24 06:49:01.0)

To me it looks like the quotation marks around the timestamp values are missing (though I'm not sure about that).

How can I filter a Timestamp column by user input in Java and SparkSQL?

1 ACCEPTED SOLUTION

Expert Contributor

I found the following Java-based solution: using the Dataset.filter method with a FilterFunction: https://spark.apache.org/docs/2.3.0/api/java/index.html?org/apache/spark/sql/Dataset.html

So, my code now looks like this:

Dataset<Row> dsResult = sqlC.read()
  .format("org.apache.phoenix.spark")
  .option("table", tableName)
  .option("zkUrl", hbaseUrl)
  .load()
  .where("OTHER_COLUMN = " + inputId)
  .filter(row -> {
    long readTime = row.getTimestamp(row.fieldIndex("TABLE_TS_COL")).getTime();
    long tsFrom = new Timestamp(sdf.parse(dateFrom).getTime()).getTime();
    long tsTo = new Timestamp(sdf.parse(dateTo).getTime()).getTime();

    return readTime >= tsFrom && readTime <= tsTo;
  });
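A small refinement of the same idea (an untested sketch, using the same variables as above): since the bounds do not change per row and SimpleDateFormat is not thread-safe, the two timestamps can be parsed once on the driver so the lambda only captures two longs. On newer Spark/Scala builds the overloaded filter method can be ambiguous for a plain lambda, in which case an explicit FilterFunction<Row> cast (org.apache.spark.api.java.function.FilterFunction) helps. The enclosing method has to handle the checked ParseException from sdf.parse.

long tsFrom = sdf.parse(dateFrom).getTime(); // epoch millis of the lower bound
long tsTo = sdf.parse(dateTo).getTime();     // epoch millis of the upper bound

Dataset<Row> dsResult = sqlC.read()
  .format("org.apache.phoenix.spark")
  .option("table", tableName)
  .option("zkUrl", hbaseUrl)
  .load()
  .where("OTHER_COLUMN = " + inputId)
  .filter((FilterFunction<Row>) row -> {
    // compare the row's timestamp (as epoch millis) against the precomputed bounds
    long readTime = row.getTimestamp(row.fieldIndex("TABLE_TS_COL")).getTime();
    return readTime >= tsFrom && readTime <= tsTo;
  });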

