Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here. Want to know more about what has changed? Check out the Community News blog.

[CDH 5.3] Spark -hive integration issue

SOLVED Go to solution

[CDH 5.3] Spark -hive integration issue

Expert Contributor

i am trying to use hive from spark ,to do word count in a flume stream then save it in hive database, so simply as a start i want to create a table if it doesn't exist , but i am getting error :

 

 

 

2015-02-23 09:58:16,717 INFO [main] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Registered BlockManager
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
at WordCount$.main(WordCount.scala:46)
at WordCount.main(WordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 9 more

 

 

which is here 

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

 

 

My Code is :

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hive._
import org.apache.spark.sql.hive.HiveContext


/**
 * @author tabouzaid
 */
object WordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: WordCount <host> <port>")
      System.exit(1)
    }
    val Array(host, port) = args
    val batchInterval = Milliseconds(2000)
    val sparkConf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, batchInterval)
    val stream = FlumeUtils.createStream(ssc, host, port.toInt)
    stream.count().map(cnt => "Received !!!:::::" + cnt + " flume events." ).print()
    val body = stream.map(e => new String(e.event.getBody.array))
   val counts = body.flatMap(line => line.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+"))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
 sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    ssc.start()
    ssc.awaitTermination()
  }
}

 my sbt to build this project :

 

name := "WordCount"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.2.0"

 

the run command is :

 

sudo spark-submit --class "WordCount" --master local[*] --jars /usr/local/WordCount/target/scala-2.10/spark-streaming-flume_2.11-1.2.0.jar,/usr/lib/avro/avro-ipc-1.7.6-cdh5.3.0.jar,/usr/lib/flume-ng/lib/flume-ng-sdk-1.5.0-cdh5.3.0.jar,/usr/lib/hive/lib/hive-common-0.13.1-cdh5.3.0.jar,/usr/local/WordCount/target/scala-2.10/spark-hive_2.10-1.2.0-cdh5.3.0.jar,/usr/local/WordCount/target/scala-2.10/spark-sql_2.10-1.2.0.jar  /usr/local/WordCount/target/scala-2.10/wordcount_2.10-1.0.jar 127.0.0.1 9999

 

 

 

1 ACCEPTED SOLUTION

Accepted Solutions

Re: [CDH 5.3] Spark -hive integration issue

Master Collaborator

This is answered a few times already here. Have a look at for example http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/I-am-using-a-hive-cotext-in-pyspark...

 

The short answer is that Spark is not entirely compatible with recent versions of Hive found in CDH, but may still work for a lot of use cases. The Spark bits are still there. You have to add Hive to the classpath yourself.

4 REPLIES 4

Re: [CDH 5.3] Spark -hive integration issue

Explorer

I have exactly the same problem in CDH 5.2.1

 

Regards,

Re: [CDH 5.3] Spark -hive integration issue

Master Collaborator

This is answered a few times already here. Have a look at for example http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/I-am-using-a-hive-cotext-in-pyspark...

 

The short answer is that Spark is not entirely compatible with recent versions of Hive found in CDH, but may still work for a lot of use cases. The Spark bits are still there. You have to add Hive to the classpath yourself.

Highlighted

Re: [CDH 5.3] Spark -hive integration issue

New Contributor
Hi, do you have to add hive to the classpath on every node ?

Re: [CDH 5.3] Spark -hive integration issue

Expert Contributor
yes you have to