Member since
02-22-2015
02-22-2015
10:01 AM
Thanks, sowen, for your cooperation.
02-22-2015
10:00 AM
I was finally able to get it to work by setting the value of --executor-cores 12 in spark-submit options.
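A rough sketch of where that option goes, reusing the arguments from the run-spark-submit.sh script in this thread. It only prints the command (a dry run) rather than launching a job. The `submit_command` helper is a hypothetical name introduced for illustration, and 12 is simply the value that worked on this cluster, not a general recommendation.

```shell
#!/bin/sh
# Dry-run sketch: print the spark-submit command instead of running it.
# submit_command is a hypothetical helper, not part of Spark.
submit_command() {
  executor_cores="$1"   # cores requested per executor
  printf '%s ' spark-submit \
    --class com.itworx.smartmetering.SmartMeteringJob \
    --deploy-mode cluster \
    --master yarn \
    --jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar \
    --name SmartMeteringJob \
    --num-executors 8 \
    --executor-cores "$executor_cores" \
    --conf spark.executor.memory=400m \
    smartmeteringjob_2.10-1.2.4-SNAPSHOT.jar 1 4 10.0.0.7
  printf '\n'
}

# Show the command with the value from the post above.
submit_command 12
```

Once the printed command looks right, it can be pasted into the script in place of the original spark-submit call.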
02-22-2015
09:12 AM
Here are the configuration keys that I changed:

yarn.nodemanager.resource.cpu-vcores -> 8
yarn.scheduler.maximum-allocation-vcores -> 32

Still, I only get 3 executors in the Spark UI.
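For reference, a small sketch of what those two keys look like as yarn-site.xml entries, assuming the settings are applied through that file (on a managed cluster they may instead be set through the management UI). The `yarn_property` helper is a hypothetical name used only to print the fragment; the keys and values are the ones listed above.

```shell
#!/bin/sh
# Print one <property> element in the yarn-site.xml format.
# yarn_property is a hypothetical helper used only for this illustration.
yarn_property() {
  printf '<property>\n  <name>%s</name>\n  <value>%s</value>\n</property>\n' "$1" "$2"
}

# The two keys changed in this post.
yarn_property yarn.nodemanager.resource.cpu-vcores 8
yarn_property yarn.scheduler.maximum-allocation-vcores 32
```

The printed elements belong inside the `<configuration>` element of yarn-site.xml.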
02-22-2015
09:03 AM
It requests 9, but only 3 get allocated. Which configuration keys need to be changed? I searched for all keys that end with "vcores" and increased them, but still only 3 executors get allocated.
02-22-2015
08:38 AM
It says Executors (3). I also checked the dynamic resource pools in YARN: the resource pool usage shows 3 allocated cores and 3 allocated containers.
02-22-2015
08:22 AM
Where do I find the executors tab?
02-22-2015
08:04 AM
I tried the code below and it worked for up to 5 receivers in parallel. May I know how you count the minimum number of cores required for a task?

    MASTER=yarn-client SPARK_WORKER_MEMORY=400m SPARK_WORKER_CORES=3 SPARK_WORKER_INSTANCES=8 spark-shell --jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.dstream._
    import org.apache.spark.streaming.StreamingContext._
    import com.mongodb.casbah.Imports._
    import com.mongodb.DBObject
    import com.mongodb.BasicDBObject
    import com.mongodb.BasicDBList
    import com.mongodb.util.JSON
    import com.mongodb.BulkWriteOperation
    import java.util.Date
    import java.text.SimpleDateFormat
    import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
    import org.apache.spark.streaming.receiver.Receiver
    import org.apache.spark.Logging
    import scala.reflect.ClassTag
    import org.apache.spark.storage.StorageLevel

    class RMQReceiver[T: ClassTag] extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {

      def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")

      override def onStart(): Unit = {
        val factory = new ConnectionFactory()
        factory.setUsername("myusername")
        factory.setPassword("mypassword")
        factory.setVirtualHost("/")
        factory.setHost("10.0.0.6") // IP of the internal load balancer
        factory.setPort(5673)
        val connection = factory.newConnection()
        val channel = connection.createChannel()
        channel.queueDeclare("MDM_Smart_Metering", true, false, false, null)
        val consumer = new QueueingConsumer(channel)
        // autoAck = false: messages are acknowledged manually below
        channel.basicConsume("MDM_Smart_Metering", false, consumer)

        // Receive on a separate thread so that onStart() returns immediately
        new Thread("RabbitMQ Receiver") {
          override def run(): Unit = {
            while (!isStopped) {
              val delivery = consumer.nextDelivery()
              val message = fromBytes(delivery.getBody())
              store(message)
              // Acknowledge only after the message has been handed to Spark
              channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
            }
          }
        }.start()
      }

      override def onStop(): Unit = {}
    }

    val ssc = new StreamingContext(sc, Seconds(1))
    val numStreams = 5
    // One receiver per stream
    val rabbitStreams = (1 to numStreams).map { i => ssc.receiverStream[String](new RMQReceiver()) }
    val unifiedStream = ssc.union(rabbitStreams)
    unifiedStream.reduceByWindow(_ + "\r\n" + _, Seconds(1), Seconds(1)).print()
    ssc.start()
    Thread.sleep(10 * 1000)
    ssc.stop(true, true)
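On counting cores: as far as I understand it, each receiver holds one core for as long as the stream runs, so the application needs more cores than receivers, with at least one left over for processing the received data. A minimal sketch of that arithmetic follows. The helper names are made up for illustration, and the executor numbers in the usage lines are example values, not measurements from this cluster.

```shell
#!/bin/sh
# Smallest core count for N receivers: one per receiver plus one for processing.
min_cores_for_receivers() {
  echo $(( $1 + 1 ))
}

# Total cores an application gets: executors x cores per executor.
total_cores() {
  echo $(( $1 * $2 ))
}

# Succeeds (exit 0) when the allocation leaves at least one core for processing.
# $1 = receivers, $2 = executors, $3 = cores per executor
enough_cores() {
  [ "$(total_cores "$2" "$3")" -ge "$(min_cores_for_receivers "$1")" ]
}

# Example values only: 5 receivers against 2 executors with 2 cores each.
min_cores_for_receivers 5
if enough_cores 5 2 2; then echo "enough cores"; else echo "too few cores"; fi
```

This only checks the lower bound; how many cores the batches themselves need depends on the workload.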
02-22-2015
07:35 AM
I tried the code below in the Spark shell and it worked (it basically reads the same files 3 times):

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.dstream._
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext(sc, Seconds(1))
    // Three streams that all watch the same HDFS directory
    val streams = (1 to 3).map { i => ssc.textFileStream("hdfs://10.0.0.7:8020/user/ubuntu/testStreaming") }
    val lines = ssc.union(streams) //ssc.textFileStream("hdfs://10.0.0.7:8020/user/ubuntu/testStreaming")
    // lines.reduceByWindow(_ + "\r\n" + _, Seconds(1), Seconds(1)).print()
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    Thread.sleep(30 * 1000)
    ssc.stop(true, true)

This is the command I used to run the Spark shell:

    MASTER=yarn-client SPARK_WORKER_MEMORY=400m SPARK_WORKER_CORES=3 SPARK_WORKER_INSTANCES=8 spark-shell
02-22-2015
05:20 AM
This is my run-spark-submit.sh file:

    #!/bin/bash
    source /etc/spark/conf/spark-env.sh
    spark-submit \
      --class "com.itworx.smartmetering.SmartMeteringJob" \
      --deploy-mode cluster \
      --master yarn \
      --jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar \
      --name "SmartMeteringJob" \
      --num-executors 8 \
      --conf spark.executor.memory=400m \
      smartmeteringjob_2.10-1.2.4-SNAPSHOT.jar 1 4 10.0.0.7

I have 3 nodes (one name node and 2 data nodes), and each data node has 4 cores. I have set the number of executors to 8.
02-22-2015
04:54 AM
The master is yarn, the deploy mode is cluster, and 8 cores are available. I set the number of partitions to 1 to free up cores.