Created on 11-08-2017 08:41 PM
Spark HBase Connector (SHC) is currently hosted in the Hortonworks repo and published as a Spark package. Below is a simple example of how to access an HBase table in the Spark shell and load the data into a DataFrame. Once the data is in a DataFrame, we can use SQLContext to run queries against it.
The documentation leaves out a few pieces needed to access HBase tables using SHC from the Spark shell. Here is an example accessing the HBase "emp" table in the Spark shell.
HBase Shell
Create a simple "emp" HBase table using the HBase shell and insert sample data:
create 'emp', 'personal data', 'professional data'
put 'emp','1','personal data:name','raju'
put 'emp','1','personal data:city','hyderabad'
put 'emp','1','professional data:designation','manager'
put 'emp','1','professional data:salary','50000'
Once created, exit the HBase shell and start the Spark shell, providing the SHC package and hbase-site.xml:
/usr/hdp/current/spark-client/bin/spark-shell --packages zhzhan:shc:0.0.11-1.6.1-s_2.10 --files /etc/hbase/conf/hbase-site.xml
Import the required classes:
scala> import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.{SQLContext, _}

scala> import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.execution.datasources.hbase._

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}
Define the HBase catalog that maps the table schema. The rowkey is also defined as a column (empNumber) with its own column family (rowkey).
scala> def empcatalog = s"""{
  "table":{"namespace":"default", "name":"emp"},
  "rowkey":"key",
  "columns":{
    "empNumber":{"cf":"rowkey", "col":"key", "type":"string"},
    "city":{"cf":"personal data", "col":"city", "type":"string"},
    "empName":{"cf":"personal data", "col":"name", "type":"string"},
    "jobDesignation":{"cf":"professional data", "col":"designation", "type":"string"},
    "salary":{"cf":"professional data", "col":"salary", "type":"string"}
  }
}""".stripMargin
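A note on types: all columns are declared as string above because values written with the HBase shell's put are stored as UTF-8 string bytes, and SHC decodes cell bytes according to the type declared in the catalog. A numeric mapping is possible, but only for data written as native binary (for example with Bytes.toBytes on an Int). A hypothetical variant for illustration:

scala> // Hypothetical catalog: decodes salary as a 4-byte Int. This would NOT
scala> // decode the string-encoded sample data inserted via the shell above.
scala> def empcatalogTyped = s"""{
  "table":{"namespace":"default", "name":"emp"},
  "rowkey":"key",
  "columns":{
    "empNumber":{"cf":"rowkey", "col":"key", "type":"string"},
    "city":{"cf":"personal data", "col":"city", "type":"string"},
    "empName":{"cf":"personal data", "col":"name", "type":"string"},
    "jobDesignation":{"cf":"professional data", "col":"designation", "type":"string"},
    "salary":{"cf":"professional data", "col":"salary", "type":"int"}
  }
}""".stripMargin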
Now we can perform DataFrame operations on top of the HBase table. First define a helper that reads using the catalog, then load the data into a DataFrame:
scala> def withCatalog(empcatalog: String): DataFrame = {
         sqlContext
           .read
           .options(Map(HBaseTableCatalog.tableCatalog -> empcatalog))
           .format("org.apache.spark.sql.execution.datasources.hbase")
           .load()
       }
withCatalog: (empcatalog: String)org.apache.spark.sql.DataFrame

scala> val df = withCatalog(empcatalog)
df: org.apache.spark.sql.DataFrame = [city: string, empName: string, jobDesignation: string, salary: string, empNumber: string]

scala> df.show
17/11/08 18:04:22 INFO RecoverableZooKeeper: Process identifier=hconnection-0x55a690be connecting to ZooKeeper ensemble=vb-atlas-node1.hortonworks.com:2181,vb-atlas-node2.hortonworks.com:2181,vb-atlas-ambari.hortonworks.com:2181
17/11/08 18:04:22 INFO ZooKeeper: Client environment:zookeeper.version=3.4.6-8--1, built on 04/01/201
...
17/11/08 18:04:24 INFO DAGScheduler: ResultStage 0 (show at <console>:39) finished in 1.011 s
17/11/08 18:04:24 INFO DAGScheduler: Job 0 finished: show at <console>:39, took 1.230151 s
+---------+-------+--------------+------+---------+
|     city|empName|jobDesignation|salary|empNumber|
+---------+-------+--------------+------+---------+
|  chennai|   ravi|       manager| 50000|        1|
|hyderabad|   raju|      engineer|  null|        2|
|    delhi| rajesh|    jrenginner|  null|        3|
+---------+-------+--------------+------+---------+
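Beyond show, the usual DataFrame API works against the mapped table. A small sketch using the column names from the catalog above:

scala> // Project two columns
scala> df.select("empName", "city").show

scala> // Filter rows on a column value
scala> df.filter(df("jobDesignation") === "manager").show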
We can also register the DataFrame as a temporary table and query it with sqlContext:
scala> df.registerTempTable("table")

scala> sqlContext.sql("select empNumber,jobDesignation from table").show
+---------+--------------+
|empNumber|jobDesignation|
+---------+--------------+
|        1|       manager|
|        2|      engineer|
|        3|    jrenginner|
+---------+--------------+
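SHC also supports writing a DataFrame back to HBase through the same catalog. A minimal sketch following the pattern in the SHC README, where HBaseTableCatalog.newTable ("5" here) sets the number of regions if the table has to be created:

scala> // Write the DataFrame back to the table described by the catalog
scala> df.write
         .options(Map(HBaseTableCatalog.tableCatalog -> empcatalog,
                      HBaseTableCatalog.newTable -> "5"))
         .format("org.apache.spark.sql.execution.datasources.hbase")
         .save()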
Reference:
https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/
Created on 07-19-2018 12:09 PM
When running the command:
scala> val df = withCatalog(empcatalog)
I have the following problem:
java.lang.NoClassDefFoundError: org/apache/spark/Logging
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.getDeclaredConstructors0(Native Method)
  at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
  at java.lang.Class.getConstructor0(Class.java:3075)
  at java.lang.Class.newInstance(Class.java:412)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
  at withCatalog(<console>:32)
  ... 52 elided
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 71 more
I think the problem is that I'm using Spark 2.1.0, which no longer has org.apache.spark.Logging. Can you please advise how to resolve this issue without downgrading the Spark version?
Thank you very much!
Created on 07-18-2019 04:36 PM
I did the following.
I created a symlink for hbase-site.xml on each edge node as follows:
ln -s /etc/hbase/conf/hbase-site.xml /etc/spark2/conf/hbase-site.xml
I started the Spark shell with the following parameters:
spark-shell --driver-memory 5g \
  --jars /usr/hdp/2.6.5.0-292/shc/shc-core-1.1.0.2.6.5.0-292.jar,\
/usr/hdp/current/hbase-client/lib/hbase-common.jar,\
/usr/hdp/current/hbase-client/lib/hbase-client.jar,\
/usr/hdp/current/hbase-client/lib/hbase-server.jar,\
/usr/hdp/current/hbase-client/lib/hbase-protocol.jar,\
/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar,\
/usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar
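With those jars on the classpath, the same catalog-based read works on Spark 2; the only API change from the article is that Spark 2 uses the SparkSession (spark) instead of sqlContext. A minimal sketch, assuming the empcatalog definition from the article:

scala> import org.apache.spark.sql.DataFrame
scala> import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

scala> // Read the HBase table through SHC using the Spark 2 entry point
scala> val df: DataFrame = spark.read
         .options(Map(HBaseTableCatalog.tableCatalog -> empcatalog))
         .format("org.apache.spark.sql.execution.datasources.hbase")
         .load()

scala> df.show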