Counting the number of rows after writing a dataframe to a database with Spark


I have seen a similar question on Stack Overflow, but I am not really sure about:

1. How to use the code in an actual working example. I have written some code, but it is not working: counting the input rows works, but the output metrics are always None.

[attachment: codeforlogginginputoutput.png (code for logging input/output metrics)]

Code writing to the db:

[attachment: savingtodb.png (code saving the dataframe to the db)]

2. The best or preferred way of doing this.

https://stackoverflow.com/questions/37496650/spark-how-to-get-the-number-of-written-rows

https://stackoverflow.com/questions/43934168/how-to-get-the-number-of-records-written-using-datafram...

Basically, it seems like I can get the row count from the Spark UI, but how can I get it from within the Spark code?

I know that I can do a count on the dataframe before I write it to the database, but how do I get the count after the write?

I have posted a lot of info, but I just want to know: how can I programmatically see the number of rows written by a dataframe to a database?
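
For reference, a minimal sketch of the count-before-write workaround I mentioned; the JDBC URL, table name, and credentials below are placeholders, not my real settings:

val df = spark.range(0, 1000).toDF("id")

// counting runs a separate job over the same dataframe
val countBeforeWrite = df.count()

df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")  // placeholder
  .option("dbtable", "numbers")                            // placeholder
  .option("user", "user")                                  // placeholder
  .option("password", "password")                          // placeholder
  .mode("append")
  .save()

println(s"Dataframe had $countBeforeWrite rows before the write")

Caching the dataframe first would avoid computing it twice (once for the count, once for the write), but either way the count reflects the data before, not after, the write.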


4 REPLIES

Explorer

Hi

I'm facing the same issue. Can you please let me know if you found a solution?

Thanks

New Contributor

<dataframe>.count() will return the number of rows in the dataframe
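
To get a count that reflects the write itself, one option is to read the table back afterwards and count that; a minimal sketch, with placeholder JDBC options:

val written = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")  // placeholder
  .option("dbtable", "numbers")                            // placeholder
  .option("user", "user")                                  // placeholder
  .option("password", "password")                          // placeholder
  .load()

println(s"Table now holds ${written.count()} rows")

Keep in mind this counts everything currently in the table, not just the rows written by one job, so it only matches the write when the table started out empty.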

Master Guru

New Contributor

Since I could never find a snippet for this, here it goes:

 

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import spark.implicits._

val queryListener = new QueryExecutionListener {
  var numOutputRows: Option[Long] = None

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // pull the "numOutputRows" metric off the root of the executed plan, if present
    numOutputRows = qe.executedPlan.metrics.get("numOutputRows").map(_.value)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

spark.sqlContext.listenerManager.register(queryListener)

try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.mode("overwrite").parquet("file:///tmp/numbers")
} finally {
  spark.sqlContext.listenerManager.unregister(queryListener)
}

queryListener.numOutputRows match {
  case Some(num) => println(s"Wrote $num rows -- should be 500")
  case None      => println("the query did not succeed or there's no `numOutputRows` metric")
}

 

Or, alternatively,

 

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import spark.implicits._

val listener = new SparkListener {
  val rowsWritten: AtomicLong = new AtomicLong(0)  // atomic is probably unnecessary here

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // add up the records written by every task that ends while this listener is registered
    rowsWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.recordsWritten)
  }
}

spark.sparkContext.addSparkListener(listener)
try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.mode("overwrite").parquet("file:///tmp/numbers")
} finally {
  // listener events arrive asynchronously, so the count may lag the write slightly
  spark.sparkContext.removeSparkListener(listener)
}

println(s"Wrote ${listener.rowsWritten.get()} rows -- should be 500")