Counting the number of rows after writing a dataframe to a database with Spark
Labels: Apache Spark
Created on ‎06-08-2017 05:44 PM - edited ‎08-17-2019 10:14 PM
I have seen a similar question on Stack Overflow, but I am not really sure about:
1. How to use that code in an actual working example. I have written some code, but it only works for counting the rows read in; the output metrics are always None for the code that writes to the database.
2. The best or preferred way of doing this.
https://stackoverflow.com/questions/37496650/spark-how-to-get-the-number-of-written-rows
Basically, it seems like I can get the row count from the Spark UI, but how can I get it from within the Spark code?
I know that before I write to the database I can do a count on the dataframe, but how do I get the count after I write?
I have posted a lot of info, but I just want to know how I can programmatically see the number of rows written by a dataframe to a database.
Created ‎03-28-2018 07:23 PM
Hi,
I'm facing the same issue. Can you please let me know if you found a solution?
Thanks
Created ‎10-30-2020 05:11 AM
<dataframe>.count() will return the number of rows in the dataframe
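For the original question (counting rows after a write to a database), one option along these lines is to count the DataFrame before the write and then re-read the target table afterwards to verify. A minimal sketch, assuming a hypothetical JDBC URL, credentials and table name, and an existing `spark` session and `df` DataFrame:

import java.util.Properties

val jdbcUrl = "jdbc:postgresql://dbhost:5432/mydb" // hypothetical connection details
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")

val expected = df.count() // rows about to be written
df.write.mode("append").jdbc(jdbcUrl, "target_table", props)

// re-read the table and count what is there now; note this counts all rows in the
// table, not just the ones from this particular write
val written = spark.read.jdbc(jdbcUrl, "target_table", props).count()
println(s"expected to write $expected rows, table now holds $written rows")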
Created ‎03-28-2018 07:42 PM
Try the new Spark 2.2 with the new listener
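The listener referred to here is presumably Spark SQL's QueryExecutionListener (the approach used in the later reply). A minimal sketch of registering one that simply prints the metrics of the executed plan after each successful query; a fuller, self-contained version is in the reply below:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    // for a write, these metrics typically include "numOutputRows"
    qe.executedPlan.metrics.foreach { case (name, metric) => println(s"$name = ${metric.value}") }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})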
Created on 03-12-2021 03:56 PM - last edited on 03-13-2021 08:02 AM by ask_bill_brooks
Since I could never find a complete snippet for this, here it goes:
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import spark.implicits._

val queryListener = new QueryExecutionListener {
  var numOutputRows: Option[Long] = None
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // grab the "numOutputRows" metric from the executed write plan, if present
    numOutputRows = qe.executedPlan.metrics.get("numOutputRows").map(_.value)
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}
spark.sqlContext.listenerManager.register(queryListener)
try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sqlContext.listenerManager.unregister(queryListener)
}
queryListener.numOutputRows match {
  case Some(num) => println(s"Wrote $num rows -- should be 500")
  case None => println("the query did not succeed or there's no `numOutputRows` metric")
}
Or, alternatively,
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import spark.implicits._

val listener = new SparkListener {
  val rowsWritten: AtomicLong = new AtomicLong(0) // probably unnecessary
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // accumulate the per-task output row counts
    rowsWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.recordsWritten)
  }
}
spark.sparkContext.addSparkListener(listener)
try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sparkContext.removeSparkListener(listener)
}
println(s"Wrote ${listener.rowsWritten.get()} rows -- should be 500")
