Created on 02-22-2015 04:15 AM
I implemented a custom receiver to consume messages from RabbitMQ. The application works fine when I instantiate one receiver, but when I try to union multiple receivers the application blocks and does not proceed to the next stages in the pipeline. I noticed that the messages do get consumed from the queue. The Application Master UI does not show any error; it shows the next stage in the pipeline but does not execute it. I made sure I have enough cores for execution (8 cores). The code is below (the receiver class and the streaming code). How do I get my streaming application to consume from multiple receivers?
import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
class RMQReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {
  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")
  override def onStart() {
    val factory = new ConnectionFactory()
    factory.setHost("") // IP of the internal load balancer
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    // durable = true, exclusive = false, autoDelete = false
    channel.queueDeclare("MDM_Smart_Metering", true, false, false, null)
    val consumer = new QueueingConsumer(channel)
    // autoAck = false: each message is acknowledged explicitly after store()
    channel.basicConsume("MDM_Smart_Metering", false, consumer)
    // onStart() must not block, so messages are received on a separate thread
    new Thread("RabbitMQ Receiver") {
      override def run() {
        while (!isStopped) {
          val delivery = consumer.nextDelivery()
          val message = fromBytes(delivery.getBody())
          store(message) // hand the message to Spark so it reaches the DStream
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
        }
      }
    }.start()
  }
  // Nothing to clean up here; the receiving thread exits its loop once isStopped is true
  override def onStop() {}
}
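import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
val conf = new SparkConf().setAppName("myAppName")
val ssc = new StreamingContext(conf, Seconds(1))
val numStreams = args(0).toInt
val numberOfPartitions = args(1).toInt
val nameNode = args(2)
// One RMQReceiver per stream; each receiver holds one executor core for as long as it runs
val rabbitStreams = (1 to numStreams).map { i => ssc.receiverStream[String](new RMQReceiver()) }
val unifiedStream = ssc.union(rabbitStreams)
// ... downstream transformations and output operations on unifiedStream ...
ssc.start()
ssc.awaitTermination()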
Created 02-22-2015 10:00 AM
I was finally able to get it to work by setting --executor-cores 12 in the spark-submit options.
Created 02-22-2015 04:45 AM
What is your master set to? It needs to allow for all the receivers, plus one, IIRC.
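A minimal sketch of that rule of thumb, assuming each receiver pins one core for as long as the stream runs and at least one more core is needed to process the batches. `ReceiverCores`, `minCores` and `localMasterFor` are hypothetical helpers, not Spark APIs; `local[n]` is Spark's standard master URL for running locally with n threads.

```scala
object ReceiverCores {
  // Each receiver occupies one core permanently; keep at least one more
  // core free so the batch jobs generated by the stream can actually run.
  def minCores(numReceivers: Int): Int = {
    require(numReceivers >= 0, "numReceivers must be non-negative")
    numReceivers + 1
  }

  // Master URL for a local test run with enough threads for all receivers.
  def localMasterFor(numReceivers: Int): String =
    s"local[${minCores(numReceivers)}]"
}
```

For example, `new SparkConf().setMaster(ReceiverCores.localMasterFor(5))` would request `local[6]` for five RabbitMQ receivers; with `local[5]` or fewer, all threads could be taken by receivers and no batch would ever be processed.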
Created 02-22-2015 04:54 AM
Created 02-22-2015 05:04 AM
OK, what I'm interested in is how many executor slots you have. How many machines, how many executors, how many cores per executor? We want to confirm it's at least as many as the number of receivers.
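The slot count being asked about is just executors times cores per executor. A small sketch, assuming the receivers-plus-one rule from earlier in the thread; `ExecutorLayout` and `canRun` are hypothetical names used only for illustration.

```scala
// Hypothetical model of an executor layout (not a Spark class).
final case class ExecutorLayout(numExecutors: Int, coresPerExecutor: Int) {
  // Total task slots = number of executors x cores per executor.
  def slots: Int = numExecutors * coresPerExecutor

  // Every receiver holds one slot, and at least one slot must remain
  // free for processing, so slots must strictly exceed receivers.
  def canRun(numReceivers: Int): Boolean = slots > numReceivers
}
```

For instance, `ExecutorLayout(8, 1)` has 8 slots and can run up to 7 receivers on paper, while `ExecutorLayout(2, 2)` cannot run 4. These numbers only hold if the cluster manager actually grants every requested executor.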
What about a simpler test involving a file-based DStream? If that works, it rules out most causes other than the custom DStream.
Created 02-22-2015 05:20 AM
This is my submit script:
source /etc/spark/conf/
spark-submit \
--class "com.itworx.smartmetering.SmartMeteringJob" \
--deploy-mode cluster \
--master yarn \
--jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar \
--name "SmartMeteringJob" \
--num-executors 8 \
--conf spark.executor.memory=400m \
smartmeteringjob_2.10-1.2.4-SNAPSHOT.jar 1 4
I have 3 nodes (one NameNode and 2 DataNodes); each DataNode has 4 cores. I have set the number of executors to 8.
Created 02-22-2015 08:11 AM
You usually use --executor-memory to set executor memory, but I don't think it matters. You also generally do not use environment variables to configure spark-shell. Although it might be giving the desired results, I'd use the standard command-line flags.
It sounds like simpler jobs are working. While you request 8 executors, do you actually get them from YARN? Go look at your Executors tab.
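Besides the Executors tab, the number of registered executors can be checked from the shell with `SparkContext.getExecutorMemoryStatus`, which returns one entry per block manager keyed by host:port, including the driver's own. The sketch below assumes that behaviour and simply subtracts the driver entry; `ExecutorCount` is a hypothetical helper, not part of Spark.

```scala
object ExecutorCount {
  // The memory-status map has one entry per block manager; the driver's
  // block manager is included, so subtract one to count executors.
  // Accepts scala.collection.Map so the value returned by Spark fits directly.
  def fromMemoryStatus(status: scala.collection.Map[String, (Long, Long)]): Int =
    math.max(status.size - 1, 0)
}
```

In spark-shell this would be called as `ExecutorCount.fromMemoryStatus(sc.getExecutorMemoryStatus)`. Executors that are still registering right after startup will not be counted yet, so it is worth checking again after a few seconds.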
Created 02-22-2015 07:35 AM
I tried the code below in the Spark shell and it worked (it basically reads the same files 3 times):
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(1))
val streams = (1 to 3).map { i => ssc.textFileStream("hdfs://") }
val lines = ssc.union(streams) //ssc.textFileStream("hdfs://") //
lines.reduceByWindow(_ + "\r\n" + _, Seconds(1), Seconds(1)).print()
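val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
ssc.start()              // nothing is processed until the context is started
Thread.sleep(30 * 1000)  // let the streaming job run for 30 seconds
ssc.stop(true, true)     // stopSparkContext = true, stopGracefully = true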
This is the command I used to run the Spark shell:
Created 02-22-2015 08:04 AM
I tried the code below and it worked for up to 5 receivers in parallel. May I know how you count the minimum number of cores required?
MASTER=yarn-client SPARK_WORKER_MEMORY=400m SPARK_WORKER_CORES=3 SPARK_WORKER_INSTANCES=8 spark-shell --jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.StreamingContext._
import com.mongodb.casbah.Imports._
import com.mongodb.DBObject
import com.mongodb.BasicDBObject
import com.mongodb.BasicDBList
import com.mongodb.util.JSON
import com.mongodb.BulkWriteOperation
import java.util.Date
import java.text.SimpleDateFormat
import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging
import scala.reflect.ClassTag
val ssc = new StreamingContext(sc, Seconds(1))
val numStreams = 5
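// Assumes the RMQReceiver class from the first post has already been defined in the shell
val rabbitStreams = (1 to numStreams).map { i => ssc.receiverStream[String](new RMQReceiver()) }
val unifiedStream = ssc.union(rabbitStreams)
unifiedStream.reduceByWindow(_ + "\r\n" + _, Seconds(1), Seconds(1)).print()
ssc.start()              // receivers are launched only once the context starts
Thread.sleep(10 * 1000)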
ssc.stop(true, true)
Created 02-22-2015 08:22 AM
Where do I find the executors tab?
Created 02-22-2015 08:29 AM
Go to the Spark UI and look at the top of the screen; click Executors.