<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question [Spark Streaming] UnionDStream does not produce batches in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24908#M5058</link>
<description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;I implemented a custom receiver to consume messages from RabbitMQ. The application works fine when I instantiate one receiver, but when I try to union multiple receivers the application blocks and does not proceed to the next stages in the pipeline. I noticed that the messages do get consumed from the queue. The Application Master UI does not show any error; it shows the next stage in the pipeline but does not execute it. I made sure I have enough cores for execution (8 cores). Here is the code (the receiver class and the streaming code). How do I get my streaming application to consume from multiple receivers?&lt;/P&gt;&lt;P&gt;[Receiver Class]&lt;/P&gt;&lt;PRE&gt;import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel

class RMQReceiver[T: ClassTag] extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {
  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")
  override def onStart() {
    val factory = new ConnectionFactory()
    factory.setUsername("myusername")
    factory.setPassword("mypassword")
    factory.setVirtualHost("/")
    factory.setHost("10.0.0.6") // IP of the internal load balancer
    factory.setPort(5673)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("MDM_Smart_Metering", true, false, false, null)
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume("MDM_Smart_Metering", false, consumer)
    new Thread("RabbitMQ Receiver") {
      override def run() {
        while (!isStopped) {
          val delivery = consumer.nextDelivery()
          val message = fromBytes(delivery.getBody())
          store(message)
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
        }
      }
    }.start()
  }
  override def onStop() {}
}&lt;/PRE&gt;&lt;P&gt;[Streaming Code]&lt;/P&gt;&lt;PRE&gt;val conf = new SparkConf().setAppName("myAppName")
val ssc = new StreamingContext(conf, Seconds(1))
val numStreams = args(0).toInt
val numberOfPartitions = args(1).toInt
val nameNode = args(2)
val rabbitStreams = (1 to numStreams).map { i =&amp;gt; ssc.receiverStream[String](new RMQReceiver()) }
val unifiedStream = ssc.union(rabbitStreams)
unifiedStream.print()
ssc.start()
ssc.awaitTermination()&lt;/PRE&gt;</description>
    <pubDate>Fri, 16 Sep 2022 09:22:14 GMT</pubDate>
    <dc:creator>youssefyoussef</dc:creator>
    <dc:date>2022-09-16T09:22:14Z</dc:date>
    <item>
      <title>[Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24908#M5058</link>
<description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;I implemented a custom receiver to consume messages from RabbitMQ. The application works fine when I instantiate one receiver, but when I try to union multiple receivers the application blocks and does not proceed to the next stages in the pipeline. I noticed that the messages do get consumed from the queue. The Application Master UI does not show any error; it shows the next stage in the pipeline but does not execute it. I made sure I have enough cores for execution (8 cores). Here is the code (the receiver class and the streaming code). How do I get my streaming application to consume from multiple receivers?&lt;/P&gt;&lt;P&gt;[Receiver Class]&lt;/P&gt;&lt;PRE&gt;import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel

class RMQReceiver[T: ClassTag] extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {
  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")
  override def onStart() {
    val factory = new ConnectionFactory()
    factory.setUsername("myusername")
    factory.setPassword("mypassword")
    factory.setVirtualHost("/")
    factory.setHost("10.0.0.6") // IP of the internal load balancer
    factory.setPort(5673)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("MDM_Smart_Metering", true, false, false, null)
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume("MDM_Smart_Metering", false, consumer)
    new Thread("RabbitMQ Receiver") {
      override def run() {
        while (!isStopped) {
          val delivery = consumer.nextDelivery()
          val message = fromBytes(delivery.getBody())
          store(message)
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
        }
      }
    }.start()
  }
  override def onStop() {}
}&lt;/PRE&gt;&lt;P&gt;[Streaming Code]&lt;/P&gt;&lt;PRE&gt;val conf = new SparkConf().setAppName("myAppName")
val ssc = new StreamingContext(conf, Seconds(1))
val numStreams = args(0).toInt
val numberOfPartitions = args(1).toInt
val nameNode = args(2)
val rabbitStreams = (1 to numStreams).map { i =&amp;gt; ssc.receiverStream[String](new RMQReceiver()) }
val unifiedStream = ssc.union(rabbitStreams)
unifiedStream.print()
ssc.start()
ssc.awaitTermination()&lt;/PRE&gt;</description>
      <pubDate>Fri, 16 Sep 2022 09:22:14 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24908#M5058</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2022-09-16T09:22:14Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24910#M5059</link>
      <description>&lt;P&gt;What is your master set to? It needs to allow for all the receivers, plus one, IIRC.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 12:45:25 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24910#M5059</guid>
      <dc:creator>srowen</dc:creator>
      <dc:date>2015-02-22T12:45:25Z</dc:date>
    </item>
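The "receivers, plus one" rule of thumb above can be sketched as arithmetic. This is a minimal illustration, not part of the original thread, and the numbers are hypothetical:

```scala
// Each receiver permanently occupies one core for its whole lifetime,
// so a receiver-based streaming app needs at least numReceivers + 1
// cores: the extra core(s) run the batch-processing tasks.
object CoreCount extends App {
  val numReceivers = 8        // hypothetical: one per unioned stream
  val coresForProcessing = 1  // bare minimum for batches to make progress
  val minCores = numReceivers + coresForProcessing
  println(s"Request at least $minCores cores from the cluster manager")
}
```

With fewer cores than receivers plus one, the receivers consume every slot and no batch-processing task can ever be scheduled, which matches the "stages appear but never execute" symptom described in the question.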
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24911#M5060</link>
      <description>&lt;P&gt;Master is yarn&lt;BR /&gt;Deploy mode is cluster&lt;BR /&gt;Available cores: 8&lt;BR /&gt;I set the number of partitions to 1 to free cores.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 12:54:33 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24911#M5060</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T12:54:33Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24912#M5061</link>
      <description>&lt;P&gt;OK, what I'm interested in is how many executor slots you have. How many machines, how many executors, how many cores per executor? We want to confirm it's at least as many as the number of receivers.&lt;/P&gt;&lt;P&gt;What about a simpler test involving a file-based DStream? If that works, it rules out most causes other than the custom DStream.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 13:04:31 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24912#M5061</guid>
      <dc:creator>srowen</dc:creator>
      <dc:date>2015-02-22T13:04:31Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24913#M5062</link>
      <description>&lt;P&gt;This is my run-spark-submit.sh file:&lt;/P&gt;&lt;PRE&gt;#!/bin/bash
source /etc/spark/conf/spark-env.sh
spark-submit \
  --class "com.itworx.smartmetering.SmartMeteringJob" \
  --deploy-mode cluster \
  --master yarn \
  --jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar \
  --name "SmartMeteringJob" \
  --num-executors 8 \
  --conf spark.executor.memory=400m \
  smartmeteringjob_2.10-1.2.4-SNAPSHOT.jar 1 4 10.0.0.7&lt;/PRE&gt;&lt;P&gt;I have 3 nodes (one name node and 2 data nodes); each of the data nodes has 4 cores. I have set the number of executors to 8.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 13:20:51 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24913#M5062</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T13:20:51Z</dc:date>
    </item>
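A quick sanity check on the numbers quoted in the post above, sketched as arithmetic (an illustration added here, not part of the original reply):

```scala
// 2 data nodes x 4 cores each = 8 vcores available to YARN.
object ClusterMath extends App {
  val dataNodes = 2
  val coresPerNode = 4
  val totalCores = dataNodes * coresPerNode
  // In yarn-cluster mode the ApplicationMaster (which hosts the driver)
  // also takes a container, so 8 requested executors plus the AM
  // ask for more containers than the cluster can supply.
  val requestedContainers = 8 /* executors */ + 1 /* AM */
  println(s"requested $requestedContainers vs $totalCores cores available")
}
```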
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24914#M5063</link>
      <description>&lt;P&gt;I tried the code below in spark-shell and it worked (it basically read the same files 3 times).&lt;/P&gt;&lt;PRE&gt;import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(1))
val streams = (1 to 3).map { i =&amp;gt; ssc.textFileStream("hdfs://10.0.0.7:8020/user/ubuntu/testStreaming") }
val lines = ssc.union(streams) //ssc.textFileStream("hdfs://10.0.0.7:8020/user/ubuntu/testStreaming") //
lines.reduceByWindow(_ + "\r\n" + _, Seconds(1), Seconds(1)).print()
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x =&amp;gt; (x, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
Thread.sleep(30 * 1000)
ssc.stop(true, true)&lt;/PRE&gt;&lt;P&gt;This is the command I used to run spark-shell:&lt;/P&gt;&lt;PRE&gt;MASTER=yarn-client SPARK_WORKER_MEMORY=400m SPARK_WORKER_CORES=3 SPARK_WORKER_INSTANCES=8 spark-shell&lt;/PRE&gt;</description>
      <pubDate>Sun, 22 Feb 2015 15:35:08 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24914#M5063</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T15:35:08Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24916#M5064</link>
      <description>&lt;P&gt;I tried the code below and it worked for up to 5 receivers in parallel. May I know how to count the minimum required cores for a job?&lt;/P&gt;&lt;PRE&gt;MASTER=yarn-client SPARK_WORKER_MEMORY=400m SPARK_WORKER_CORES=3 SPARK_WORKER_INSTANCES=8 spark-shell --jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar&lt;/PRE&gt;&lt;PRE&gt;import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.StreamingContext._
import com.mongodb.casbah.Imports._
import com.mongodb.DBObject
import com.mongodb.BasicDBObject
import com.mongodb.BasicDBList
import com.mongodb.util.JSON
import com.mongodb.BulkWriteOperation
import java.util.Date
import java.text.SimpleDateFormat
import com.rabbitmq.client.{ Connection, Channel, ConnectionFactory, QueueingConsumer }
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel

class RMQReceiver[T: ClassTag] extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {
  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")
  override def onStart() {
    val factory = new ConnectionFactory()
    factory.setUsername("myusername")
    factory.setPassword("mypassword")
    factory.setVirtualHost("/")
    factory.setHost("10.0.0.6") // IP of the internal load balancer
    factory.setPort(5673)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("MDM_Smart_Metering", true, false, false, null)
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume("MDM_Smart_Metering", false, consumer)
    new Thread("RabbitMQ Receiver") {
      override def run() {
        while (!isStopped) {
          val delivery = consumer.nextDelivery()
          val message = fromBytes(delivery.getBody())
          store(message)
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
        }
      }
    }.start()
  }
  override def onStop() {}
}

val ssc = new StreamingContext(sc, Seconds(1))
val numStreams = 5
val rabbitStreams = (1 to numStreams).map { i =&amp;gt; ssc.receiverStream[String](new RMQReceiver()) }
val unifiedStream = ssc.union(rabbitStreams)
unifiedStream.reduceByWindow(_ + "\r\n" + _, Seconds(1), Seconds(1)).print()
ssc.start()
Thread.sleep(10 * 1000)
ssc.stop(true, true)&lt;/PRE&gt;</description>
      <pubDate>Sun, 22 Feb 2015 16:04:25 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24916#M5064</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T16:04:25Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24917#M5065</link>
      <description>&lt;P&gt;You usually use --executor-memory to set executor memory, but I don't think it matters here. You also generally do not use environment variables to configure spark-shell; although it might be giving the desired results, I'd use the standard command-line flags.&lt;/P&gt;&lt;P&gt;It sounds like simpler jobs are working. You request 8 executors, but do you actually get them from YARN? Go look at your Executors tab.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 16:11:05 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24917#M5065</guid>
      <dc:creator>srowen</dc:creator>
      <dc:date>2015-02-22T16:11:05Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24919#M5066</link>
      <description>&lt;P&gt;Where do I find the Executors tab?&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 16:22:39 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24919#M5066</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T16:22:39Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24920#M5067</link>
      <description>&lt;P&gt;Go to the Spark UI and look at the top of the screen -- click Executors.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 16:29:51 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24920#M5067</guid>
      <dc:creator>srowen</dc:creator>
      <dc:date>2015-02-22T16:29:51Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24921#M5068</link>
      <description>&lt;P&gt;It says Executors (3)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I also checked the dynamic resource pool on yarn. Resource pool usage shows that allocated cores is 3 and allocated containers is 3.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 16:38:13 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24921#M5068</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T16:38:13Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24922#M5069</link>
      <description>&lt;P&gt;So, your app only has 3 cores from YARN? Then it can only execute 3 tasks in parallel. I'm not sure how many receivers you are starting, but is 3 fewer than that?&lt;/P&gt;&lt;P&gt;It sounds like you expected much more resource to be available, so I'd look at your YARN config and at what is using the resources, and compare that to what Spark is actually requesting.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 16:52:10 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24922#M5069</guid>
      <dc:creator>srowen</dc:creator>
      <dc:date>2015-02-22T16:52:10Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24923#M5070</link>
      <description>&lt;P&gt;It requests 9, but only 3 get allocated. What are the configuration keys that need to be changed? I searched for all keys that end with "vcores" and increased them, but still only 3 executors get allocated.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 17:03:50 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24923#M5070</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T17:03:50Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24924#M5071</link>
      <description>&lt;P&gt;Here are the configuration keys that I changed:&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;yarn.nodemanager.resource.cpu-vcores -&amp;gt; 8&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;yarn.scheduler.maximum-allocation-vcores -&amp;gt; 32&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Still, I only get 3 executors in the Spark UI.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 17:12:43 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24924#M5071</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T17:12:43Z</dc:date>
    </item>
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24925#M5072</link>
      <description>&lt;P&gt;I was finally able to get it to work by setting --executor-cores 12 in the spark-submit options.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 18:00:28 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24925#M5072</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T18:00:28Z</dc:date>
    </item>
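Combining the run-spark-submit.sh posted earlier in this thread with the flag above, the working command would presumably look like the following. This is a sketch assembled from the values quoted in the thread, not a verified configuration:

```shell
#!/bin/bash
source /etc/spark/conf/spark-env.sh
spark-submit \
  --class "com.itworx.smartmetering.SmartMeteringJob" \
  --deploy-mode cluster \
  --master yarn \
  --jars rabbitmq-client.jar,casbah-alldep_2.10-2.7.4.jar \
  --name "SmartMeteringJob" \
  --num-executors 8 \
  --executor-cores 12 \
  --conf spark.executor.memory=400m \
  smartmeteringjob_2.10-1.2.4-SNAPSHOT.jar 1 4 10.0.0.7
```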
    <item>
      <title>Re: [Spark Streaming] UnionDStream does not produce batches</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24926#M5073</link>
      <description>&lt;P&gt;Thanks srowen for your cooperation.&lt;/P&gt;</description>
      <pubDate>Sun, 22 Feb 2015 18:01:21 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-Streaming-UnionDStream-does-not-produce-batches/m-p/24926#M5073</guid>
      <dc:creator>youssefyoussef</dc:creator>
      <dc:date>2015-02-22T18:01:21Z</dc:date>
    </item>
  </channel>
</rss>

