Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant

[CDH 5.3] Spark-Hive integration issue

I am trying to use Hive from Spark to run a word count over a Flume stream and then save the result in a Hive database. As a first step, I just want to create a table if it doesn't already exist, but I am getting this error:

2015-02-23 09:58:16,717 INFO [main] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Registered BlockManager
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
at WordCount$.main(WordCount.scala:46)
at WordCount.main(WordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 9 more

The exception is thrown at this line:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

My code is:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hive._
import org.apache.spark.sql.hive.HiveContext


/**
 * @author tabouzaid
 */
object WordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: WordCount <host> <port>")
      System.exit(1)
    }
    val Array(host, port) = args
    val batchInterval = Milliseconds(2000)
    val sparkConf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, batchInterval)
    val stream = FlumeUtils.createStream(ssc, host, port.toInt)
    stream.count().map(cnt => "Received !!!:::::" + cnt + " flume events.").print()
    val body = stream.map(e => new String(e.event.getBody.array))
    val counts = body.flatMap(line => line.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+"))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    ssc.start()
    ssc.awaitTermination()
  }
}

My sbt build file for this project:

name := "WordCount"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.2.0"

The run command is:

sudo spark-submit --class "WordCount" --master local[*] --jars /usr/local/WordCount/target/scala-2.10/spark-streaming-flume_2.11-1.2.0.jar,/usr/lib/avro/avro-ipc-1.7.6-cdh5.3.0.jar,/usr/lib/flume-ng/lib/flume-ng-sdk-1.5.0-cdh5.3.0.jar,/usr/lib/hive/lib/hive-common-0.13.1-cdh5.3.0.jar,/usr/local/WordCount/target/scala-2.10/spark-hive_2.10-1.2.0-cdh5.3.0.jar,/usr/local/WordCount/target/scala-2.10/spark-sql_2.10-1.2.0.jar  /usr/local/WordCount/target/scala-2.10/wordcount_2.10-1.0.jar 127.0.0.1 9999
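
One way to narrow down a ClassNotFoundException like the one above is to check whether the class named in the stack trace is visible at all when the jars from --jars are in place. The sketch below is not part of the original job: ClasspathCheck and isLoadable are hypothetical names used only for illustration. It only reports whether a class can be found by the class loader that loaded the check itself, which may differ from the loader that ends up resolving HiveContext.

```scala
// Hypothetical helper, not part of the WordCount job above.
object ClasspathCheck {

  // Returns true if the named class can be located by the class loader
  // that loaded this object. The class is looked up without being initialized.
  def isLoadable(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false // class is not on the classpath
      case _: NoClassDefFoundError   => false // class was found but something it depends on is missing
    }

  def main(args: Array[String]): Unit = {
    // Class name taken from the stack trace in this post.
    val name = "org.apache.hadoop.hive.conf.HiveConf"
    if (isLoadable(name)) println(name + " is visible on the classpath")
    else println(name + " is NOT visible on the classpath")
  }
}
```

Submitting this class with the same --jars list as the job should show whether the class name from the stack trace resolves. It does not explain why a jar is missing or which jar is expected to provide the class.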

 

 

 
