
How do I use the Spark-Phoenix connector to run join queries on multiple tables?


I want to connect to Apache Phoenix from Spark and run a SQL join query. The official Phoenix website gives an example of connecting to Phoenix from Spark, but the configuration takes a single Phoenix table name. See the example below:

Map<String, String> map = new HashMap<>();
map.put("zkUrl", ZOOKEEPER_URL);
map.put("table", "TABLE_1");
Dataset<Row> df = sparkSession.sqlContext().load("org.apache.phoenix.spark", map);
df.registerTempTable("TABLE_1");
Dataset<Row> selectResult = df.sparkSession().sql(" SELECT * FROM TABLE_1 WHERE COLUMN_1 = 'ABC' ");
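Because each load reads exactly one table (the `table` option above), querying two tables means preparing one option map per table. The following is a minimal, stdlib-only sketch of a hypothetical helper (`PhoenixTableOptions.optionsPerTable` is illustrative, not a Phoenix or Spark API, and `zk-host:2181` is a placeholder ZooKeeper address) that builds those maps. The commented Spark lines assume Spark 2.x or later with the phoenix-spark connector on the classpath.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PhoenixTableOptions {

    // Hypothetical helper (not a Phoenix or Spark API): builds one options map per table,
    // keyed by table name, because each phoenix-spark load accepts a single "table" option.
    public static Map<String, Map<String, String>> optionsPerTable(String zkUrl, List<String> tables) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>(); // preserves input order
        for (String table : tables) {
            Map<String, String> options = new LinkedHashMap<>();
            options.put("zkUrl", zkUrl);   // same ZooKeeper quorum for every table
            options.put("table", table);   // exactly one Phoenix table per load
            result.put(table, options);
        }
        return result;
    }

    public static void main(String[] args) {
        // "zk-host:2181" is a placeholder ZooKeeper address
        Map<String, Map<String, String>> all =
                optionsPerTable("zk-host:2181", Arrays.asList("TABLE_1", "TABLE_2"));

        // Assuming Spark 2.x+ with phoenix-spark on the classpath, each entry could be
        // loaded and registered under its own name, e.g.:
        //   spark.read().format("org.apache.phoenix.spark").options(opts).load()
        //        .createOrReplaceTempView(name);
        all.forEach((name, opts) -> System.out.println(name + " -> " + opts));
    }
}
```

Running `main` prints `TABLE_1 -> {zkUrl=zk-host:2181, table=TABLE_1}` followed by `TABLE_2 -> {zkUrl=zk-host:2181, table=TABLE_2}`. Registering each resulting Dataset under its own table name is what would let a single Spark SQL statement refer to both tables.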


In my Phoenix/HBase database I have two tables, TABLE_1 and TABLE_2, and a SQL query like this:

SELECT * FROM TABLE_1 as A JOIN TABLE_2 as B ON A.COLUMN_1 = B.COLUMN_2 WHERE B.COLUMN_2 = 'XYZ';
How can I run this query using the Phoenix-Spark connector?

Thanks

Accepted solution


Here is how I solved this problem. I'm sharing it in case someone else runs into the same issue.

I load the two tables as separate datasets, register each one as a temp table, and then run the join query against those two temp tables. The join itself is executed by Spark SQL, not by Phoenix. Below is the sample code.

// Load TABLE_1 and register it as a temp table named TABLE_1
String table1 = "TABLE_1";
Map<String, String> map = new HashMap<>();
map.put("zkUrl", ZOOKEEPER_URL);
map.put("table", table1);
Dataset<Row> df = sparkSession.sqlContext().load("org.apache.phoenix.spark", map);
df.registerTempTable(table1);
// Load TABLE_2 the same way, using a fresh options map
String table2 = "TABLE_2";
map = new HashMap<>();
map.put("zkUrl", ZOOKEEPER_URL);
map.put("table", table2);
Dataset<Row> df2 = sparkSession.sqlContext().load("org.apache.phoenix.spark", map);
df2.registerTempTable(table2);
// Both temp tables are now visible to Spark SQL, so the join can reference them by name
Dataset<Row> selectResult = df.sparkSession().sql(" SELECT * FROM TABLE_1 as A JOIN TABLE_2 as B ON A.COLUMN_1 = B.COLUMN_2 WHERE B.COLUMN_2 = 'XYZ' ");
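Once both tables are registered, Spark SQL plans and runs the join and the `WHERE` filter; the phoenix-spark data source only supplies the rows of each table. To make the query's result concrete, here is a plain-Java sketch of the same semantics (`A.COLUMN_1 = B.COLUMN_2` with `B.COLUMN_2 = 'XYZ'`) written as a simple hash join over in-memory rows. The sample rows and the `row`/`join` helpers are hypothetical, and Spark chooses its own physical join strategy (for example a broadcast hash join or a sort-merge join), so this illustrates which rows come back, not how Spark executes the query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinSketch {

    // Hypothetical row representation: column name -> value, built from key/value pairs
    public static Map<String, String> row(String... kv) {
        Map<String, String> r = new HashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            r.put(kv[i], kv[i + 1]);
        }
        return r;
    }

    // Semantics of: SELECT * FROM A JOIN B ON A.COLUMN_1 = B.COLUMN_2 WHERE B.COLUMN_2 = filterValue
    public static List<Map<String, String>> join(List<Map<String, String>> a,
                                                 List<Map<String, String>> b,
                                                 String filterValue) {
        // Build phase: apply the WHERE filter to B, then index surviving rows by join key COLUMN_2
        Map<String, List<Map<String, String>>> bByKey = new HashMap<>();
        for (Map<String, String> br : b) {
            String key = br.get("COLUMN_2");
            if (filterValue.equals(key)) {
                bByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(br);
            }
        }
        // Probe phase: each A row yields one output row per matching B row
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, String> ar : a) {
            for (Map<String, String> br : bByKey.getOrDefault(ar.get("COLUMN_1"), Collections.emptyList())) {
                Map<String, String> joined = new HashMap<>();
                // Prefix columns with the table alias so SELECT * keeps both sides
                ar.forEach((k, v) -> joined.put("A." + k, v));
                br.forEach((k, v) -> joined.put("B." + k, v));
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data standing in for TABLE_1 (alias A) and TABLE_2 (alias B)
        List<Map<String, String>> table1 = Arrays.asList(
                row("ID", "1", "COLUMN_1", "XYZ"),
                row("ID", "2", "COLUMN_1", "ABC"),
                row("ID", "3", "COLUMN_1", "XYZ"));
        List<Map<String, String>> table2 = Arrays.asList(
                row("COLUMN_2", "XYZ", "NAME", "x"),
                row("COLUMN_2", "ABC", "NAME", "a"));
        List<Map<String, String>> result = join(table1, table2, "XYZ");
        // A rows 1 and 3 each match the single B row whose COLUMN_2 is 'XYZ'
        System.out.println(result.size() + " rows");
    }
}
```

With the sample data, `main` prints `2 rows`: the `ABC` rows are dropped because the filter on `B.COLUMN_2` removes them before the join key can match.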
