Spark, Apache Phoenix and Hbase, connection utilization issue

Hi,
I am using HBase 1.1 to store our data through Apache Phoenix 4.11, which provides the SQL interface for HBase, and Spark 2.1.1 to analyze the data stored in the HBase tables. I load those tables from HBase as DataFrames and run SQL queries on them with Spark SQL. To connect Spark with HBase I am using the Spark plugin provided by Apache Phoenix (https://phoenix.apache.org/phoenix_spark.html).
This is how I am loading the HBase tables:

// Step 1: Register a few main tables (4-5 tables) with Spark SQL
Map<String, String> map = new HashMap<>();
map.put("zkUrl", "sandbox-hdp.hortonworks.com:2181:/hbase-unsecure");
for(String tableName : tableNames){
    map.put("table", tableName);
    logger.info("Registering table = "+ tableName);
    logger.info("map = "+ map);
    Dataset<Row> df = sparkSession.sqlContext().load("org.apache.phoenix.spark", map);
    df.registerTempTable(tableName);
}
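
One detail of the loop above is that a single mutable map is reused for every table. As a minimal sketch, assuming only the two option keys shown above ("zkUrl" and "table"), a small helper can build a fresh, read-only option map per table so that no state is shared between registrations. The class name, method name, and table names below are hypothetical and exist only for illustration; this tidies the loading step and is not presented as a fix for the connection growth.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class PhoenixOptions {
    // Builds a fresh, read-only option map for one table.
    // "zkUrl" and "table" are the option keys used in the snippet above.
    public static Map<String, String> forTable(String zkUrl, String tableName) {
        Map<String, String> options = new HashMap<>();
        options.put("zkUrl", zkUrl);
        options.put("table", tableName);
        return Collections.unmodifiableMap(options);
    }

    public static void main(String[] args) {
        String zkUrl = "sandbox-hdp.hortonworks.com:2181:/hbase-unsecure";
        // Hypothetical table names, used only for this demonstration.
        String[] tableNames = {"TABLE_A", "TABLE_B"};
        for (String tableName : tableNames) {
            Map<String, String> options = forTable(zkUrl, tableName);
            System.out.println("Registering table = " + tableName + ", options = " + options);
            // In the job, this map would be passed to the load call shown above.
        }
    }
}
```

Because each call returns its own map, a map that was logged or handed to the loader for one table cannot be changed by the next loop iteration.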

Then I run a set of SQL queries like this:

// Step 2: Run a set of SQL queries to filter the data. Each intermediate
// result is registered as a temp table and used in the next query. The final
// result set is saved to a CSV file and all intermediate temp tables are removed.
List<String> tempTableList = new ArrayList<>();
selectResult = sparkSession.sql(selectQry);
selectResult.registerTempTable(tempTableName);
tempTableList.add(tempTableName);
// Run further queries using the newly registered tempTableName
....
....
selectResult = sparkSession.sql(selectQry);
selectResult.registerTempTable(tempTableName);
tempTableList.add(tempTableName);
....
....
// Finally, save the filtered data from the DataFrame to CSV
selectResult.write().mode("overwrite").csv(outputFilePath);
// Remove all the temp tables
for (String tableName : tempTableList) {
    sparkSession.sqlContext().dropTempTable(tableName);
}
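
The cleanup loop above only runs if every query before it succeeds. As a rough sketch, the bookkeeping can be wrapped in an AutoCloseable so that the temp tables are dropped even when a query throws. TempTableTracker and the table names are hypothetical, and the Consumer stands in for the dropTempTable call used above. Note that this only tidies temp-table handling; it does not by itself close any HBase or ZooKeeper connections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TempTableTracker implements AutoCloseable {
    private final List<String> names = new ArrayList<>();
    private final Consumer<String> dropper;

    // dropper is whatever removes a temp table by name; in the job above it
    // would wrap sparkSession.sqlContext().dropTempTable(name).
    public TempTableTracker(Consumer<String> dropper) {
        this.dropper = dropper;
    }

    // Records a temp table name so it is dropped when the tracker is closed.
    public String track(String name) {
        names.add(name);
        return name;
    }

    // Drops every tracked table in registration order, then forgets them.
    @Override
    public void close() {
        for (String name : names) {
            dropper.accept(name);
        }
        names.clear();
    }

    public static void main(String[] args) {
        // A plain list stands in for the real drop call in this demonstration.
        List<String> dropped = new ArrayList<>();
        try (TempTableTracker tracker = new TempTableTracker(dropped::add)) {
            tracker.track("tmp_step_1"); // hypothetical temp table names
            tracker.track("tmp_step_2");
            // The queries of one iteration would run here.
        }
        System.out.println("dropped = " + dropped);
    }
}
```

In the job, the tracker could be created as `new TempTableTracker(name -> sparkSession.sqlContext().dropTempTable(name))`, with one try-with-resources block per iteration of Step 2.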

"Step 2" is repeated multiple times, and I notice that the number of open HBase connections increases with each iteration, which finally results in job failure because ZooKeeper denies further connections. We increased maxClientCnxns in ZooKeeper to 2000, but the number of open connections goes beyond that as well.

I have no idea why Spark opens so many connections to HBase (ZooKeeper), or why it does not close or reuse the old open connections.

Please share any information or ideas you have about this issue; that would be a great help.

Thanks,

Fairoz

Accepted Solution

Re: Spark, Apache Phoenix and Hbase, connection utilization issue

Sounds like you're hitting https://issues.apache.org/jira/browse/PHOENIX-4489. This was fixed in HDP-2.6.5.

However, it seems like you are using a version of Phoenix which is not included in HDP, so you are on your own to address that issue.

Re: Spark, Apache Phoenix and Hbase, connection utilization issue

Hi Josh Elser,

Thanks for your response. Yes, it looks like the same issue as reported in PHOENIX-4489.
