
Reading Input Stream using Spark Custom receiver

Explorer

I have written a custom receiver to receive the stream generated by one of our applications. The receiver starts the process, gets the stream, and then calls store. However, the receive method gets called multiple times; I have written what I believe is a proper loop break condition, but it does not work. How do I ensure it reads the data only once and does not re-read already processed data? Here is my custom receiver code:

    class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
      def onStart() {
        new Thread("Splunk Receiver") {
          override def run() { receive() }
        }.start()
      }

      def onStop() {
      }

      private def receive() {
        try {
          /* My code to run a process and get the stream */
          val reader = new ResultsReader(job.getResults()) // ResultsReader is the reader for the application
          var event: String = reader.getNextLine
          while (!isStopped || event != null) {
            store(event)
            event = reader.getNextLine
          }
          reader.close()
        } catch {
          case t: Throwable =>
            restart("Error receiving data", t)
        }
      }
    }

Where did I go wrong? The problem: the job and the stream reading happen multiple times and the same data piles up, so for 60 lines of data I sometimes end up with 1800 or more records in total.

Streaming Code:

    val conf = new SparkConf
    conf.setAppName("str1")
    conf.setMaster("local[2]")
    conf.set("spark.driver.allowMultipleContexts", "true")
    val ssc = new StreamingContext(conf, Minutes(2))
    val customReceiverStream = ssc.receiverStream(new MyReceiver)
    println(" searching ")
    //if (customReceiverStream.count() > 0) {
    customReceiverStream.foreachRDD(x => { println("=====>" + x.count()); x.count() })
    //}
    ssc.start()
    ssc.awaitTermination()
Note: I am trying this on my local cluster, with the master set to local[2].
11 Replies

Re: Reading Input Stream using Spark Custom receiver

Expert Contributor

You created a streaming context with a batch duration of 2 minutes, so the DStream you are processing is being built up by the receiver and then processed every 120 seconds, not every 2 seconds as you indicated. Did you mean 2 seconds rather than 2 minutes?
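For reference, a minimal sketch of what a short batch interval would look like (assuming 2 seconds was the intent):

    // Seconds(2) instead of Minutes(2) gives a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))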


Re: Reading Input Stream using Spark Custom receiver

Explorer

Hi Joe, I meant multiple times. I was running it quite often and ending up with a lot of records, even though the actual count is 60.

Re: Reading Input Stream using Spark Custom receiver

Explorer

Someone on Stack Overflow told me about offset settings. My requirement is to get the results from the InputStream using Spark Streaming and process them. I need to make sure I receive and process them only once. If I could guarantee that semantic, it would be great.
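One possibility, as a minimal sketch rather than a verified fix (ResultsReader and job are the application-specific names from the original post): use && rather than || in the loop condition so the loop exits once the reader is exhausted, and stop the receiver after the result set has been fully read, so Spark does not schedule receive() again and replay the same data:

    private def receive(): Unit = {
      try {
        val reader = new ResultsReader(job.getResults())
        var event: String = reader.getNextLine
        // && (not ||): exit when the receiver is stopped OR the reader is exhausted
        while (!isStopped && event != null) {
          store(event)
          event = reader.getNextLine
        }
        reader.close()
        // Stop this receiver so it is not restarted and the data is not re-read
        stop("Finished reading result set")
      } catch {
        case t: Throwable =>
          restart("Error receiving data", t)
      }
    }

With the original || condition, the loop keeps running (and storing) as long as either check passes, which is one way the same 60 lines could be stored repeatedly.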

Re: Reading Input Stream using Spark Custom receiver

Mentor

@azeltov help

Re: Reading Input Stream using Spark Custom receiver

Explorer

Hi Artem, do you need any more details from my side?

Re: Reading Input Stream using Spark Custom receiver

Mentor

@Srinivasarao Daruna no, there are better experts on this list to help; I was just calling one out.

Re: Reading Input Stream using Spark Custom receiver

Contributor

@sshao is a spark streaming guru!

Re: Reading Input Stream using Spark Custom receiver

Explorer

Hi Ned, how do I reach him?

Re: Reading Input Stream using Spark Custom receiver

Mentor

@Srinivasarao Daruna since Ned tagged him, the user will be notified.
