Need help to call Hbase tables using SparkSQL

New Contributor

Hi All,

I'm trying to query HBase tables using Spark SQL.

I need guidance on how to set this up and query the HBase tables.

Re: Need help to call Hbase tables using SparkSQL

Hi,

You can try this: https://github.com/Huawei-Spark/Spark-SQL-on-HBase

More work to support Spark on HBase is in progress and has not been released yet. You can find more details here:

http://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/

Re: Need help to call Hbase tables using SparkSQL

New Contributor

Thank you all for the prompt reply.

I'm following the post at http://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/ but I'm stuck at the step that maps the Spark and HBase tables.

The code fails when I run the line below:

sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2")).format("org.apache.spark.sql.execution.datasources.hbase").save()

with the error

scala> sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2")).format("org.apache.spark.sql.execution.datasources.hbase").save()
java.lang.IllegalArgumentException: Field "col0" does not exist.
    at org.apache.spark.sql.types.StructType$anonfun$fieldIndex$1.apply(StructType.scala:234)
    at org.apache.spark.sql.types.StructType$anonfun$fieldIndex$1.apply(StructType.scala:234)

Here is the complete code that I'm running in spark-shell:

import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}
import sqlContext.implicits._
def catalog = s"""{
  |"table":{"namespace":"default", "name":"shcExampleTable"},
  |"rowkey":"key",
  |"columns":{
    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
    |"col1":{"cf":"cf1", "col":"col1", "type":"int"},
    |"col2":{"cf":"cf2", "col":"col2", "type":"string"}
  |}
|}""".stripMargin

case class HBaseRecord(col0: String, col1: Int, col2: String)

val data = (0 to 255).map(i => (i.toString, i+1, "Hello")) 

sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2")).format("org.apache.spark.sql.execution.datasources.hbase").save()

val df = sqlContext.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").load()

df.registerTempTable("table")

sqlContext.sql("select * from table").show

Re: Need help to call Hbase tables using SparkSQL

New Contributor

Hi @Datta Giri

I did something very similar in my project, and the code looks basically the same with one difference:

On this line you are not creating an HBaseRecord:

val data = (0 to 255).map(i => (i.toString, i + 1, "Hello"))

Change it to:

val data = (0 to 255).map(i => HBaseRecord(i.toString, i + 1, "Hello"))

As far as I can tell, the DataFrame takes its column names from the field names of your case class. In the catalog that you define for this HBase connector, each outer column name has to match a field name of the case class. Since you didn't put the data into a case class, the DataFrame built from plain tuples gets the default column names _1, _2, _3, so the connector can't find "col0".
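The naming behaviour described above can be checked directly in the shell. This is a minimal sketch, assuming a Spark 1.x spark-shell session like the one in the question, where `sc` and `sqlContext` are provided by the shell; the variable names `tupleDF`, `recordDF`, and `namedDF` are made up for illustration. It needs no HBase connection, since it only inspects DataFrame column names.

```scala
// Run in spark-shell; `sc` and `sqlContext` come from the shell.
import sqlContext.implicits._

case class HBaseRecord(col0: String, col1: Int, col2: String)

// Tuples carry no field names, so Spark falls back to _1, _2, _3.
val tupleDF = sc.parallelize((0 to 255).map(i => (i.toString, i + 1, "Hello"))).toDF
tupleDF.columns   // Array(_1, _2, _3)

// Case class instances give the DataFrame the names the catalog expects.
val recordDF = sc.parallelize((0 to 255).map(i => HBaseRecord(i.toString, i + 1, "Hello"))).toDF
recordDF.columns  // Array(col0, col1, col2)

// Alternative: keep the tuples and name the columns explicitly in toDF.
val namedDF = sc.parallelize((0 to 255).map(i => (i.toString, i + 1, "Hello"))).toDF("col0", "col1", "col2")
namedDF.columns   // Array(col0, col1, col2)
```

Either `recordDF` or `namedDF` has a "col0" column, so the lookup that raised the IllegalArgumentException should find the field.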

Re: Need help to call Hbase tables using SparkSQL

New Contributor

Here are some helpful notes:

When you get the GitHub project, the default build is for Scala 2.11. There is a Maven profile you can activate with -Pscala2.10 in case your standalone app uses Scala 2.10.

The documentation could better explain the catalog mapping between the case class, the DataFrame, and the HBase table.

Of interest is the "columns" JSON grouping: the field on the left is used as the column name in the DataFrame and should be defined in the case class as well:

case class MyPersonRecord(personId: String, personName: String)

def catalog = s"""{
  |"table":{"namespace":"default", "name":"myPeople"},
  |"rowkey":"key",
  |"columns":{
    |"personId":{"cf":"rowkey", "col":"key", "type":"string"},
    |"personName":{"cf":"cf1", "col":"col1", "type":"string"}
  |}
|}""".stripMargin
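For completeness, here is a sketch of a full write and read round trip with a catalog of this shape. It reuses only the connector calls that already appear in this thread (`HBaseTableCatalog.tableCatalog`, `HBaseTableCatalog.newTable`, and the `org.apache.spark.sql.execution.datasources.hbase` format) and assumes a spark-shell session with the connector on the classpath and a reachable HBase cluster. The sample rows and the names `people`, `shcFormat`, and `writeOptions` are made up for illustration.

```scala
// Run in spark-shell with the HBase connector on the classpath;
// `sc` and `sqlContext` come from the shell.
import org.apache.spark.sql.execution.datasources.hbase._
import sqlContext.implicits._

// Field names here must match the left-hand names under "columns" in the catalog.
case class MyPersonRecord(personId: String, personName: String)

def catalog = s"""{
  |"table":{"namespace":"default", "name":"myPeople"},
  |"rowkey":"key",
  |"columns":{
    |"personId":{"cf":"rowkey", "col":"key", "type":"string"},
    |"personName":{"cf":"cf1", "col":"col1", "type":"string"}
  |}
|}""".stripMargin

val shcFormat = "org.apache.spark.sql.execution.datasources.hbase"

// Hypothetical sample data.
val people = Seq(MyPersonRecord("p1", "Alice"), MyPersonRecord("p2", "Bob"))

// Write: the DataFrame columns (personId, personName) are resolved through the catalog.
val writeOptions = Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2")
sc.parallelize(people).toDF.write.options(writeOptions).format(shcFormat).save()

// Read the table back with the same catalog and query it with Spark SQL.
val df = sqlContext.read.options(Map(HBaseTableCatalog.tableCatalog -> catalog)).format(shcFormat).load()
df.registerTempTable("people")
sqlContext.sql("select personId, personName from people").show()
```

The same catalog string drives both directions, so a renamed case class field only needs a matching rename on the left-hand side of the "columns" entry.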