Created on 06-08-2017 05:44 PM - edited 08-17-2019 10:14 PM
I have seen a similar question on Stack Overflow, but I am not really sure about:
1. How to use the code in an actual working example. I have written some code, but it is not working for outputting the number of rows; inputting rows works. The output metrics are always None.
The code writes to a DB.
2. The best or preferred way of doing this.
https://stackoverflow.com/questions/37496650/spark-how-to-get-the-number-of-written-rows
Basically, it seems like I can get the row count from the Spark UI, but how can I get it from within the Spark code?
I know that before I write to the database I can do a count on a DataFrame, but how do I get the count after I write?
I have posted a lot of info, but all I want to know is how to programmatically see the number of rows written by a DataFrame to a database.
Created 03-28-2018 07:23 PM
Hi
I'm facing the same issue. Can you please let me know if you found a solution?
Thanks
Created 10-30-2020 05:11 AM
<dataframe>.count() will return the number of rows in the dataframe
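For a file-based sink, one way to verify the count after the write (a sketch, assuming an active SparkSession named `spark`; the path is illustrative) is to read the output back and count it:

```scala
// Sketch: confirm the written row count by re-reading the output.
// Assumes an active SparkSession named `spark`; the path is an example.
val df = spark.range(0, 500).toDF("n")
df.write.mode("overwrite").parquet("file:///tmp/numbers_check")

// Re-reading costs an extra job, but the count is unambiguous.
val written = spark.read.parquet("file:///tmp/numbers_check").count()
println(s"Wrote $written rows") // should print 500
```

This only works when you can cheaply read the sink back; for a database write it means a second round trip, which is why a listener-based approach (below) is usually preferred.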
Created 03-28-2018 07:42 PM
Try the new Spark 2.2 with the new listener
Created on 03-12-2021 03:56 PM - last edited on 03-13-2021 08:02 AM by ask_bill_brooks
Since I could never find a snippet for it, here it is:
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import spark.implicits._ // needed for .toDS on an RDD

val queryListener = new QueryExecutionListener {
  // holds the metric from the last successful query, if any
  var numOutputRows: Option[Long] = None

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    numOutputRows = qe.executedPlan.metrics.get("numOutputRows").map(_.value)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

spark.sqlContext.listenerManager.register(queryListener)
try {
  // write 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sqlContext.listenerManager.unregister(queryListener)
}

queryListener.numOutputRows match {
  case Some(num) => println(s"Wrote $num rows -- should be 500")
  case _         => println("the query did not succeed or there's no `numOutputRows` metric")
}
Or, alternatively,
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import spark.implicits._ // needed for .toDS on an RDD

val listener = new SparkListener {
  val rowsWritten: AtomicLong = new AtomicLong(0) // atomicity probably unnecessary here

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    rowsWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.recordsWritten)
  }
}

spark.sparkContext.addSparkListener(listener)
try {
  // write 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sparkContext.removeSparkListener(listener)
}

println(s"Wrote ${listener.rowsWritten.get()} rows -- should be 500")