Support Questions

Find answers, ask questions, and share your expertise

[CDH 5.3] Spark -hive integration issue

avatar
Expert Contributor

i am trying to use hive from spark ,to do word count in a flume stream then save it in hive database, so simply as a start i want to create a table if it doesn't exist , but i am getting error :

 

 

 

2015-02-23 09:58:16,717 INFO [main] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Registered BlockManager
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
at WordCount$.main(WordCount.scala:46)
at WordCount.main(WordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 9 more

 

 

which is here 

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

 

 

My Code is :

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hive._
import org.apache.spark.sql.hive.HiveContext


/**
 * @author tabouzaid
 */
object WordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: WordCount <host> <port>")
      System.exit(1)
    }
    val Array(host, port) = args
    val batchInterval = Milliseconds(2000)
    val sparkConf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, batchInterval)
    val stream = FlumeUtils.createStream(ssc, host, port.toInt)
    stream.count().map(cnt => "Received !!!:::::" + cnt + " flume events." ).print()
    val body = stream.map(e => new String(e.event.getBody.array))
   val counts = body.flatMap(line => line.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+"))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
 sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    ssc.start()
    ssc.awaitTermination()
  }
}

 my sbt to build this project :

 

name := "WordCount"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.2.0"

 

the run command is :

 

sudo spark-submit --class "WordCount" --master local[*] --jars /usr/local/WordCount/target/scala-2.10/spark-streaming-flume_2.11-1.2.0.jar,/usr/lib/avro/avro-ipc-1.7.6-cdh5.3.0.jar,/usr/lib/flume-ng/lib/flume-ng-sdk-1.5.0-cdh5.3.0.jar,/usr/lib/hive/lib/hive-common-0.13.1-cdh5.3.0.jar,/usr/local/WordCount/target/scala-2.10/spark-hive_2.10-1.2.0-cdh5.3.0.jar,/usr/local/WordCount/target/scala-2.10/spark-sql_2.10-1.2.0.jar  /usr/local/WordCount/target/scala-2.10/wordcount_2.10-1.0.jar 127.0.0.1 9999

 

 

 

1 ACCEPTED SOLUTION

avatar
Master Collaborator

This is answered a few times already here. Have a look at for example http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/I-am-using-a-hive-cotext-in-pyspark...

 

The short answer is that Spark is not entirely compatible with recent versions of Hive found in CDH, but may still work for a lot of use cases. The Spark bits are still there. You have to add Hive to the classpath yourself.

View solution in original post

4 REPLIES 4

avatar
Contributor

I have exactly the same problem in CDH 5.2.1

 

Regards,

avatar
Master Collaborator

This is answered a few times already here. Have a look at for example http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/I-am-using-a-hive-cotext-in-pyspark...

 

The short answer is that Spark is not entirely compatible with recent versions of Hive found in CDH, but may still work for a lot of use cases. The Spark bits are still there. You have to add Hive to the classpath yourself.

avatar
Contributor
Hi, do you have to add hive to the classpath on every node ?

avatar
Expert Contributor
yes you have to