
Counting the number of rows after writing a dataframe to a database with Spark

I have seen similar questions on Stack Overflow, but I am not really sure about:

1. How to use the code in an actual working example. I have written some code, but it is not working for outputting the number of rows (getting the number of input rows works); the output metrics are always None.

[Screenshot: 16167-codeforlogginginputoutput.png, the code for logging input/output metrics]

Code writing to the DB:

[Screenshot: 16168-savingtodb.png, the code saving the dataframe to the database]

2. The best or preferred way of doing this.

https://stackoverflow.com/questions/37496650/spark-how-to-get-the-number-of-written-rows

https://stackoverflow.com/questions/43934168/how-to-get-the-number-of-records-written-using-datafram...

Basically, it seems like I can get the row count from the Spark UI, but how can I get it from within the Spark code?

I know that before I write to the database I can do a count on the dataframe, but how do I get the count after the write?

I have posted a lot of info, but I just want to know: how can I programmatically see the number of rows written by a dataframe to a database?
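
For example, here is the kind of count I can already do before the write (a minimal sketch, assuming df is the dataframe being written; the JDBC url, table name, and credentials are made-up placeholders):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "myuser")          // placeholder credentials
props.setProperty("password", "mypassword")

val countBefore = df.count()                 // counts the dataframe in a separate job
df.write.jdbc("jdbc:postgresql://dbhost:5432/mydb", "mytable", props)
println(s"Counted $countBefore rows before the write")

What I still don't see is how to read, from within the program, the number of rows the write itself reported.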


4 Replies

New Contributor

Hi

I'm facing the same issue. Can you please let me know if you found the solution?

Thanks

New Contributor

<dataframe>.count() will return the number of rows in the dataframe.
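
For example (a small sketch, where df stands for whatever dataframe is being written, and the output path is hypothetical; caching is optional but keeps the extra count job from recomputing the whole lineage):

df.cache()                               // cache so the later count reuses the computed rows
df.write.parquet("file:///tmp/output")   // hypothetical target path
println(s"Dataframe has ${df.count()} rows")
df.unpersist()

Note that this counts the dataframe itself, not a metric reported by the write.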

Super Guru

New Contributor

Since I could never find the snippet for it, here it goes:

 

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import spark.implicits._   // for .toDS on the RDD below

// Capture the "numOutputRows" metric of the executed write plan.
val queryListener = new QueryExecutionListener {
  var numOutputRows: Option[Long] = None

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    numOutputRows = qe.executedPlan.metrics.get("numOutputRows").map(_.value)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

spark.sqlContext.listenerManager.register(queryListener)

try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sqlContext.listenerManager.unregister(queryListener)
}

queryListener.numOutputRows match {
  case Some(num) => println(s"Wrote $num rows -- should be 500")
  case None      => println("the query did not succeed or there's no `numOutputRows` metric")
}
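
One caveat: depending on the Spark version, QueryExecutionListener callbacks may be delivered asynchronously on a listener bus, so onSuccess is not guaranteed to have fired by the time the write call returns. If numOutputRows comes back as None, it may simply not have been populated yet.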

 

Or, alternatively:

 

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sum the recordsWritten output metric over all completed tasks.
val listener = new SparkListener {
  val rowsWritten: AtomicLong = new AtomicLong(0)  // probably unnecessary: the bus delivers events serially

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks on some Spark versions
    Option(taskEnd.taskMetrics).foreach { m =>
      rowsWritten.addAndGet(m.outputMetrics.recordsWritten)
    }
  }
}

spark.sparkContext.addSparkListener(listener)
try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sparkContext.removeSparkListener(listener)
}

println(s"Wrote ${listener.rowsWritten.get()} rows -- should be 500")
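
Two caveats with this variant: listener events are delivered asynchronously, so the counter may still be catching up when the write call returns, and recordsWritten is summed over every completed task attempt, so retried or speculative tasks can in principle inflate the total.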

 

 
