Metrics for a Spark Streaming Operation

Explorer

 

Hi,

I am streaming data in Spark and joining it with a batch file stored in HDFS. Each window of the stream is joined with the HDFS data.

I want to measure the time taken by this join for each window. I tried the code below in the Spark shell, but it did not work: the output was always 0.

Any suggestions on how to achieve this? Thanks!

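import java.io.{File, PrintWriter}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val jobstarttime = System.currentTimeMillis();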
val ssc = new StreamingContext(sc, Seconds(60))
val streamrecs = ssc.socketTextStream("10.11.12.13", 5549)
val streamkv = streamrecs.map(_.split("~")).map(r => ( r(0), (r(5), r(6))))
val streamwindow = streamkv.window(Minutes(2))
val HDFSlines = sc.textFile("/user/batchdata").map(_.split("~")).map(r => ( r(1), (r(0))))
val outfile = new PrintWriter(new File("//home//user1//metrics1" ))
val joinstarttime = System.currentTimeMillis();
val join1 = streamwindow.transform(joinRDD => { joinRDD.join(HDFSlines)} )
val joinsendtime = System.currentTimeMillis();
val jointime = (joinsendtime - joinstarttime)/1000
val J = jointime.toString()
val J1 = "\n Time taken for Joining is " + J
outfile.write(J1)
join1.print()
val savestarttime = System.currentTimeMillis();
join1.saveAsTextFiles("/user/joinone5")
val savesendtime = System.currentTimeMillis();
val savetime = (savesendtime - savestarttime)/1000
val S = savetime.toString()
val S1 = "\n Time taken for Saving is " + S
outfile.write(S1)
ssc.start()
outfile.close()
ssc.awaitTermination()

 

Master Collaborator

The code here doesn't really do any work; it only sets up and configures it. It declares where the data comes from, how it is transformed, and where it goes. No work is done until ssc.start(), so timing the code before that point doesn't help.
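
To see the laziness directly, here is a minimal sketch for the Spark shell (where `sc` is already defined). The `timed` helper is a hypothetical name introduced for illustration, and the RDD is made-up sample data. The only point is that defining a transformation returns almost immediately, while the action is what does the work.

```scala
// Hypothetical helper: runs a block and returns (result, elapsed milliseconds).
def timed[T](block: => T): (T, Long) = {
  val start = System.nanoTime()
  val result = block
  (result, (System.nanoTime() - start) / 1000000L)
}

// Defining a transformation only records what to compute; nothing runs yet.
val (squares, defineMs) = timed {
  sc.parallelize(1 to 1000000).map(x => x.toLong * x)
}

// An action forces the computation, so this is where the time is spent.
val (total, runMs) = timed { squares.reduce(_ + _) }

println("define: " + defineMs + " ms, run: " + runMs + " ms, total: " + total)
```

The same applies to the streaming code in the question: the lines between the two currentTimeMillis() calls only describe the join, which is consistent with the measured time always being 0.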

 

You can already see some timing information in the Spark Streaming UI.

 

You can try computing timings inside the functions, since that measures them when they actually execute. However, even a method like .join() called inside the transform() function is itself a transformation that doesn't do work immediately, so timing it would not help. Actions like foreach are what make sense to time.
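
As a rough sketch of timing at execution time, the snippet below rebuilds the join from the question and times an action inside foreachRDD. It assumes the Spark shell (where `sc` exists) and Spark 1.x. `describe` is a hypothetical helper, and count() is just one possible choice of action. The measured time covers everything the action triggers (reading the window, the join, and the count), not the join in isolation.

```scala
import org.apache.spark.SparkContext._  // pair-RDD operations such as join on Spark 1.x
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

// Hypothetical helper that formats one timing line.
def describe(rows: Long, elapsedMs: Long): String =
  "Joined " + rows + " rows in " + elapsedMs + " ms"

val ssc = new StreamingContext(sc, Seconds(60))
val streamkv = ssc.socketTextStream("10.11.12.13", 5549)
  .map(_.split("~")).map(r => (r(0), (r(5), r(6))))
val HDFSlines = sc.textFile("/user/batchdata")
  .map(_.split("~")).map(r => (r(1), r(0)))
val join1 = streamkv.window(Minutes(2)).transform(rdd => rdd.join(HDFSlines))

// This function runs on the driver once per batch interval, after ssc.start().
join1.foreachRDD { rdd =>
  val start = System.currentTimeMillis()
  val rows = rdd.count()  // action: forces the window, the join, and the count
  println(describe(rows, System.currentTimeMillis() - start))
}

ssc.start()
ssc.awaitTermination()
```

If the joined stream also feeds print() or saveAsTextFiles(), each output may recompute the join unless the RDD is cached, so separate timings for each output would overlap.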

 

Really, I would start by looking at Spark's built-in timing metrics.
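
One way to read those built-in timings programmatically, as a sketch: Spark Streaming has a StreamingListener interface whose onBatchCompleted callback receives the scheduling and processing delays for each batch. The class name BatchTimingListener and the helper formatDelay are hypothetical, and the 60-second batch interval is taken from the question. These numbers describe a whole batch rather than the join alone.

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical helper: the delays are Option[Long] values in milliseconds.
def formatDelay(delay: Option[Long]): String =
  delay.map(ms => ms + " ms").getOrElse("unknown")

// Hypothetical listener that logs Spark's own per-batch timings on the driver.
class BatchTimingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println("Batch " + info.batchTime +
      ": scheduling delay " + formatDelay(info.schedulingDelay) +
      ", processing time " + formatDelay(info.processingDelay))
  }
}

val ssc = new StreamingContext(sc, Seconds(60))
ssc.addStreamingListener(new BatchTimingListener)
// Define the streams and outputs here as in the question, then call ssc.start().
```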

Explorer

Thanks!

By Spark Streaming UI, do you mean the Spark Master UI?

Master Collaborator

Yes, there is a special Streaming tab in the latest Spark driver UI.

Explorer

Thanks!

Explorer

By latest, do you mean version 1.1.0?

So does version 1.0.0, which comes with CDH5.1, not have this feature?

Master Collaborator

I believe it was added in 1.1, yes. I don't have a streaming app driver handy, so maybe double-check -- you will see an obvious Streaming tab if it's there. Without guaranteeing anything, I think the next CDH will have 1.1, and at any time you can run your own Spark jobs with any version under YARN.