Created on 02-22-2015 04:15 AM - edited 09-16-2022 02:22 AM
Hello,
I implemented a custom receiver to consume messages from RabbitMQ. The application works fine when I instantiate one receiver, but when I union multiple receivers the application blocks and never proceeds to the next stages in the pipeline. The messages do get consumed from the queue, and the Application Master UI shows no errors; it lists the next stage in the pipeline but never executes it. I made sure I have enough cores for execution (8 cores). The code is below (the receiver class and the streaming code). How do I get my streaming application to consume from multiple receivers?
[Receiver Class]
import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel

class RMQReceiver[T: ClassTag] extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {

  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")

  override def onStart() {
    val factory = new ConnectionFactory()
    factory.setUsername("myusername")
    factory.setPassword("mypassword")
    factory.setVirtualHost("/")
    factory.setHost("10.0.0.6") // IP of the internal load balancer
    factory.setPort(5673)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    // Durable, non-exclusive, non-auto-delete queue with no extra arguments
    channel.queueDeclare("MDM_Smart_Metering", true, false, false, null)
    val consumer = new QueueingConsumer(channel)
    // autoAck = false: messages are acknowledged only after store() succeeds
    channel.basicConsume("MDM_Smart_Metering", false, consumer)

    // Consume on a separate thread so onStart() returns immediately, as the Receiver contract requires.
    // Note that nextDelivery() blocks, so the loop only re-checks isStopped after each message.
    new Thread("RabbitMQ Receiver") {
      override def run() {
        while (!isStopped) {
          val delivery = consumer.nextDelivery()
          val message = fromBytes(delivery.getBody())
          store(message)
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
        }
      }
    }.start()
  }

  override def onStop() {
    // The consuming thread exits once isStopped returns true; the connection and
    // channel are local to onStart(), so nothing is closed here.
  }
}
[Streaming Code]
val conf = new SparkConf().setAppName("myAppName")
val ssc = new StreamingContext(conf, Seconds(1))

val numStreams = args(0).toInt
val numberOfPartitions = args(1).toInt
val nameNode = args(2) // parsed but not used in this snippet

// Start numStreams receivers and union them into a single DStream
val rabbitStreams = (1 to numStreams).map { i => ssc.receiverStream[String](new RMQReceiver()) }
val unifiedStream = ssc.union(rabbitStreams)
unifiedStream.print()

ssc.start()
ssc.awaitTermination()
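For completeness: numberOfPartitions is parsed but never used above. If the intent is to spread the unioned data over more tasks before further processing, a repartition along these lines would do it (sketch only, not part of the application as posted):

// Redistribute the unioned data across numberOfPartitions tasks before heavier processing
val repartitionedStream = unifiedStream.repartition(numberOfPartitions)
repartitionedStream.print()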
Created 02-22-2015 08:38 AM
It says Executors (3).
I also checked the dynamic resource pool on YARN; resource pool usage shows 3 allocated cores and 3 allocated containers.
Created 02-22-2015 08:52 AM
So your app only has 3 cores from YARN? Then it can only execute 3 tasks in parallel. I'm not sure how many receivers you are starting, but is that fewer than 3?
It sounds like you expected much more resource to be available, so I'd look at your YARN config and at what else is using the resources, and compare that to what Spark is actually requesting.
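For context, each receiver permanently occupies one core (it runs as a long-lived task), so N receivers need at least N+1 cores before any processing can happen. What Spark requests from YARN is driven by the spark-submit flags; something like this (all values and names here are illustrative) asks for 9 executors with one core each:

spark-submit --master yarn-cluster \
  --num-executors 9 \
  --executor-cores 1 \
  --executor-memory 2g \
  --class com.example.StreamingApp \
  streaming-app.jar <numStreams> <numberOfPartitions> <nameNode>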
Created 02-22-2015 09:03 AM
It requests 9 but only 3 get allocated. What configuration keys need to change? I searched for all keys ending in "vcores" and increased them, but still only 3 executors get allocated.
Created 02-22-2015 09:12 AM
Here are the configuration keys that I changed:
yarn.nodemanager.resource.cpu-vcores -> 8
yarn.scheduler.maximum-allocation-vcores -> 32
I still only get 3 executors in the Spark UI.
Created 02-22-2015 10:00 AM
I was finally able to get it to work by setting --executor-cores 12 in the spark-submit options.
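For anyone hitting the same issue, the relevant part of the submit command ends up looking roughly like this (class, jar, and application arguments are placeholders):

spark-submit --master yarn-cluster \
  --executor-cores 12 \
  --class com.example.StreamingApp \
  streaming-app.jar <numStreams> <numberOfPartitions> <nameNode>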
Created 02-22-2015 10:01 AM
Thanks, sowen, for your help.