
[Spark Streaming] UnionDStream does not produce batches


Hello,

 

I implemented a custom receiver to consume messages from RabbitMQ. The application works fine when I instantiate one receiver, but when I try to union multiple receivers the application blocks and does not proceed to the next stages in the pipeline. I noticed that the messages do get consumed from the queue. The Application Master UI does not show any error; it shows the next stage in the pipeline but does not execute it. I made sure I have enough cores for execution (8 cores). The code is below (the receiver class and the streaming code). How do I get my streaming application to consume from multiple receivers?

 

[Receiver Class]

import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel

class RMQReceiver[T: ClassTag] extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {

  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")

  override def onStart() {
    val factory = new ConnectionFactory()
    factory.setUsername("myusername")
    factory.setPassword("mypassword")
    factory.setVirtualHost("/")
    factory.setHost("10.0.0.6") // IP of the internal load balancer
    factory.setPort(5673)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("MDM_Smart_Metering", true, false, false, null)
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume("MDM_Smart_Metering", false, consumer)
    // Consume on a separate thread so that onStart() returns immediately,
    // as the Receiver contract requires.
    new Thread("RabbitMQ Receiver") {
      override def run() {
        while (!isStopped) {
          val delivery = consumer.nextDelivery()
          val message = fromBytes(delivery.getBody())
          // Store the message in Spark, then ack it back to RabbitMQ.
          store(message)
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
        }
      }
    }.start()
  }

  override def onStop() {}
}

 

[Streaming Code]

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }

val conf = new SparkConf().setAppName("myAppName")
val ssc = new StreamingContext(conf, Seconds(1))
val numStreams = args(0).toInt
val numberOfPartitions = args(1).toInt
val nameNode = args(2)
// One receiver-backed DStream per stream, unioned into a single DStream.
val rabbitStreams = (1 to numStreams).map { i => ssc.receiverStream[String](new RMQReceiver()) }
val unifiedStream = ssc.union(rabbitStreams)

unifiedStream.print()

ssc.start()
ssc.awaitTermination()
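
For what it's worth, one cheap way to check whether batches are being produced at all, without dumping records, is to count each batch (registered before ssc.start()); this uses only standard DStream operations and is purely illustrative:

// Prints one line per batch interval with the number of records received.
unifiedStream.count().print()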

1 ACCEPTED SOLUTION


I was finally able to get it to work by setting --executor-cores 12 in the spark-submit options.
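
For context, the submit command looked roughly like the sketch below. Each receiver occupies one core for its long-running receive task, so the executors need enough cores left over to process the batches. Everything except --executor-cores 12 (class name, jar, and arguments) is a placeholder:

spark-submit \
  --master yarn \
  --executor-cores 12 \
  --class com.example.RabbitStreamingApp \
  rabbit-streaming-app.jar <numStreams> <numberOfPartitions> <nameNode>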


15 REPLIES


It says Executors (3)

 

I also checked the dynamic resource pool on YARN. Resource pool usage shows 3 allocated cores and 3 allocated containers.

Master Collaborator

So your app only has 3 cores from YARN? Then it can only execute 3 tasks in parallel. I'm not sure how many receivers you are starting, but is 3 less than that?

 

It sounds like you expected much more resource to be available, so I'd go look at your YARN config and what's using the resource, and compare that to what Spark is actually requesting.
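
For example, the request side can be made explicit in the application itself; spark.executor.instances and spark.executor.cores are the standard Spark-on-YARN properties behind --num-executors and --executor-cores, and the values below are only illustrative:

import org.apache.spark.SparkConf

// Illustrative values only: with N receivers, the executors together need at
// least N cores for the long-running receive tasks, plus spare cores to
// process the batches themselves.
val conf = new SparkConf()
  .setAppName("myAppName")
  .set("spark.executor.instances", "3") // same as --num-executors
  .set("spark.executor.cores", "4")     // same as --executor-cores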


It is requesting 9, but only 3 get allocated. What are the configuration keys that need to be changed? I searched for all keys that end with "vcores" and increased them, but still only 3 executors get allocated.


Here are the configuration keys that I changed:

 

yarn.nodemanager.resource.cpu-vcores -> 8

yarn.scheduler.maximum-allocation-vcores -> 32

 

I still only get 3 executors in the Spark UI.
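
For reference, these two properties live in yarn-site.xml (or the equivalent fields in Cloudera Manager if the cluster is managed that way), and as far as I know the NodeManagers and ResourceManager have to be restarted before the new vcore limits take effect:

<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>8</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>32</value>
</property>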


I was finally able to get it to work by setting --executor-cores 12 in the spark-submit options.


Thanks sowen for your cooperation.