Explorer
Posts: 62
Registered: ‎01-22-2014
Accepted Solution

Metrics for a Spark Streaming Operation

 

Hi,

 

I am streaming data in Spark and doing a join operation with a batch file in HDFS.

 

I am joining one window of the stream with HDFS.

 

I want to measure the time taken to do this join (for each window) using the code below, but it did not work: the output was always 0.

 

I am using the spark-shell for this code.

 

Any suggestions on how to achieve this? Thanks!

 

import java.io.{File, PrintWriter}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val jobstarttime = System.currentTimeMillis()
val ssc = new StreamingContext(sc, Seconds(60))
val streamrecs = ssc.socketTextStream("10.11.12.13", 5549)
val streamkv = streamrecs.map(_.split("~")).map(r => (r(0), (r(5), r(6))))
val streamwindow = streamkv.window(Minutes(2))
val HDFSlines = sc.textFile("/user/batchdata").map(_.split("~")).map(r => (r(1), r(0)))
val outfile = new PrintWriter(new File("/home/user1/metrics1"))

val joinstarttime = System.currentTimeMillis()
val join1 = streamwindow.transform(joinRDD => joinRDD.join(HDFSlines))
val joinsendtime = System.currentTimeMillis()
val jointime = (joinsendtime - joinstarttime) / 1000
val J1 = "\n Time taken for Joining is " + jointime.toString
outfile.write(J1)

join1.print()

val savestarttime = System.currentTimeMillis()
join1.saveAsTextFiles("/user/joinone5")
val savesendtime = System.currentTimeMillis()
val savetime = (savesendtime - savestarttime) / 1000
val S1 = "\n Time taken for Saving is " + savetime.toString
outfile.write(S1)

ssc.start()
outfile.close()
ssc.awaitTermination()

 

Cloudera Employee
Posts: 366
Registered: ‎07-29-2013

Re: Metrics for a Spark Streaming Operation

The code here doesn't actually do any work; it only sets up and configures it. It declares where the data comes from, how it is transformed, and where it goes. No work happens until ssc.start(), so timing the setup code measures nothing.
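To illustrate the laziness (a generic sketch, not specific to streaming; the path is hypothetical):

```scala
val t0 = System.currentTimeMillis()
// Transformations are lazy: this line returns immediately, no matter how big the file is.
val lengths = sc.textFile("/some/path").map(_.length)
val t1 = System.currentTimeMillis()   // t1 - t0 is near zero; nothing has run yet
val total = lengths.sum()             // the actual work happens here, at the action
```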

 

You can already see some timing information in the Spark Streaming UI.

 

You can try computing timings inside the functions themselves, since those run at execution time. However, even methods like .join() called inside transform() are themselves transformations, which don't do work immediately, so timing that one won't help either. Actions, like foreachRDD, are what make sense to time.
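As a rough sketch of the foreachRDD approach, reusing the stream setup from the question (the host, port, and paths are the original poster's; the per-batch output suffix is my own addition to avoid overwriting files):

```scala
import java.io.{FileWriter, PrintWriter}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(60))
val streamkv = ssc.socketTextStream("10.11.12.13", 5549)
  .map(_.split("~")).map(r => (r(0), (r(5), r(6))))
val streamwindow = streamkv.window(Minutes(2))
val HDFSlines = sc.textFile("/user/batchdata")
  .map(_.split("~")).map(r => (r(1), r(0)))

val join1 = streamwindow.transform(rdd => rdd.join(HDFSlines))

// foreachRDD runs on the driver once per batch, when that batch is processed.
// Forcing an action inside it (saveAsTextFile) triggers the join, so the
// elapsed time covers the real join + save work for that window.
join1.foreachRDD { (rdd, time) =>
  val start = System.currentTimeMillis()
  rdd.saveAsTextFile(s"/user/joinone5-${time.milliseconds}")
  val elapsed = (System.currentTimeMillis() - start) / 1000.0
  val out = new PrintWriter(new FileWriter("/home/user1/metrics1", true))
  out.println(s"Batch at $time: join + save took $elapsed s")
  out.close()
}

ssc.start()
ssc.awaitTermination()
```

Note the timing can't separate the join from the save here: the join only executes as part of the action that consumes it.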

 

Really I would start by looking at Spark's built-in timing metrics.

Explorer
Posts: 62
Registered: ‎01-22-2014

Re: Metrics for a Spark Streaming Operation

Thanks!

 

By Spark Streaming UI, do you mean the Spark Master UI?

Cloudera Employee
Posts: 366
Registered: ‎07-29-2013

Re: Metrics for a Spark Streaming Operation

Yes, there is a dedicated Streaming tab in the latest Spark driver UI.

Explorer
Posts: 62
Registered: ‎01-22-2014

Re: Metrics for a Spark Streaming Operation

Thanks!

Explorer
Posts: 62
Registered: ‎01-22-2014

Re: Metrics for a Spark Streaming Operation

By "latest", do you mean version 1.1.0?

So the version 1.0.0 that ships with CDH 5.1 does not have this feature?
Cloudera Employee
Posts: 366
Registered: ‎07-29-2013

Re: Metrics for a Spark Streaming Operation

I believe it was added in 1.1, yes. I don't have a streaming app driver handy, so double-check -- you will see an obvious Streaming tab if it's there. Without guaranteeing anything, I think the next CDH release will include 1.1, and at any time you can run your own Spark version under YARN.
