New Contributor
Posts: 3
Registered: 06-22-2018

Issue with Spark Streaming, YARN and non-freed containers

I'm facing a performance issue with Spark Streaming (Spark 2.x).

My use case is:

 - Devices publish messages to a Kafka (0.10.x) topic (5 nodes, 10 partitions). Messages are randomly distributed across the partitions, although any given device always publishes to the same partition.
 - Spark Streaming subscribes to the topic (direct stream) to get the messages, splits them to populate an RDD, and writes the RDD to Kudu (using KuduContext.upsertRows).

Spark is configured to run on YARN (6 workers, each with 32 vcores and 256 GB of RAM, of which 180 GB is dedicated to YARN).
Kudu is installed on the same nodes, with a hard memory limit of 10 GB per tablet server.
My Spark Streaming job is submitted from an edge node in yarn-client mode, with a batch interval of 5 s and 12 executors, each with 10 GB of memory and 1 core.

With fewer than 300,000 devices connected, each sending a message every 5 seconds, everything works fine.
With more than 300,000 devices connected, each sending a message every 5 seconds, Spark starts doing strange things I can't understand (after one or two hours).
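
To put that load into numbers, here is a back-of-the-envelope sketch in plain Scala (no Spark needed). It assumes exactly one message per device per 5 s batch and an even spread over the 10 Kafka partitions. With the Kafka 0.10 direct stream, each Kafka partition becomes one Spark partition, so a batch has 10 tasks to spread over the 12 single-core executors. `LoadEstimate` and its methods are illustrative names only.

```scala
object LoadEstimate {
  /** Incoming messages per second, assuming one message per device per interval. */
  def messagesPerSecond(devices: Long, intervalSec: Int): Double =
    devices.toDouble / intervalSec

  /** Records per task per batch, assuming an even spread over the Kafka partitions
   *  (the direct stream creates one Spark partition per Kafka partition). */
  def recordsPerTask(devices: Long, kafkaPartitions: Int): Long =
    devices / kafkaPartitions

  /** Tasks that can run at the same time: limited by partitions and by executor cores. */
  def concurrentTasks(kafkaPartitions: Int, executors: Int, coresPerExecutor: Int): Int =
    math.min(kafkaPartitions, executors * coresPerExecutor)

  /** Records per second each task must sustain for the batch to finish within the interval
   *  (ignores scheduling overhead, so the real budget is tighter). */
  def requiredRatePerTask(devices: Long, kafkaPartitions: Int, intervalSec: Int): Double =
    recordsPerTask(devices, kafkaPartitions).toDouble / intervalSec

  def main(args: Array[String]): Unit = {
    val devices = 300000L
    println(s"messages/s: ${messagesPerSecond(devices, 5)}")             // 60000.0
    println(s"records per task per batch: ${recordsPerTask(devices, 10)}") // 30000
    println(s"tasks running at once: ${concurrentTasks(10, 12, 1)}")      // 10
    println(s"records/s per task: ${requiredRatePerTask(devices, 10, 5)}") // 6000.0
  }
}
```

In other words, each of the 10 tasks has to read, parse and upsert about 30,000 rows in under 5 s, and 2 of the 12 executor cores get no work from the Kafka partitions at all.
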
By strange things, I mean the behavior shown in the chart I attached (image not preserved). That's to say:
- When everything is fine, the average scheduling delay is under 1 s.
- Sometimes, when things get more complicated, some scheduling delays climb to about 1 minute and then come back to more usual values.
- After 1-2 hours, the scheduling delay goes crazy and keeps growing until it exceeds the Kafka retention period; the job then throws an error because the data is no longer available.
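
This pattern is what a simple queueing model predicts once a batch takes longer to process than the 5 s interval. The sketch below is a toy illustration, not Spark's actual scheduler: it assumes one job runs at a time (the code in this post actually sets `spark.streaming.concurrentJobs` to 2) and fixed processing times. Batch n is generated at n × interval and can only start when the previous batch finishes, so a steady overrun of 0.2 s per batch adds 0.2 s of scheduling delay per batch, about 144 s after one hour (720 batches). `SchedulingDelayModel` is a hypothetical name.

```scala
object SchedulingDelayModel {
  /**
   * Scheduling delay (seconds) of each batch in a toy model where batch n is generated
   * at n * intervalSec and can only start once batch n - 1 has finished.
   */
  def schedulingDelays(intervalSec: Double, processingSec: Seq[Double]): Vector[Double] = {
    // Fold over the batches, carrying the previous batch's finish time
    val (_, delays) = processingSec.zipWithIndex.foldLeft((0.0, Vector.empty[Double])) {
      case ((prevFinish, acc), (processing, n)) =>
        val generated = n * intervalSec
        val start = math.max(generated, prevFinish)
        (start + processing, acc :+ (start - generated))
    }
    delays
  }

  def main(args: Array[String]): Unit = {
    val batchesPerHour = 3600 / 5 // 720 batches at a 5 s interval
    val keepingUp = schedulingDelays(5.0, Seq.fill(batchesPerHour)(4.0))
    val fallingBehind = schedulingDelays(5.0, Seq.fill(batchesPerHour)(5.2))
    println(f"4.0 s per batch, max delay: ${keepingUp.max}%.1f s")
    println(f"5.2 s per batch, delay after one hour: ${fallingBehind.last}%.1f s")
  }
}
```

The delay never recovers on its own while processing time stays above the interval, which is why it eventually outruns the Kafka retention period.
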

I thought Spark was asking for more resources, but the YARN console doesn't confirm it: I'm using only 50% of the available memory and less than 20% of the total vcores.


So, what's happening? I can't see anything in any log, nothing. My process seems to get stuck for no reason (I'm sure there is a very logical one, but I can't find it). Of course, from that moment on, nothing more is written to Kudu.

Another thing: when my devices stop sending messages, my Spark application keeps all 51 containers it asked for during the ingest phase without freeing them. Is that normal? Can I tell Spark to free them? How?
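
As a sanity check on the 50% memory figure, the sketch below estimates the YARN memory held by executor containers. Assumptions: the Spark 2.x default executor memory overhead on YARN (max(384 MB, 10% of executor memory)), 180 GB of YARN memory on each of the 6 nodes, and 50 executors plus one Application Master among the 51 containers; the AM's small container is left out. `YarnFootprint` is an illustrative name.

```scala
object YarnFootprint {
  /** Container size (MB) for one executor with the Spark 2.x default YARN overhead:
   *  max(384 MB, 10% of executor memory). */
  def executorContainerMb(executorMemoryMb: Int): Int =
    executorMemoryMb + math.max(384, executorMemoryMb / 10)

  /** Fraction of the cluster's YARN memory taken by `executors` executor containers. */
  def memoryShare(executors: Int, executorMemoryMb: Int, nodes: Int, yarnMbPerNode: Int): Double =
    executors.toLong * executorContainerMb(executorMemoryMb) / (nodes.toLong * yarnMbPerNode).toDouble

  def main(args: Array[String]): Unit = {
    val execMb = 10 * 1024  // 10 GB executors
    val yarnMb = 180 * 1024 // 180 GB of YARN memory per node
    println(f"12 executors: ${memoryShare(12, execMb, 6, yarnMb) * 100}%.1f%%") // 12.2%
    println(f"50 executors: ${memoryShare(50, execMb, 6, yarnMb) * 100}%.1f%%") // 50.9%
  }
}
```

Each 10 GB executor takes an 11 GB container, so about 50 executors already account for roughly half of the cluster's YARN memory, which matches the console; the 12 executors requested would only use about 12%.
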

What about Kudu? Maybe there's a problem with Kudu, but as far as I understand how Kudu works, data is first written to the tablet server's memory before being flushed to disk, and this operation doesn't block the Spark Streaming job. I suppose I would at least get a warning from Kudu telling me something is going wrong...

What about my Spark code? Here it is...


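    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.kudu.spark.kudu.KuduContext
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    // KuduSession (a case class with one field per Kudu column, plus a factory
    // taking the split Array[String]) is defined elsewhere and not shown here.
    object SessionEtl {
      // Connection settings are defined elsewhere; placeholder values shown here
      val brokers: String = "<kafka-brokers>"
      val topicSession: String = "<session-topics>"
      val kuduMasters: String = "<kudu-masters>"

      // Create Spark Conf
      val conf: SparkConf = new SparkConf()
      // Let the streaming scheduler run up to 2 jobs concurrently
      conf.set("spark.streaming.concurrentJobs", "2")

      // Create Spark Session (builder chain shortened for the post)
      val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

      // Create Spark Streaming Context with a 5 s batch interval
      val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
        () => new StreamingContext(spark.sparkContext, Seconds(5)))

      def main(args: Array[String]): Unit = {
        // Start the job
        subscribe()
        startEtl()
      }

      def subscribe(): Unit = {
        // Create Kudu Context
        val kudu: KuduContext = new KuduContext(kuduMasters, spark.sparkContext)

        // Subscribe to Kafka: topics to be subscribed
        val topicSetSession: Array[String] = topicSession.split(",")
        // Kafka subscriber configuration
        val sessionKafkaParams = Map[String, Object](
          "bootstrap.servers" -> brokers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "SessionSubscriber",                  // key name assumed (lost from the post)
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)) // key name assumed (lost from the post)
        // Get Session Stream: one Spark partition per Kafka partition
        val rawSessionStream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](topicSetSession, sessionKafkaParams))
        // Pull the Kafka topic and process the stream
        val processedSessionStream = ingestSession(rawSessionStream)
        // Persist the session
        persistSession(kudu, processedSessionStream)
      }

      /** Start the Spark ETL */
      def startEtl(): Unit = {
        // Start the Spark Streaming batches and block until the job stops
        ssc.start()
        ssc.awaitTermination()
      }

      /** Close the Spark session (stops the StreamingContext and its SparkContext) */
      def close(): Unit = {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }

      /**
       * Process the raw stream polled from Kafka and convert it into a DStream
       * @param rawSessionStream Raw stream polled from Kafka
       */
      def ingestSession(rawSessionStream: InputDStream[ConsumerRecord[String, String]]): DStream[KuduSession] =
        // Each Kafka value is a pipe-delimited record
        rawSessionStream.map(record => KuduSession(record.value.split('|')))

      /**
       * Persist each record from the processed stream into a persistence layer
       * @param kuduContext Kudu context to be used to persist into Kudu
       * @param processedSessionStream Processed stream of data to persist
       */
      def persistSession(kuduContext: KuduContext, processedSessionStream: DStream[KuduSession]): Unit = {
        import spark.implicits._
        // Kudu column names, in KuduSession field order
        val newNames = Seq("session_id", "device_id", "channel_id", "real_channel_id",
          "start_session_year", "start_session_month", "start_session_day",
          "start_session_hour", "start_session_minute", "start_session_second",
          "end_session_year", "end_session_month", "end_session_day",
          "end_session_hour", "end_session_minute", "end_session_second")
        // Upsert each micro-batch into the Kudu table
        processedSessionStream.foreachRDD { rdd =>
          kuduContext.upsertRows(rdd.toDF(newNames: _*), "impala::devices.hbbtv_session")
        }
      }
    }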
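
The KuduSession type and its factory aren't shown above. As a hypothetical sketch, assuming each Kafka value carries the 16 pipe-delimited string fields listed in `newNames`, a defensive parser might look like this (`SessionRecord` and `parse` are illustrative names, not the code from the post). It splits with a limit of -1 because a plain `split('|')` drops trailing empty strings, so a record whose last fields are empty would silently come back with fewer than 16 fields.

```scala
/** Hypothetical record type: one String per column listed in newNames. */
final case class SessionRecord(
    sessionId: String, deviceId: String, channelId: String, realChannelId: String,
    startYear: String, startMonth: String, startDay: String,
    startHour: String, startMinute: String, startSecond: String,
    endYear: String, endMonth: String, endDay: String,
    endHour: String, endMinute: String, endSecond: String)

object SessionRecord {
  val FieldCount = 16

  /** Parses one pipe-delimited Kafka value; returns None if the field count is wrong. */
  def parse(value: String): Option[SessionRecord] = {
    // Regex-escaped pipe, limit -1 keeps trailing empty fields
    val f = value.split("\\|", -1)
    if (f.length != FieldCount) None
    else Some(SessionRecord(f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7),
      f(8), f(9), f(10), f(11), f(12), f(13), f(14), f(15)))
  }
}
```

In the stream, something like `rawSessionStream.flatMap(r => SessionRecord.parse(r.value).toList)` would skip malformed messages instead of failing the whole batch.
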

Thank you for any help!

Cloudera Employee
Posts: 1
Registered: 07-19-2017

Re: Issue with Spark Streaming, YARN and non-freed containers

Hello ChelouteMS,

Looking at the chart and comparing it with what you described (the scheduling delay increased once the number of devices publishing data to the Kafka topic increased), I don't see anything alarming. It's a classic issue: batches begin to pile up and scheduling delays increase as the amount of data that needs to be processed grows.

The odd thing I see is that you seem to have 51 containers allocated, even though you mentioned that you specifically asked for 12 executors with 1 core each. I would then expect your application to have 13 containers in total (12 executors + 1 for the Application Master). How are you specifying the number of executors? Do you have dynamic allocation enabled? If so, I would try disabling it (spark.dynamicAllocation.enabled=false).
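
A minimal sketch of that, assuming the settings are applied in code rather than on the spark2-submit command line (where `--num-executors 12 --executor-cores 1 --executor-memory 10g --conf spark.dynamicAllocation.enabled=false` has the same effect). The values mirror the executors described in the question; `StaticAllocationConf` is an illustrative name. As far as I know, in Spark 2.x an explicit executor count with dynamic allocation still enabled only sets the initial number of executors, and Spark may then request more, which would be consistent with the 51 containers.

```scala
import org.apache.spark.SparkConf

object StaticAllocationConf {
  /** Builds a SparkConf with a fixed executor count instead of dynamic allocation.
   *  These must be set before the SparkContext is created. */
  def build(): SparkConf =
    new SparkConf()
      .set("spark.dynamicAllocation.enabled", "false") // no scaling up or down at runtime
      .set("spark.executor.instances", "12")           // same as --num-executors 12
      .set("spark.executor.cores", "1")
      .set("spark.executor.memory", "10g")
}
```

If you would rather keep dynamic allocation, `spark.dynamicAllocation.maxExecutors` caps how many executors it can request (it is unbounded by default).
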

The next place to look is the output of your spark2-submit command (the driver output, since you're running in client mode) and the Application Master's log, to confirm whether the application did in fact ask for that many containers.