About a problem trying to run a direct Spark Streaming job using the sbt-pack plugin within CDH 5.7.0


Hi everyone, I am running a CDH 5.7.0 VMware image (CentOS 6.8) on my OS X laptop; the image uses 4 cores and 8 GB of RAM. I want to run a direct Spark Streaming job connected to a Kafka broker that runs on the OS X host. I know that YARN is needed to run the code within the pseudo-distributed CDH 5.7.0 image, and I want to use sbt-pack to create the Unix command. Someone on the Spark user list told me to delete the setMaster line so that YARN can schedule the resources needed to run the process, because if I use this config:

 

 val sparkConf = new SparkConf(false).setAppName("AmazonKafkaConnector")
                                   //.setMaster("local[4]")
                                   .setMaster("spark://192.168.30.137:7077")
                                   .set("spark.cores.max", "2")
                                   .set("spark.driver.allowMultipleContexts", "true")

    val sc = new SparkContext(sparkConf)

I got this warning:

 

16/06/04 17:03:42 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.30.137:59231 (size: 2.6 KB, free: 945.5 MB)
16/06/04 17:03:42 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/04 17:03:42 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at groupBy at Recommender.scala:28)
16/06/04 17:03:42 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/06/04 17:03:57 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
16/06/04 17:04:12 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

the process does not run because it never gets any resources.

 

But if I comment out the setMaster line, I get this exception:

 

org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:402)
at example.spark.AmazonKafkaConnector$.main(AmazonKafkaConnectorWithMongo.scala:93)
at example.spark.AmazonKafkaConnector.main(AmazonKafkaConnectorWithMongo.scala)
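
For reference, this is how I understood the advice from the Spark user list (just my reading of it, so I may be getting it wrong): leave the master out of the code entirely and supply it when the job is launched, for example with spark-submit --master yarn-client, instead of hardcoding the standalone master:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of what I understood the advice to be (not verified on my image):
// no setMaster(...) at all, so the master can be chosen at launch time,
// e.g. spark-submit --master yarn-client ... (or local[4] while testing)
val sparkConf = new SparkConf(false)
  .setAppName("AmazonKafkaConnector")
  .set("spark.cores.max", "2")

val sc = new SparkContext(sparkConf)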

 

When I look at the Spark master web UI, this is what I see:

 

 

Spark Master at spark://192.168.30.137:7077
URL: spark://192.168.30.137:7077
REST URL: spark://192.168.30.137:6066 (cluster mode)
Alive Workers: 0
Cores in use: 0 Total, 0 Used
Memory in use: 0.0 B Total, 0.0 B Used
Applications: 2 Running, 2 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE
Workers
Worker Id | Address | State | Cores | Memory

Running Applications
Application ID          | Name                 | Cores | Memory per Node | Submitted Time      | User     | State   | Duration
app-20160604170643-0003 | AmazonKafkaConnector | 0     | 1024.0 MB       | 2016/06/04 17:06:43 | cloudera | WAITING | 23 s
app-20160604170642-0002 | AmazonKafkaConnector | 0     | 1024.0 MB       | 2016/06/04 17:06:42 | cloudera | WAITING | 23 s

And this is what I see on the Spark worker page:

 

Spark Worker at 192.168.30.137:7078

    ID: worker-20160604170606-192.168.30.137-7078
    Master URL:
    Cores: 4 (0 Used)
    Memory: 6.7 GB (0.0 B Used)

Running Executors (0)
ExecutorID	Cores	State	Memory	Job Details	Logs

I am using Spark version 1.6.0-cdh5.7.0, the exact same version as the one in my Cloudera image.

 

This is the GitHub project (the develop tree). My question is: how can I use sbt-pack to run this Spark Streaming process programmatically? By the way, the code of the process looks like this:

 

//This is part of AmazonKafkaConnectorWithMongo
val sparkConf = new SparkConf(false).setAppName("AmazonKafkaConnector")
                                   //.setMaster("local[4]")
                                   //Commented out or not, I cannot get this code up and running within the Cloudera image
                                   .setMaster("spark://192.168.30.137:7077")
                                   .set("spark.cores.max", "2")
                                   .set("spark.driver.allowMultipleContexts", "true")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    //this checkpointdir should be in a conf file, for now it is hardcoded!
    val streamingCheckpointDir = "/home/cloudera/my-recommendation-spark-engine/checkpoint"
    ssc.checkpoint(streamingCheckpointDir)
      
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    println("Initialized Streaming Spark Context and kafka connector...")
    
    //create recommendation module
    println("Creating rating recommender module...")
    val ratingFile= "hdfs://192.168.30.137:8020/user/cloudera/ratings.csv"
    val recommender = new Recommender(sc,ratingFile)
...

This is part of the Recommender class:

val NumRecommendations = 10
  val MinRecommendationsPerUser = 10
  val MaxRecommendationsPerUser = 20
  val MyUsername = "myself"
  val NumPartitions = 20

  @transient val random = new Random() with Serializable

  println("Using this ratingFile: " + ratingFile)
  // first create an RDD out of the rating file
  val rawTrainingRatings = sc.textFile(ratingFile).map {
    line =>
      val Array(userId, productId, scoreStr) = line.split(",")
      AmazonRating(userId, productId, scoreStr.toDouble)
  }

  // only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products
  // THIS IS WHAT PROVOKES THE WARNING ABOUT LACK OF RESOURCES!
  val trainingRatings = rawTrainingRatings.groupBy(_.userId)
                                          .filter(r => MinRecommendationsPerUser <= r._2.size  && r._2.size < MaxRecommendationsPerUser)
                                          .flatMap(_._2)
                                          .repartition(NumPartitions)
                                          .cache()

  println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}")

I have already tested this code within the Spark Scala shell and it works! But it does not work when I run it with the sbt-pack Unix command.
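
For reference, my sbt-pack setup looks roughly like this (a simplified sketch from memory, so the plugin version and the launcher name may not be exact):

// project/plugins.sbt (sketch)
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.8.0")

// build.sbt (sketch): packMain maps the generated launcher script name to the main class
packSettings

packMain := Map("amazon-kafka-connector" -> "example.spark.AmazonKafkaConnector")

Running "sbt pack" then generates target/pack/bin/amazon-kafka-connector, which is the Unix command I am trying to run.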

 

Please help, I have been stuck with this issue for the last week. The file that I am trying to use as ratingFile is only 16 MB, so it cannot be a matter of insufficient resources, can it?

 

Thank you for reading this far.

 

1 REPLY

Re: About a problem trying to run a direct Spark Streaming job using the sbt-pack plugin within CDH 5.7.0

Hi, I just wanted to add more information about the problem that I have; please go to

http://community.cloudera.com/t5/Hadoop-101-Training-Quickstart/I-cannot-access-programmatically-a-f...

The real problem is getting the same result that I get within the spark-shell. I mean, when I run the code with setMaster("local[4]") and ratingFile set to "file:///home/cloudera/Downloads/ratings.csv", I get this message:

Parsed file:///home/cloudera/Downloads/ratings.csv. Kept 0 ratings out of 568454

whereas if I run the exact same code in the spark-shell, I get this message:

Parsed file:///home/cloudera/Downloads/ratings.csv. Kept 73279 ratings out of 568454

If I use setMaster("local[4]"), it does not matter whether I use /home/cloudera/Downloads/ratings.csv or the copy loaded into HDFS; I always get the message:

Parsed file:///home/cloudera/Downloads/ratings.csv. Kept 0 ratings out of 568454
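
One thing I could still check (just a debugging idea, I have not run it yet) is the distribution of group sizes, to see whether the MinRecommendationsPerUser/MaxRecommendationsPerUser filter is really what drops everything to zero outside the shell:

// Debugging sketch (untested): how many users survive the size filter?
val groupSizes = rawTrainingRatings.groupBy(_.userId).map { case (_, ratings) => ratings.size }
println("Distinct users: " + groupSizes.count())
println("Users kept by the size filter: " +
  groupSizes.filter(n => n >= MinRecommendationsPerUser && n < MaxRecommendationsPerUser).count())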

I thought that submitting the job to the pseudo-distributed cluster would make the problem go away, but it has not.

Please help. How can I get this Spark job up and running properly (with the same results that I get using the spark-shell) in the pseudo-distributed CDH 5.7.0 image? How can I use YARN programmatically together with sbt-pack?
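
For what it is worth, the direction I am considering next (an untested sketch based on the Spark 1.6 documentation, so please correct me if it is wrong) is to set the master to yarn-client in the code and make sure HADOOP_CONF_DIR points at the CDH client configuration, so that YARN schedules the executors instead of the standalone master:

import org.apache.spark.{SparkConf, SparkContext}

// Untested sketch: run on YARN in client mode (Spark 1.6 "yarn-client" master syntax).
// Assumes HADOOP_CONF_DIR / YARN_CONF_DIR point at /etc/hadoop/conf on the CDH image.
val sparkConf = new SparkConf(false)
  .setAppName("AmazonKafkaConnector")
  .setMaster("yarn-client")
  .set("spark.executor.memory", "1g")

val sc = new SparkContext(sparkConf)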