I have a Phoenix query with some aggregate functions and a sub-query. The Phoenix Spark plugin's documented limitations say it does not support aggregate functions.
How can I execute this Phoenix query using Spark? I don't see any examples. We executed the queries in the Phoenix console and they work fine.
I came across this gist https://gist.github.com/mravi/444afe7f49821819c987, which uses newAPIHadoopRDD via the Java API. Will this support complex Phoenix queries? Where can I find more examples of this using the Scala or Java API?
Is there any other way I can execute the Phoenix query and get the results?
You can pass the filter as a predicate when loading the Phoenix table, and once the data is loaded as an RDD or DataFrame you can use all of Spark's aggregation methods for your use case.
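As a sketch of that approach (the table name MY_TABLE, the columns, and the predicate below are placeholders for your own schema, and the code assumes a Spark 1.x SQLContext with the phoenix-spark plugin on the classpath):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.phoenix.spark._

val sc = new SparkContext(new SparkConf().setAppName("phoenix-agg"))
val sqlContext = new SQLContext(sc)

// Load only the rows matching the predicate (the filter is pushed down
// to Phoenix), then let Spark do the aggregation.
val df = sqlContext.phoenixTableAsDataFrame(
  "MY_TABLE",                       // placeholder table name
  Array("ID", "AMOUNT"),            // placeholder columns
  predicate = Some("\"ID\" > 100"), // evaluated by Phoenix, not Spark
  conf = HBaseConfiguration.create())

df.registerTempTable("my_table")
// The aggregate function runs in Spark SQL, not Phoenix:
val totals = sqlContext.sql(
  "SELECT ID, SUM(AMOUNT) AS TOTAL FROM my_table GROUP BY ID")
totals.show()
```

This way Phoenix only does the scan-and-filter work it supports, and every aggregate from the original query moves into Spark.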
@Ankit Singhal Also, I don't want to load the entire table. Currently we are using the Hortonworks HBase connector; I hope this supports predicate pushdown, i.e. if I apply a where clause on the DataFrame, it will load only the filtered records, not the entire table.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.phoenix.spark._

val conf = new SparkConf()
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val hbaseConfiguration = HBaseConfiguration.create()

/** Table loaded from Phoenix without any predicate */
val df1 = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
df1.registerTempTable("sql_table_1")

/** Table loaded from Phoenix with a predicate ID=1 (the filter is pushed
    down to Phoenix, so only the matching records are loaded into Spark) */
val df2 = sqlContext.phoenixTableAsDataFrame(
  "TABLE2", Array("ID", "TABLE1_ID"),
  predicate = Some("\"ID\" = 1"), conf = hbaseConfiguration)
df2.registerTempTable("sql_table_2")

/** Join done by Spark (not Phoenix) */
val sqlRdd = sqlContext.sql(
  """SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
    |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin)

/** Aggregate method run by Spark (not Phoenix) */
val count = sqlRdd.count()
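Following the same pattern, the aggregate-over-a-sub-query shape from the original question can be expressed against the registered temp tables. A sketch, reusing the table and column names from the snippet above (Spark 1.x did not support IN sub-queries in the WHERE clause, so the sub-query is rewritten as a join):

```scala
// Sub-query-plus-aggregate, both evaluated entirely by Spark SQL over
// the temp tables registered above (sql_table_1, sql_table_2):
val agg = sqlContext.sql(
  """SELECT t2.TABLE1_ID, COUNT(*) AS CNT
    |FROM sql_table_2 AS t2
    |INNER JOIN sql_table_1 AS t1 ON (t2.TABLE1_ID = t1.ID)
    |GROUP BY t2.TABLE1_ID""".stripMargin)
agg.show()
```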
@Sankaraiah Narayanasamy You may pass a predicate during DataFrame creation, like:
val df2 = sqlContext.phoenixTableAsDataFrame(
  "TABLE", Array("COL1", "COL2"),
  predicate = Some("\"COL1\" = 1"), conf = hbaseConf)
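The predicate string is handed to Phoenix as a WHERE-clause fragment, so compound conditions work too. A sketch, assuming columns COL1 and COL2 exist on the table (the escaped double quotes keep the identifiers case-sensitive on the Phoenix side, since Phoenix upper-cases unquoted names):

```scala
// Any Phoenix WHERE-clause fragment is accepted as the predicate;
// only the matching rows are loaded into Spark.
val df3 = sqlContext.phoenixTableAsDataFrame(
  "TABLE", Array("COL1", "COL2"),
  predicate = Some("\"COL1\" = 1 AND \"COL2\" LIKE 'A%'"),
  conf = hbaseConf)
```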