Created 01-04-2017 11:28 AM
I have a Phoenix query with some aggregate functions and a sub-query. When I check the Phoenix Spark plugin, its documented limitations say it does not support aggregate functions.
How can I execute the Phoenix query using Spark? I don't see any example. We executed the queries in the Phoenix console and they work fine.
I came across this gist https://gist.github.com/mravi/444afe7f49821819c987, which uses newAPIHadoopRDD via the Java API. Will this support complex Phoenix queries? Where can I find more examples of this using the Scala or Java API?
Is there any other way I can execute the Phoenix query and get the results?
Created 01-04-2017 12:07 PM
You can just pass the filter in the Phoenix query, and once the data is loaded as an RDD or DataFrame, you can use all of Spark's available aggregation methods for your use case.
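A minimal sketch of that approach (the table and column names here are placeholders): the predicate is pushed down to Phoenix at load time, and the aggregation then runs entirely in Spark.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.phoenix.spark._

// Hypothetical table/columns; only rows matching the predicate are read from Phoenix.
val df = sqlContext.phoenixTableAsDataFrame(
  "MY_TABLE", Array("ID", "COL1"),
  predicate = Some("\"COL1\" > 100"),
  conf = HBaseConfiguration.create())

// The aggregation happens in Spark, not in Phoenix.
val agg = df.groupBy("ID").count()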
Created 01-04-2017 03:45 PM
@Ankit Singhal Does that mean all the aggregation methods are supported? Also, does it support predicate pushdown? We are using Spark 1.6.1 and HDP 2.4.2.
Created 01-04-2017 04:01 PM
@Ankit Singhal Also, I don't want to load the entire table. Currently we are using the Hortonworks HBase connector, and I hope this supports predicate pushdown; that is, if I apply a WHERE filter on the DataFrame, it should load only the filtered records, not the entire table.
Created 01-05-2017 08:39 AM
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val conf = new SparkConf()
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val hbaseConfiguration = HBaseConfiguration.create()

/** Table loaded from Phoenix without any predicate */
val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
df1.registerTempTable("sql_table_1")

/** Table loaded from Phoenix with a predicate ID = 1; the filter is pushed down to Phoenix, so only the matching records are loaded into Spark */
val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"), predicate = Some("\"ID\" = 1"), conf = hbaseConfiguration)
df2.registerTempTable("sql_table_2")

/** Join performed by Spark (not Phoenix) */
val sqlRdd = sqlContext.sql(
  """
    |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
    |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin)

/** Aggregation performed by Spark (not Phoenix) */
val count = sqlRdd.count()
Created 01-04-2017 02:05 PM
For full Phoenix SQL syntax, you can use Spark's native JDBC features.
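For example, a minimal sketch of reading a full Phoenix query through Spark's JDBC data source (the ZooKeeper quorum, table, and columns below are placeholders, and the Phoenix client jar is assumed to be on the Spark classpath):

// Spark 1.6: any Phoenix SQL, aggregates included, can go inside the dbtable sub-query.
val df = sqlContext.read.format("jdbc")
  .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
  .option("url", "jdbc:phoenix:zk1,zk2,zk3:2181:/hbase")
  .option("dbtable", "(SELECT COL1, COUNT(*) AS CNT FROM TABLE1 GROUP BY COL1) t")
  .load()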
Created 01-04-2017 03:42 PM
Thanks for the link. Do you have any link where I can see simple examples of using Spark JDBC with Phoenix queries to query HBase tables?
Created 01-04-2017 06:28 PM
@Sankaraiah Narayanasamy You may pass a predicate during DataFrame creation, like:
val df2 = sqlContext.phoenixTableAsDataFrame("TABLE", Array("COL1", "COL2"), predicate = Some("\"COL1\" = 1"), conf = hbaseConf)
Created 01-05-2017 03:37 AM
@Sergey Soldatov Is it possible to pass the SQL query instead of loading the table? Or can you point me to a link where I can see some simple JDBC code that queries a Phoenix HBase table?
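For reference, a minimal plain-JDBC sketch (the ZooKeeper quorum, table, and query are placeholders) that runs a full Phoenix query, aggregates included, and iterates over the results:

import java.sql.DriverManager

// Phoenix thick-driver URL: jdbc:phoenix:<zookeeper-quorum>:<port>:<hbase-znode>
val conn = DriverManager.getConnection("jdbc:phoenix:zk1,zk2,zk3:2181:/hbase")
try {
  val stmt = conn.createStatement()
  // Any Phoenix SQL is accepted here, including aggregates and sub-queries.
  val rs = stmt.executeQuery("SELECT COL1, COUNT(*) FROM TABLE1 GROUP BY COL1")
  while (rs.next()) {
    println(s"${rs.getString(1)} -> ${rs.getLong(2)}")
  }
} finally {
  conn.close()
}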