
Issue with Spark Streaming, YARN and containers that are not freed

I'm facing a performance issue with Spark Streaming (Spark 2.x).

My use case is:

 - Some devices publish messages to a Kafka (0.10.x) topic (5 nodes, 10 partitions). Messages are published randomly, although a given device always publishes to the same partition.
 - Spark Streaming subscribes to the topic (DirectStream) to get the messages, splits them to populate an RDD, and writes the RDD to Kudu (using KuduContext.upsertRows).
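
For illustration, the splitting step can be sketched in plain Scala. The job's code further down splits each message value on `|`; the `SessionRecord` type and its three fields here are hypothetical stand-ins, since the real `KuduSession` has 16 columns and is not shown in this post.

```scala
// Hypothetical, trimmed-down record (the real KuduSession is not shown in this post).
final case class SessionRecord(sessionId: String, deviceId: String, channelId: String)

object SessionParser {
  // split('|') takes a Char, so the pipe is matched literally.
  // split("|") would be read as a regex alternation and split between every character.
  def parse(line: String): Option[SessionRecord] =
    line.split('|') match {
      case Array(session, device, channel) => Some(SessionRecord(session, device, channel))
      case _                               => None // wrong number of fields
    }
}
```

Returning an `Option` is a design choice of this sketch, not something the job does: it lets malformed messages be dropped with `flatMap` instead of failing the whole batch.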

Spark is configured to run on YARN (6 workers: 32 vcores and 256 GB RAM per node, of which 180 GB are dedicated to YARN on these nodes).
Kudu is installed on the same nodes with a hard limit of 10 GB per TabletServer.
My Spark Streaming job is submitted from an edge node in yarn-client mode with a batch interval of 5 s and 12 executors, each with 10 GB of memory and 1 core.

With fewer than 300,000 devices connected, each sending a message every 5 seconds, everything goes fine.
With more than 300,000 devices connected, each sending a message every 5 seconds, Spark starts behaving in ways I can't understand (after an hour or two).
By strange things, I mean:

Screenshot_3.png

That is to say:
- When everything is fine, the average scheduling delay is less than 1 s.
- Sometimes, when the load gets heavier, scheduling delays climb to as much as 1 minute and then return to more usual values.
- After 1-2 hours, the scheduling delay grows without bound until it exceeds the Kafka retention period, at which point an error is thrown because the data are no longer available.
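
To put numbers on this, here is a rough back-of-the-envelope sketch in plain Scala. It rests on two assumptions that are not measurements from this job: that the direct stream creates one Spark partition per Kafka partition (so a batch is read by at most 10 tasks), and that every batch takes the same fixed processing time with batches run one after another (the job actually sets `spark.streaming.concurrentJobs` to 2, which this model ignores). The 6 s processing time used in the note below is hypothetical.

```scala
object BacklogEstimate {
  // Records each Kafka partition contributes to one batch,
  // assuming one message per device per batch and an even spread.
  def recordsPerPartition(devices: Long, partitions: Int): Long =
    devices / partitions

  // Scheduling delay (in seconds) accumulated after `batches` batches when
  // each batch needs `processingSec` but a new one is queued every `intervalSec`.
  // A batch that finishes within the interval adds no delay.
  def delayAfter(batches: Long, intervalSec: Double, processingSec: Double): Double =
    math.max(0.0, processingSec - intervalSec) * batches
}
```

With 300,000 devices and 10 partitions, `recordsPerPartition(300000L, 10)` gives 30,000 records per task. With a 5 s interval and a 6 s processing time, `delayAfter(720, 5.0, 6.0)` gives 720 s: one hour of batches would leave the job 12 minutes behind, and the gap keeps growing for as long as the overrun lasts.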

I thought Spark was asking for more resources, but the YARN console doesn't confirm it: I'm using only 50% of the available memory and less than 20% of the total vcores.

Screenshot_2.png
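
A rough consistency check on these memory figures, as a plain Scala sketch. The overhead formula is an assumption: it is the Spark 2.x default for `spark.yarn.executor.memoryOverhead` as far as I know (the larger of 384 MB and 10% of the executor memory). YARN may round container sizes up further, and the application master container is ignored here.

```scala
object YarnMemoryCheck {
  // 6 workers with 180 GB each dedicated to YARN (figures from this post).
  val yarnMemoryGb: Double = 6 * 180.0

  // Container size for one executor: heap plus the assumed default overhead.
  def containerGb(executorGb: Double): Double =
    executorGb + math.max(384.0 / 1024.0, 0.10 * executorGb)

  // Fraction of the YARN memory taken by `executors` executors.
  def memoryShare(executors: Int, executorGb: Double): Double =
    executors * containerGb(executorGb) / yarnMemoryGb
}
```

Under these assumptions, the 12 executors of 10 GB that were requested would account for about 12% of the YARN memory, whereas about 50 executors (close to the 51 containers mentioned later in this post) would account for about 51%. The 50% reading would therefore suggest that more than 12 executors are running.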

So, what's happening? I can't see anything in any log. It seems my process gets stuck for no apparent reason (I'm sure there is a very logical one, but I can't find it). Of course, from that moment on, nothing else is written to Kudu.

Another thing: when my devices stop sending messages, my Spark application keeps all 51 containers it requested during the ingest phase without freeing them. Is this normal? Can I tell Spark to free them, and if so, how?
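
For reference, here is a sketch of the settings that would be relevant if dynamic allocation is what requested the extra containers. That is an assumption, not something confirmed for this cluster. The property names are standard Spark configuration keys; the values in the second map (the bounds and the timeout) are hypothetical. Either map could be applied to the job's `SparkConf` with `conf.setAll(...)`.

```scala
object AllocationSettings {
  // Option A: pin the application to the executors requested at submit time.
  val fixedAllocation: Map[String, String] = Map(
    "spark.dynamicAllocation.enabled" -> "false",
    "spark.executor.instances"        -> "12"
  )

  // Option B: keep dynamic allocation, but bound it and release idle executors.
  // Dynamic allocation on YARN relies on the external shuffle service.
  val boundedAllocation: Map[String, String] = Map(
    "spark.dynamicAllocation.enabled"             -> "true",
    "spark.shuffle.service.enabled"               -> "true",
    "spark.dynamicAllocation.minExecutors"        -> "12",  // hypothetical bound
    "spark.dynamicAllocation.maxExecutors"        -> "24",  // hypothetical bound
    "spark.dynamicAllocation.executorIdleTimeout" -> "60s"  // hypothetical value
  )
}
```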

What about Kudu? Maybe there's a problem on the Kudu side, but as far as I understand how Kudu works, data are flushed into the Kudu TabletServer memory before being written to disk, and this operation doesn't block the Spark Streaming job. I suppose I would get at least a warning from Kudu telling me something is going wrong...

What about my Spark code? Here it is...

 

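      import org.apache.kafka.clients.consumer.ConsumerRecord
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.kudu.spark.kudu.KuduContext
      import org.apache.spark.SparkConf
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.streaming.StreamingContext
      import org.apache.spark.streaming.dstream.{DStream, InputDStream}
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming.kafka010.KafkaUtils
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

      // Create Spark Conf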
      val conf: SparkConf = new SparkConf()
      conf.set("spark.streaming.concurrentJobs", "2")
      conf.setAppName("Audience")
      conf.setMaster(master)
      conf.setSparkHome(sparkHome)
    
      // Create Spark Session
      // **********
      val spark: SparkSession = SparkSession.builder()
        .config(conf)
        .getOrCreate()
    
      // Create Spark Streaming Context
      // **********
      val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
        () => new StreamingContext(
          spark.sparkContext,
          duration))
    
      def main(args: Array[String]) {
    
        // Start the job
        // **********
        subscribe
        startEtl
      }
    
      def subscribe {
    
        // Create Kudu Context
        // **********
        val kudu: KuduContext = new KuduContext(kuduMasters, spark.sparkContext);
    
        // Subscribe to Kafka
        // **********
        // Topics to be subscribed
        val topicSetSession: Array[String] = topicSession.split(",")
        // Kafka subscriber configuration
        val sessionKafkaParams = Map[String, Object](
          "bootstrap.servers" -> brokers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "SessionSubscriber",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean))
    
        // Get Session Stream
        val rawSessionStream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topicSetSession, sessionKafkaParams))
    
        // Pull the Kafka topic and process the stream
        val processedSessionStream = ingestSession(rawSessionStream)
    
        // Persist the session
        persistSession(kudu, processedSessionStream)
    
      }
    
      /**
       * Start the Spark ETL
       */
      def startEtl {
        // Start the Spark Streaming batch
        ssc.start()
        ssc.awaitTermination()
      }
    
      /**
       * Close the spark session
       */
      def close {
        ssc.stop(true)
        spark.close
      }
   
      /**
       * Parse the raw stream polled from Kafka into a DStream of KuduSession
       *
       * @param rawSessionStream Raw stream polled from Kafka
       */
      def ingestSession(rawSessionStream: InputDStream[ConsumerRecord[String, String]]): DStream[KuduSession] = {
        // Each message value is a pipe-delimited string
        rawSessionStream.map(record => KuduSession(record.value.split('|')))
      }
    
      /**
       * Persist each record from the processed stream into a persistence Layer
       *
       * @param kuduContext Kudu context to be used to persist into Kudu
       * @param processedSessionStream Processed stream of data to persist
       */
      def persistSession(kuduContext: KuduContext, processedSessionStream: DStream[KuduSession]): Unit = {
        // Needed for rdd.toDF below
        import spark.implicits._
    
        val newNames = Seq("session_id", "device_id", "channel_id", "real_channel_id",
          "start_session_year", "start_session_month", "start_session_day",
          "start_session_hour", "start_session_minute", "start_session_second",
          "end_session_year", "end_session_month", "end_session_day",
          "end_session_hour", "end_session_minute", "end_session_second")
    
        processedSessionStream.foreachRDD(rdd => {
          kuduContext.upsertRows(rdd.toDF(newNames: _*), "impala::devices.hbbtv_session")
        })
      }


Thank you for any help!
