Posts: 25
Registered: ‎12-10-2014

About a problem trying to run a direct Spark Streaming job using the sbt-pack plugin within CDH 5.7.0

Hi everyone, I am running a VMware CDH 5.7.0 image with CentOS 6.8 on my OS X laptop; the image uses 4 cores and 8 GB of RAM. I want to run a direct Spark Streaming job connected to a Kafka broker that is running on the OS X host. I know that YARN is necessary to run the code within the pseudo-distributed CDH 5.7.0 image, and I want to use sbt-pack to create the Unix command. Someone on the Spark user list told me to delete the setMaster line so that YARN can schedule the resources needed to run the process, because if I use this config:


 val sparkConf = new SparkConf(false).setAppName("AmazonKafkaConnector")
                                   .set("spark.cores.max", "2")
                                   .set("spark.driver.allowMultipleContexts", "true")

    val sc = new SparkContext(sparkConf)

I get this warning:


16/06/04 17:03:42 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on (size: 2.6 KB, free: 945.5 MB)
16/06/04 17:03:42 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/04 17:03:42 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at groupBy at Recommender.scala:28)
16/06/04 17:03:42 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/06/04 17:03:57 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
16/06/04 17:04:12 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

The process does not run because it is never given any resources.


But if I comment out the setMaster line, I get this exception:


org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:402)
at example.spark.AmazonKafkaConnector$.main(AmazonKafkaConnectorWithMongo.scala:93)
at example.spark.AmazonKafkaConnector.main(AmazonKafkaConnectorWithMongo.scala)


When I open the Spark master page, this is what I see:



Spark Master at spark://
URL: spark://
REST URL: spark:// (cluster mode)
Alive Workers: 0
Cores in use: 0 Total, 0 Used
Memory in use: 0.0 B Total, 0.0 B Used
Applications: 2 Running, 2 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE
Worker Id Address State Cores Memory
Running Applications
Application ID Name Cores Memory per Node Submitted Time User State Duration
AmazonKafkaConnector 0 1024.0 MB 2016/06/04 17:06:43 cloudera WAITING 23 s
AmazonKafkaConnector 0 1024.0 MB 2016/06/04 17:06:42 cloudera WAITING 23 s

And this is what I see on the Spark worker page:


Spark Worker at

    ID: worker-20160604170606-
    Master URL:
    Cores: 4 (0 Used)
    Memory: 6.7 GB (0.0 B Used)

Back to Master
Running Executors (0)
ExecutorID	Cores	State	Memory	Job Details	Logs

I am using Spark version 1.6.0-cdh5.7.0, the exact same version as in my Cloudera image.


The code is in a GitHub project, on the develop branch. My question is: how can I use sbt-pack to run this Spark Streaming process programmatically? By the way, the code of the process looks like this:


//This is part of AmazonKafkaConnectorWithMongo
val sparkConf = new SparkConf(false).setAppName("AmazonKafkaConnector")
// Commented out or not, I cannot get this code up and running within the Cloudera image
                                   .set("spark.cores.max", "2")
                                   .set("spark.driver.allowMultipleContexts", "true")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    //this checkpointdir should be in a conf file, for now it is hardcoded!
    val streamingCheckpointDir = "/home/cloudera/my-recommendation-spark-engine/checkpoint"
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    println("Initialized Streaming Spark Context and kafka connector...")
    //create recommendation module
    println("Creating rating recommender module...")
    val ratingFile= "hdfs://"
    val recommender = new Recommender(sc,ratingFile)
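
For reference, here is a minimal sketch of one way the master could be supplied without hardcoding it, so the same binary can run with "local[4]" or "yarn-client". It is only a sketch under assumptions: the object name MasterConfigSketch and the idea of passing the master as the first command-line argument are hypothetical and not part of the project. Two details it relies on: new SparkConf(false) skips loading spark.* Java system properties, so a master passed as -Dspark.master=... is not picked up, whereas new SparkConf(true) loads them; and StreamingContext can be built from the existing SparkContext, which avoids creating a second context. In Spark 1.6, "yarn-client" additionally needs HADOOP_CONF_DIR or YARN_CONF_DIR to point at the cluster configuration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical object name, for illustration only.
object MasterConfigSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: the master is passed as the first argument,
    // e.g. "yarn-client" or "local[4]".
    val masterFromArgs = args.headOption

    // SparkConf(true) also loads spark.* Java system properties
    // (for example -Dspark.master=...).
    val conf = new SparkConf(true).setAppName("AmazonKafkaConnector")
    masterFromArgs.foreach(m => conf.setMaster(m))

    // Fall back to local mode only when nothing else supplied a master.
    conf.setIfMissing("spark.master", "local[4]")

    val sc = new SparkContext(conf)
    // Reuse the existing SparkContext instead of building a second one from the conf.
    val ssc = new StreamingContext(sc, Seconds(2))

    println(s"Running with master: ${sc.master}")

    // Stop the streaming context together with the underlying SparkContext.
    ssc.stop(stopSparkContext = true)
  }
}
```

With a single context shared this way, the spark.driver.allowMultipleContexts setting should no longer be needed.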

This is part of Recommender class:

val NumRecommendations = 10
  val MinRecommendationsPerUser = 10
  val MaxRecommendationsPerUser = 20
  val MyUsername = "myself"
  val NumPartitions = 20

  @transient val random = new Random() with Serializable

  println("Using this ratingFile: " + ratingFile)
  // first create an RDD out of the rating file
  val rawTrainingRatings = sc.textFile(ratingFile).map {
    line =>
      val Array(userId, productId, scoreStr) = line.split(",")
      AmazonRating(userId, productId, scoreStr.toDouble)
  }

  // only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products
  val trainingRatings = rawTrainingRatings.groupBy(_.userId)
                                          .filter(r => MinRecommendationsPerUser <= r._2.size  && r._2.size < MaxRecommendationsPerUser)

  println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}")
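
To check the filtering step in isolation, outside both the spark-shell and the packed command, here is a minimal sketch of the same groupBy/filter logic on plain Scala collections. The AmazonRating definition and the RatingFilterSketch object are hypothetical stand-ins, since the real class is not shown in this post; the thresholds are copied from the snippet above. Note that after groupBy the count is the number of users kept, not the number of individual ratings.

```scala
// Hypothetical stand-in for the AmazonRating class used in the post.
final case class AmazonRating(userId: String, productId: String, rating: Double)

object RatingFilterSketch {
  val MinRecommendationsPerUser = 10
  val MaxRecommendationsPerUser = 20

  // Parse one CSV line; malformed lines yield None instead of throwing a MatchError.
  def parse(line: String): Option[AmazonRating] = line.split(",") match {
    case Array(userId, productId, scoreStr) =>
      scala.util.Try(AmazonRating(userId, productId, scoreStr.toDouble)).toOption
    case _ => None
  }

  // Keep only users whose number of ratings is in [Min, Max).
  def keepActiveUsers(ratings: Seq[AmazonRating]): Map[String, Seq[AmazonRating]] =
    ratings.groupBy(_.userId).filter { case (_, userRatings) =>
      MinRecommendationsPerUser <= userRatings.size &&
        userRatings.size < MaxRecommendationsPerUser
    }
}
```

Feeding a few lines of ratings.csv through parse and keepActiveUsers in both environments could help narrow down whether the difference comes from parsing or from the grouping step.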

I have already tested this code in the Spark Scala shell and it works, but it does not work when run through the sbt-pack Unix command.


Please help, I have been stuck on this issue for the last week. The file that I am trying to use as ratingFile is only 16 MB, so it cannot be a matter of insufficient resources, can it?


Thank you for reading this far.



Re: About a problem trying to run a direct spark streaming job using sbt-pack plugin within a cdh-5.

Hi, I just wanted to add more information about the problem that I have, please go to

The real problem is getting the same result as in the spark-shell. When I run the code using setMaster("local[4]") and ratingFile is set to "file:///home/cloudera/Downloads/ratings.csv", I get this message:

Parsed file:///home/cloudera/Downloads/ratings.csv. Kept 0 ratings out of 568454

whereas if I run the exact same code in the spark-shell, I get this message:

Parsed file:///home/cloudera/Downloads/ratings.csv. Kept 73279 ratings out of 568454

If I use setMaster("local[4]"), it does not matter whether I use /home/cloudera/Downloads/ratings.csv or the copy loaded into HDFS; I always get this message:

Parsed file:///home/cloudera/Downloads/ratings.csv. Kept 0 ratings out of 568454

I thought that submitting the job to the pseudo-distributed cluster would make the problem go away, but it did not.

Please help. How can I get this Spark job up and running properly (with the same results that I get in the spark-shell) in the pseudo-distributed CDH 5.7.0 image? How can I use YARN programmatically with sbt-pack?