Member since
10-30-2020
1
Post
0
Kudos Received
0
Solutions
03-12-2021
03:56 PM
1 Kudo
// Since I could never find the snippet for it, here it goes:

/**
 * Query-execution listener that remembers the `numOutputRows` metric of the
 * most recent successful query.
 *
 * `numOutputRows` stays `None` until a query succeeds, and is `None` again if
 * the executed plan's metrics do not contain a `numOutputRows` entry.
 */
val queryListener = new QueryExecutionListener {
  // NOTE(review): the callback may be invoked on a different thread than the
  // one that later reads this field (newer Spark versions appear to deliver
  // these events via the listener bus) -- hence @volatile; confirm for your version.
  @volatile var numOutputRows: Option[Long] = None

  /** Records the `numOutputRows` metric of the executed plan, if it has one. */
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    numOutputRows = qe.executedPlan.metrics.get("numOutputRows").map(_.value)
  }

  /** Failures are deliberately ignored; `numOutputRows` keeps its previous value. */
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}
// Keep the listener registered only for the duration of the write.
val sqlListeners = spark.sqlContext.listenerManager
sqlListeners.register(queryListener)
try {
  // Write some 500 numbers with an unnecessarily complex query:
  // 0 until 1000, integer-halved, then de-duplicated.
  val halved = spark.sparkContext.range(0, 1000, 1).toDS.map(_ / 2)
  halved.distinct().write.parquet("file:///tmp/numbers")
} finally {
  // Always detach the listener, even when the write throws.
  sqlListeners.unregister(queryListener)
}
// Report what the query listener captured.
queryListener.numOutputRows match {
  case Some(num) => println(s"Wrote $num rows -- should be 500")
  case None => println(" the query did not succeed or there's no `numOutputRows` metric")
}

// Or, alternatively:

/**
 * Spark listener that sums the `recordsWritten` output metric of every task
 * that ends while it is attached.
 */
val listener = new SparkListener {
  // Running total of records written; an AtomicLong so the total can be read
  // from a thread other than the one delivering the events (probably unnecessary).
  val rowsWritten: AtomicLong = new AtomicLong(0)

  /** Adds the finished task's `recordsWritten` to the running total. */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics may be null when the task failed, so guard before dereferencing.
    Option(taskEnd.taskMetrics).foreach { metrics =>
      rowsWritten.addAndGet(metrics.outputMetrics.recordsWritten)
    }
  }
}
// Keep the task-level listener attached only while the write runs.
val sparkCtx = spark.sparkContext
sparkCtx.addSparkListener(listener)
try {
  // Write some 500 numbers with an unnecessarily complex query:
  // 0 until 1000, integer-halved, then de-duplicated.
  val halved = sparkCtx.range(0, 1000, 1).toDS.map(_ / 2)
  halved.distinct().write.parquet("file:///tmp/numbers")
} finally {
  // Always detach the listener, even when the write throws.
  sparkCtx.removeSparkListener(listener)
}
// NOTE(review): listener events may be delivered asynchronously, so the total
// could lag behind the write -- confirm before relying on it.
println(s"Wrote ${listener.rowsWritten.get()} -- should be 500")
... View more