Support Questions
Find answers, ask questions, and share your expertise

Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedException: Cannot get the location for replica0 of region for card_transactions

Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedException: Cannot get the location for replica0 of region for card_transactions

New Contributor

Hi,

 

I'm trying to do a small POC using Spark, Hive and HBase.

 

I'm processing some information using spark and when trying to write to hive managed hbase table 'card_lookup', I'm getting the below exception, 

 

Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedException: Cannot get the location for replica0 of region for card_transactions

 

Caused by: java.io.IOException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase

 

I checked the cloudera community and tried setting up the 'SPARK_CLASSPATH' as,

export SPARK_CLASSPATH=/opt/hbase-2.3.4/conf/hbase-site.xml,/opt/apache-hive-3.1.2-bin/conf/hive-site.xml

I have configured the hbase configuration in my code,

conf.set("hbase.zookeeper.quorum", envProps.getString("zookeeper.quorum"))
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase")
conf.set("hbase.cluster.distributed", "true")
conf.set("hbase.thrift.support.proxyuser", "true")
conf.set("hbase.regionserver.thrift.http", "true")

 Checked the hbase-site.xml, and found "zookeeper.znode.parent" set to "/hbase". Tried "/hbase-unsecure" too. But, no luck.

 

Spark Version in the Cluster - 2.4.7

HBase Version - 2.3.4

Hadoop Version - 2.7.3

 

Please find the code sample below,

 

 

 

package com.frauddetection

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession


object UCLCalculation {

  def main(args: Array[String]): Unit = {

    // Read the configuration file

    val config = ConfigFactory.load
    val envProps = config.getConfig(args(0))

    // Setting up hbase connection
    def getHbaseConnection(config: Config): Connection = {
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", envProps.getString("zookeeper.quorum"))
      conf.set("hbase.zookeeper.property.clientPort", envProps.getString("zookeeper.port"))
      conf.set("zookeeper.znode.parent", "/hbase")
      conf.set("hbase.cluster.distributed", "true")
      conf.set("hbase.thrift.support.proxyuser", "true")
      conf.set("hbase.regionserver.thrift.http", "true")
      //conf.set("hbase.rootdir", "hdfs://m01.itversity.com:9000/hbase")
      val connection: Connection = ConnectionFactory.createConnection(conf)
      connection
    }


    // Initiate the spark session
    val sparkConf = new SparkConf()
    sparkConf.set("hive.metastore.uris", "thrift://m02.itversity.com:9083")
    val spark = SparkSession.
      builder().
      appName("UCL Calculation").
      config(sparkConf).
      enableHiveSupport().
      getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Establish Hbase connection for calculating the UCL using hive managed hbase table 'card_transactions'
    val connection = getHbaseConnection(envProps)
    val tableName = connection.getTable(TableName.valueOf("card_lookup"))

    import spark.sql

    // Function to calculate the UCL score for a particular card_id and member_id
    val df_ucl = sql(
      """
        |with cte_rownum as
        |			(
        |			select card_id,amount,member_id,transaction_dt,
        |			first_value(postcode) over(partition by card_id order by transaction_dt desc) as postcode,
        |			row_number() over(partition by card_id order by transaction_dt desc) rownum
        |			from realtime_dw.card_transactions
        |			)
        |			select card_id,member_id,
        |			round((avg(amount)+ 3* max(std)),0) as ucl ,
        |			max(score) score,
        |			max(transaction_dt) as last_txn_time,
        |			max(Postcode)as last_txn_zip
        |			from
        |			(	select
        |			card_id,amount,
        |			c.member_id,
        |			m.score,
        |			c.transaction_dt,
        |			Postcode,
        |			STDDEV (amount) over(partition by card_id order by (select 1)  desc) std
        |			from cte_rownum c
        |			inner join realtime_dw.member_score m on c.member_id=m.member_id
        |			where rownum<=10
        |			)a
        |			group by card_id,member_id""".stripMargin)

    val finalDF = df_ucl.toDF("card_id", "member_id", "ucl", "score", "last_txn_time", "last_txn_zip")

    // Write the look up contents to card_lookup table

    finalDF.foreach { myRow => {
      var myArray = myRow.mkString(",").split(",")
      var cardId = myArray(0)
      var memberId = myArray(1)
      var ucl = myArray(2)
      var score = myArray(3)
      var lastTxnTime = myArray(4)
      var lastTxnZip = myArray(5)
      val row = new Put(Bytes.toBytes(cardId))
      row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("member_id"), Bytes.toBytes(memberId))
      row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("ucl"), Bytes.toBytes(ucl))
      row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("score"), Bytes.toBytes(score))
      row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("last_txn_time"), Bytes.toBytes(lastTxnTime))
      row.addColumn(Bytes.toBytes("lkp_data"), Bytes.toBytes("last_txn_zip"), Bytes.toBytes(lastTxnZip))
      tableName.put(row)
      connection.close()
    }
    }
  }

}

 

 

The spark-submit is,

export SPARK_CLASSPATH=/opt/hbase-2.3.4/conf/hbase-site.xml,/opt/apache-hive-3.1.2-bin/conf/hive-site.xml
/opt/spark2-client/bin/spark-submit \
--class com.frauddetection.UCLCalculation \
--master yarn \
--deploy-mode client \
--conf spark.ui.port=4926 \
--files "/opt/hbase-2.3.4/conf/hbase-site.xml,/opt/apache-hive-3.1.2-bin/conf/hive-site.xml" \
--jars $(echo /home/itv536467/external_jars/*.jar | tr ' ' ',') \
frauddetection_ucl_2.11-0.1.jar prod

 

The hive tables,

 

create table card_lookup
(
member_id bigint,
card_id bigint ,
ucl float ,
score float,
last_txn_time timestamp,
last_txn_zip string
)
CLUSTERED by (card_id) into 8 buckets
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES("hbase.columns.mapping"=":key,lkp_data:member_id,lkp_data:ucl,lkp_data:score, lkp_data:last_txn_time,lkp_data:last_txn_zip")
TBLPROPERTIES ("hbase.table.name" = "card_lookup");

 

create table if not exists card_transactions (
cardid_txnts string,
card_id bigint,
member_id bigint,
amount bigint,
postcode int,
pos_id bigint,
transaction_dt timestamp,
status string
)
CLUSTERED BY (card_id) into 8 buckets
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,trans_data:card_id,trans_data:member_id,trans_data:amount,trans_data:postcode,trans_data:pos_id,trans_data:transaction_dt,trans_data:status")
TBLPROPERTIES("hbase.table.name" = "card_transactions");

 

Please let me know if I have missed anything.

 

Thanks,

Venkatesh Raman