
Reading Input Stream using Spark Custom receiver

Explorer

I have written a custom receiver to receive the stream generated by one of our applications. The receiver starts the process, gets the stream, and then calls store. However, the receive method gets called multiple times. I have written what I think is a proper loop break condition, but it does not stop the repeated reads. How can I ensure it reads only once and does not re-read already processed data? Here is my custom receiver code:

class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
  def onStart() {
    new Thread("Splunk Receiver") {
      override def run() { receive() }
    }.start()
  }
  def onStop() {
  }
  private def receive() {
    try {
      /* My code to run a process and get the stream */
      val reader = new ResultsReader(job.getResults()) // ResultsReader is the reader for the application
      var event: String = reader.getNextLine
      while (!isStopped || event != null) {
        store(event)
        event = reader.getNextLine
      }
      reader.close()
    } catch {
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}

Where did I go wrong? Problem 1) The job and the stream reading happen multiple times, and the same data piles up. So for 60 lines of data, I get 1800 or sometimes more in total.

Streaming Code:

val conf = new SparkConf
conf.setAppName("str1")
conf.setMaster("local[2]")
conf.set("spark.driver.allowMultipleContexts", "true")
val ssc = new StreamingContext(conf, Minutes(2))
val customReceiverStream = ssc.receiverStream(new MyReceiver)
println(" searching ")
//if(customReceiverStream.count() > 0 ){
customReceiverStream.foreachRDD(x => { println("=====>" + x.count()); x.count() })
//}
ssc.start()
ssc.awaitTermination()

Note: I am running this locally, with the master set to local[2].
11 REPLIES

Explorer

Normally data should not be received more than once. Did you see any error logs such as "Error receiving data", or any exceptions? Can you please share more details? It may also depend on the behavior of the source your custom receiver connects to.
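
One thing worth checking in the posted receiver is the loop condition: `!isStopped || event != null` stays true after the reader returns null for as long as the receiver has not been stopped, so the loop does not end when the results run out. Also note that `restart(...)` in the catch block makes `onStart()` run again, so any exception inside `receive()` will re-run the job from the beginning. Below is a minimal sketch in plain Scala (no Spark) of the same loop using `&&`. `FakeReader` and `drain` are hypothetical stand-ins for your `ResultsReader` and `receive()`, and it assumes `getNextLine` returns null once the results are exhausted.

```scala
object LoopConditionSketch {

  // Hypothetical stand-in for the application's ResultsReader: hands out
  // each line once, then returns null when nothing is left (an assumption).
  class FakeReader(lines: List[String]) {
    private var remaining = lines
    def getNextLine: String = remaining match {
      case head :: tail =>
        remaining = tail
        head
      case Nil => null
    }
  }

  // Mirrors the loop in receive(), but exits as soon as EITHER the receiver
  // has been stopped OR the reader has no more lines.
  def drain(reader: FakeReader, isStopped: () => Boolean): Vector[String] = {
    val stored = Vector.newBuilder[String]
    var event = reader.getNextLine
    while (!isStopped() && event != null) {
      stored += event // stands in for store(event)
      event = reader.getNextLine
    }
    stored.result()
  }

  def main(args: Array[String]): Unit = {
    val stored = drain(new FakeReader(List("line1", "line2", "line3")), () => false)
    println(stored.size)
  }
}
```

If the receiver is meant to read the results only once, it may also be worth deciding whether `restart` is really what you want on every `Throwable`, since each restart starts the read over.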

Mentor

@Srinivasarao Daruna What did you end up with? Can you accept the best answer to close this thread, or post your own solution?