
Short Description:

The Spark HBase Connector (SHC) is currently hosted in the Hortonworks repository and published as a Spark package. Below is a simple example of how to access an HBase table in the Spark shell and load its data into a DataFrame. Once the data is in a DataFrame, we can use SQLContext to run queries against it.

Article

The documentation here leaves out a few pieces needed to access HBase tables using SHC from the Spark shell. Here is an example of accessing the HBase "emp" table in the Spark shell.

HBase Shell

Create a simple "emp" HBase table using the HBase shell and insert some sample data:

create 'emp', 'personal data', 'professional data'
put 'emp','1','personal data:name','raju'
put 'emp','1','personal data:city','hyderabad'
put 'emp','1','professional data:designation','manager'
put 'emp','1','professional data:salary','50000'
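
You can optionally verify the inserted row before leaving the HBase shell (just a sanity check, not required for the rest of the example):

scan 'emp'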

Once the table is created, exit the HBase shell and start the Spark shell, passing the SHC package and hbase-site.xml:

/usr/hdp/current/spark-client/bin/spark-shell --packages zhzhan:shc:0.0.11-1.6.1-s_2.10 --files /etc/hbase/conf/hbase-site.xml

Import the required classes:

scala> import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.{SQLContext, _}

scala> import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.execution.datasources.hbase._

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

Define the HBase catalog that maps the table. The rowkey is also defined as a column (empNumber), which has a specific column family (cf) named "rowkey".

scala> def empcatalog = s"""{
"table":{"namespace":"default", "name":"emp"},
"rowkey":"key",
"columns":{
"empNumber":{"cf":"rowkey", "col":"key", "type":"string"},
"city":{"cf":"personal data", "col":"city", "type":"string"},
"empName":{"cf":"personal data", "col":"name", "type":"string"},
"jobDesignation":{"cf":"professional data", "col":"designation", "type":"string"},
"salary":{"cf":"professional data", "col":"salary", "type":"string"}
}
}""".stripMargin

Perform DataFrame operations on top of the HBase table. First we define a helper function, then use it to load the data into a DataFrame.

scala> def withCatalog(empcatalog: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->empcatalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
withCatalog: (empcatalog: String)org.apache.spark.sql.DataFrame

scala> val df = withCatalog(empcatalog)
df: org.apache.spark.sql.DataFrame = [city: string, empName: string, jobDesignation: string, salary: string, empNumber: string]

scala> df.show
17/11/08 18:04:22 INFO RecoverableZooKeeper: Process identifier=hconnection-0x55a690be connecting to ZooKeeper ensemble=vb-atlas-node1.hortonworks.com:2181,vb-atlas-node2.hortonworks.com:2181,vb-atlas-ambari.hortonworks.com:2181
17/11/08 18:04:22 INFO ZooKeeper: Client environment:zookeeper.version=3.4.6-8--1, built on 04/01/201
.
.
.
17/11/08 18:04:24 INFO DAGScheduler: ResultStage 0 (show at <console>:39) finished in 1.011 s
17/11/08 18:04:24 INFO DAGScheduler: Job 0 finished: show at <console>:39, took 1.230151 s
+---------+-------+--------------+------+---------+
| city|empName|jobDesignation|salary|empNumber|
+---------+-------+--------------+------+---------+
| chennai| ravi| manager| 50000| 1|
|hyderabad| raju| engineer| null| 2|
| delhi| rajesh| jrenginner| null| 3|
+---------+-------+--------------+------+---------+
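
For simple projections and filters, you can also use the DataFrame API directly instead of SQL. This is a minimal sketch (not from the original article) using the column names defined in the catalog above:

scala> df.select("empName", "jobDesignation").filter(df("jobDesignation") === "manager").show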


We can query the DataFrame with SQL by registering it as a temporary table and using sqlContext.

scala> df.registerTempTable("table")

scala> sqlContext.sql("select empNumber,jobDesignation from table").show

+---------+--------------+
|empNumber|jobDesignation|
+---------+--------------+
|        1|       manager|
|        2|      engineer|
|        3|    jrenginner|
+---------+--------------+
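
SHC can also write a DataFrame back to HBase using the same catalog. The following is a rough sketch based on the SHC examples and is not part of the original walkthrough; HBaseTableCatalog.newTable specifies the number of regions to create if the target table does not exist yet:

scala> df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> empcatalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()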

References:

https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/

https://github.com/hortonworks-spark/shc/blob/master/examples/src/main/scala/org/apache/spark/sql/ex...

Comments
New Contributor

When running the command:

 scala> val df = withCatalog(empcatalog)

I have the following problem:

java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.newInstance(Class.java:412)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
at withCatalog(<console>:32)
... 52 elided
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 71 more

I think the problem is that I'm using Spark 2.1.0, which no longer has org.apache.spark.Logging. Can you please advise on how to resolve this issue without downgrading the Spark version?

Thank you very much!

I did the following.

I created a symlink for hbase-site.xml on each edge node as follows:


ln -s /etc/hbase/conf/hbase-site.xml /etc/spark2/conf/hbase-site.xml


I started the Spark shell with the following parameters:

spark-shell --driver-memory 5g --jars /usr/hdp/2.6.5.0-292/shc/shc-core-1.1.0.2.6.5.0-292.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar,/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar,/usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar
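
With Spark 2 the same catalog and read logic should work largely unchanged; a minimal sketch, assuming the empcatalog string defined in the article above, is to read through the SparkSession instead of sqlContext:

scala> import org.apache.spark.sql.execution.datasources.hbase._

scala> val df = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> empcatalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

scala> df.show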