Member since 01-19-2018 | 5 Posts | 0 Kudos Received | 1 Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1752 | 01-19-2018 10:06 AM |
09-05-2018 05:54 AM
Hi Josh Elser, thanks for your response. Yes, it looks like the same issue as reported in PHOENIX-4489.
08-29-2018 06:25 AM
Hi,
I am using HBase 1.1 to store our data through Apache Phoenix 4.11, which provides the SQL interface for HBase. I am using Spark 2.1.1 to analyze the data stored in the HBase tables: I load those tables as DataFrames and run SQL queries on them with Spark SQL. To connect Spark with HBase I am using the Spark plugin provided by Apache Phoenix (https://phoenix.apache.org/phoenix_spark.html).
This is how I am loading the HBase tables:
// Step 1: Register a few main tables (4-5 tables) with Spark SQL
Map<String, String> map = new HashMap<>();
map.put("zkUrl", "sandbox-hdp.hortonworks.com:2181:/hbase-unsecure");
for (String tableName : tableNames) {
    map.put("table", tableName);
    logger.info("Registering table = " + tableName);
    logger.info("map = " + map);
    Dataset<Row> df = sparkSession.sqlContext().load("org.apache.phoenix.spark", map);
    df.registerTempTable(tableName);
}
Then I run a set of SQL queries like this:
// Step 2: Run a few SQL queries to filter the data, register each intermediate result as a temp table for use in the next query, save the final result set to a CSV file, and drop all the intermediate temp tables
List<String> tempTableList = new ArrayList<>();
selectResult = sparkSession.sql(selectQry);
selectResult.registerTempTable(tempTableName);
tempTableList.add(tempTableName);
// Run further queries using the newly registered tempTableName
....
....
selectResult = sparkSession.sql(selectQry);
selectResult.registerTempTable(tempTableName);
tempTableList.add(tempTableName);
....
....
// Finally, save the filtered data from the DataFrame to CSV
selectResult.write().mode("overwrite").csv(outputFilePath);
// Drop all the temp tables
for (String tableName : tempTableList) {
    sparkSession.sqlContext().dropTempTable(tableName);
}
Step 2 is repeated multiple times. I notice that the number of open HBase connections increases with each iteration, which finally results in the job failing because ZooKeeper refuses further connections. We increased maxClientCnxns in ZooKeeper to 2000, but the number of open connections goes beyond that as well. I have no idea why Spark opens so many connections to HBase (ZooKeeper), or why it does not close or reuse the old connections.
Please share any information or ideas you have about this issue; that would be of great help.
Thanks,
Fairoz
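One way to watch the connection growth described above is ZooKeeper's `cons` four-letter command, which lists the client connections open on a server. Since maxClientCnxns is enforced per client IP address, grouping that list by client host shows which machine is accumulating connections. The sketch below is only a diagnostic aid, not a fix. It assumes ZooKeeper 3.4-style `cons` output (one `/host:port[...]` line per connection) and a server on which four-letter commands are enabled (ZooKeeper 3.5 and later require them to be whitelisted). The class name `ZkConnectionCount` and its method names are hypothetical, chosen for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ZkConnectionCount {

    // Assumed shape of one connection line in the `cons` reply, for example:
    //  /10.0.0.5:40321[1](queued=0,recved=12,sent=12)
    // The greedy host group also tolerates IPv6 addresses that contain colons.
    private static final Pattern CONNECTION_LINE =
            Pattern.compile("^\\s*/(.+):(\\d+)\\[");

    /** Sends a four-letter command to ZooKeeper and returns the raw text reply. */
    public static String fourLetterCommand(String host, int port, String command)
            throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 5000);
            socket.setSoTimeout(5000);
            OutputStream out = socket.getOutputStream();
            out.write(command.getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // ZooKeeper closes the socket after replying, so read until end of stream.
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                reply.write(buffer, 0, n);
            }
            return new String(reply.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    /** Counts the connection lines in a `cons` reply, grouped by client host. */
    public static Map<String, Integer> countByClientHost(String consReply) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : consReply.split("\\R")) {
            Matcher m = CONNECTION_LINE.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // Host and port are placeholders; pass the ZooKeeper host from zkUrl instead.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2181;
        countByClientHost(fourLetterCommand(host, port, "cons"))
                .forEach((client, count) -> System.out.println(client + " " + count));
    }
}
```

Running it before and after each Step 2 iteration should show whether the count for the Spark driver or executor hosts keeps growing.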
Labels: Apache HBase, Apache Phoenix, Apache Spark
05-21-2018 05:34 AM
@Sampath Kumar Did you find the root cause and a solution for this problem? I am facing the same issue. Please post the solution; it will help others.
Thanks,
Fairoz
01-19-2018 10:06 AM
Here is how I solved this problem; I am sharing it in case someone else faces the same issue. I load the two datasets separately, register each one as a temp table, and can then run the join query across the two tables. Below is the sample code.
String table1 = "TABLE_1";
Map<String, String> map = new HashMap<>();
map.put("zkUrl", ZOOKEEPER_URL);
map.put("table", table1);
Dataset<Row> df = sparkSession.sqlContext().load("org.apache.phoenix.spark", map);
df.registerTempTable(table1);
String table2 = "TABLE_2";
map = new HashMap<>();
map.put("zkUrl", ZOOKEEPER_URL);
map.put("table", table2);
Dataset<Row> df2 = sparkSession.sqlContext().load("org.apache.phoenix.spark", map);
df2.registerTempTable(table2);
Dataset<Row> selectResult = df.sparkSession().sql(" SELECT * FROM TABLE_1 as A JOIN TABLE_2 as B ON A.COLUMN_1 = B.COLUMN_2 WHERE B.COLUMN_2 = 'XYZ' ");
01-19-2018 07:16 AM
I want to connect to Apache Phoenix from Spark and run a SQL join query. The official Phoenix website gives an example of how to connect to Phoenix from Spark, but it takes a single Phoenix table name in the configuration. See the example below:
Map<String, String> map = new HashMap<>();
map.put("zkUrl", ZOOKEEPER_URL);
map.put("table", "TABLE_1");
Dataset<Row> df = sparkSession.sqlContext().load("org.apache.phoenix.spark", map);
df.registerTempTable("TABLE_1");
Dataset<Row> selectResult = df.sparkSession().sql(" SELECT * FROM TABLE_1 WHERE COLUMN_1 = 'ABC' ");
In my Phoenix-HBase database I have two tables, TABLE_1 and TABLE_2, and a SQL query like this:
SELECT * FROM TABLE_1 as A JOIN TABLE_2 as B ON A.COLUMN_1 = B.COLUMN_2 WHERE B.COLUMN_2 = 'XYZ';
How can I run this query using the Phoenix-Spark connection?
Thanks
Labels: Apache HBase, Apache Phoenix, Apache Spark