[Spark Streaming] UnionDStream does not produce batches


Hello,

 

I implemented a custom receiver to consume messages from RabbitMQ. The application works fine when I instantiate one receiver, but when I union multiple receivers the application blocks and never proceeds to the next stages in the pipeline. I can see that messages are being consumed from the queue, and the Application Master UI shows no error; it lists the next stage in the pipeline but never executes it. I made sure I have enough cores for execution (8 cores). The code is below (the receiver class and the streaming code). How do I get my streaming application to consume from multiple receivers?

 

[Receiver Class]

import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel

class RMQReceiver[T: ClassTag] extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {

  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")

  override def onStart() {
    val factory = new ConnectionFactory()
    factory.setUsername("myusername")
    factory.setPassword("mypassword")
    factory.setVirtualHost("/")
    factory.setHost("10.0.0.6") // IP of the internal load balancer
    factory.setPort(5673)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("MDM_Smart_Metering", true, false, false, null)
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume("MDM_Smart_Metering", false, consumer)
    new Thread("RabbitMQ Receiver") {
      override def run() {
        while (!isStopped) {
          val delivery = consumer.nextDelivery()
          val message = fromBytes(delivery.getBody())
          store(message)
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
        }
      }
    }.start()
  }

  override def onStop() {}
}

 

[Streaming Code]

val conf = new SparkConf().setAppName("myAppName")
val ssc = new StreamingContext(conf, Seconds(1))
val numStreams = args(0).toInt
val numberOfPartitions = args(1).toInt
val nameNode = args(2)
val rabbitStreams = (1 to numStreams).map { i => ssc.receiverStream[String](new RMQReceiver()) }
val unifiedStream = ssc.union(rabbitStreams)

unifiedStream.print()

ssc.start()
ssc.awaitTermination()
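
While debugging this, one thing that helps tell "the receivers never got a core" apart from "batches run but print nothing" is attaching a StreamingListener before ssc.start(). This is only a minimal diagnostic sketch using Spark's streaming scheduler listener API; the println calls are just placeholders.

import org.apache.spark.streaming.scheduler._

// Minimal diagnostic listener (sketch): log when each receiver registers with the
// driver and when a batch actually completes.
val diagListener = new StreamingListener {
  override def onReceiverStarted(started: StreamingListenerReceiverStarted): Unit =
    println(s"Receiver started: ${started.receiverInfo.name} on ${started.receiverInfo.location}")
  override def onBatchCompleted(completed: StreamingListenerBatchCompleted): Unit =
    println(s"Batch ${completed.batchInfo.batchTime} done, processing delay = ${completed.batchInfo.processingDelay}")
}
ssc.addStreamingListener(diagListener) // must be registered before ssc.start()

If onReceiverStarted never fires for every receiver, the receivers are still waiting for cores; if it fires but onBatchCompleted never does, the batches themselves are stuck.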

1 ACCEPTED SOLUTION


I was finally able to get it to work by setting --executor-cores 12 in the spark-submit options.
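
For completeness, the same thing can be expressed in the application code through the config property that --executor-cores maps to. This is just a sketch; spark.executor.cores is the property behind the flag, and 12 is simply the value that worked for me here.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: --executor-cores <n> on spark-submit corresponds to spark.executor.cores.
// Enough cores per executor means every receiver gets a core and batches can still run.
val conf = new SparkConf()
  .setAppName("myAppName")
  .set("spark.executor.cores", "12")
val ssc = new StreamingContext(conf, Seconds(1))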


15 REPLIES

Master Collaborator

What is your master set to? It  needs to allow for all the receivers, plus one, IIRC.
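
A rough sketch of that arithmetic, assuming each receiver permanently occupies one executor core (which is how receiver-based input streams behave):

// Rule-of-thumb core check (sketch): receivers each pin one core for their whole
// lifetime, so at least one extra core must be free to actually process the batches.
val numReceivers = 8 // hypothetical: one receiver per unioned stream
val minCoresNeeded = numReceivers + 1
println(s"$numReceivers receivers need at least $minCoresNeeded total executor cores")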

Master is yarn
Deploy mode is cluster
Available cores 8
I set number of partitions to 1 to free cores.

Master Collaborator

OK, what I'm interested in is how many executor slots you have. How many machines, how many executors, and how many cores per executor? We want to confirm it's at least as many as the number of receivers.

 

What about a simpler test involving a file-based DStream? If that works, it rules out most things except the custom DStream.


This is my run-spark-submit.sh file

 

#!/bin/bash
source /etc/spark/conf/spark-env.sh
spark-submit \
--class "com.itworx.smartmetering.SmartMeteringJob" \
--deploy-mode cluster \
--master yarn \
--jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar \
--name "SmartMeteringJob" \
--num-executors 8 \
--conf spark.executor.memory=400m \
smartmeteringjob_2.10-1.2.4-SNAPSHOT.jar 1 4 10.0.0.7

 

I have 3 nodes (one name node and 2 data nodes); each of the data nodes has 4 cores. I have set the number of executors to 8.
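
Working that layout through as a quick budget (a sketch only; it assumes YARN's default of 1 core per executor and 1 core for the application master, which hosts the driver in cluster deploy mode):

// Back-of-the-envelope core budget for this cluster (sketch, using the numbers above).
val dataNodes = 2
val coresPerNode = 4
val totalCores = dataNodes * coresPerNode // 8 vcores available to YARN
val amCores = 1 // cluster deploy mode: the driver runs inside the AM
val receiverCores = 2 // hypothetical receiver count; each one pins a core
val coresLeftForBatches = totalCores - amCores - receiverCores
println(s"Cores left over for batch processing: $coresLeftForBatches")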

Master Collaborator

You usually use --executor-memory to set executor memory, but I don't think that matters here. You also generally don't use env variables to configure spark-submit; even though it might be giving the desired results, I'd use the standard command-line flags.

 

It sounds like simpler jobs are working. When you request 8 executors, do you actually get them from YARN? Go look at your Executors tab.
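
If the UI is awkward to reach through the YARN proxy, one way to check from the driver (or a spark-shell session, where sc already exists) is the sketch below; getExecutorMemoryStatus reports one block manager per executor plus one for the driver itself.

// Sketch: count how many executors the application actually received from YARN.
val grantedExecutors = sc.getExecutorMemoryStatus.size - 1
println(s"Executors granted by YARN: $grantedExecutors")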

 


I tried the code below in spark-shell and it worked (it basically read the same files 3 times).

 

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(1))
val streams = (1 to 3).map { i => ssc.textFileStream("hdfs://10.0.0.7:8020/user/ubuntu/testStreaming") }
val lines = ssc.union(streams) //ssc.textFileStream("hdfs://10.0.0.7:8020/user/ubuntu/testStreaming") //
lines.reduceByWindow(_ + "\r\n" + _, Seconds(1), Seconds(1)).print()
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
Thread.sleep(30 * 1000)
ssc.stop(true, true)

 

 

This is the command I used to run spark-shell:

MASTER=yarn-client SPARK_WORKER_MEMORY=400m SPARK_WORKER_CORES=3 SPARK_WORKER_INSTANCES=8 spark-shell


I tried the code below and it worked for up to 5 receivers in parallel. May I know how you count the minimum required cores for a task?

 

MASTER=yarn-client SPARK_WORKER_MEMORY=400m SPARK_WORKER_CORES=3 SPARK_WORKER_INSTANCES=8 spark-shell --jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar

 

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.StreamingContext._
import com.mongodb.casbah.Imports._
import com.mongodb.DBObject
import com.mongodb.BasicDBObject
import com.mongodb.BasicDBList
import com.mongodb.util.JSON
import com.mongodb.BulkWriteOperation
import java.util.Date
import java.text.SimpleDateFormat
import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel
class RMQReceiver[T: ClassTag] extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {

  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")

  override def onStart() {
    val factory = new ConnectionFactory()
    factory.setUsername("myusername")
    factory.setPassword("mypassword")
    factory.setVirtualHost("/")
    factory.setHost("10.0.0.6") // IP of the internal load balancer
    factory.setPort(5673)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("MDM_Smart_Metering", true, false, false, null)
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume("MDM_Smart_Metering", false, consumer)
    new Thread("RabbitMQ Receiver") {
      override def run() {
        while (!isStopped) {
          val delivery = consumer.nextDelivery()
          val message = fromBytes(delivery.getBody())
          store(message)
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
        }
      }
    }.start()
  }

  override def onStop() {}
}
val ssc = new StreamingContext(sc, Seconds(1))
val numStreams = 5
val rabbitStreams = (1 to numStreams).map { i => ssc.receiverStream[String](new RMQReceiver()) }
val unifiedStream = ssc.union(rabbitStreams)
unifiedStream.reduceByWindow(_ + "\r\n" + _, Seconds(1), Seconds(1)).print()
ssc.start()
Thread.sleep(10 * 1000)
ssc.stop(true, true)


Where do I find the executors tab?

Master Collaborator

Go to the Spark UI and look at the top of the screen, then click Executors.