
countDistinct() of a Dataset in Apache Spark behaves very differently in "yarn" cluster mode?



Hi everyone,
Hope you are doing well!

 

I have a requirement to calculate profiles (such as min, max, unique count, null count, duplicate count, etc.) on each column of a Hive table using Apache Spark. The input to the Spark job will be <database> & <table name>.

I am trying to implement the profiles as follows using the Spark Dataset/DataFrame API.

 

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._


object sparkProfilerClient {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("yarn").setAppName("Data Profiling")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val hive = HiveWarehouseSession.session(sparkSession).build()
    val sourceData = hive.executeQuery("SELECT * FROM <database>.<table>")
    sourceData.printSchema()

    // Iterate over every column of the Hive table and calculate the profile output
    sourceData.schema.foreach(st => {
      val columnName = st.name
      val nullcount = sourceData.filter(sourceData(columnName).isNull).count()
      val uniqueCount = sourceData.select(countDistinct(columnName)).collect()(0)
      //val uniqueCount = sourceData.select(columnName).distinct().collect()
      //sourceData.groupBy(sourceData(columnName)).count().show()

      println(st.name + ":null Count:" + nullcount)
      println(st.name + ":unique Count:" + uniqueCount)
    })
  }
}
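One detail worth noting regardless of the yarn issue: `collect()(0)` returns a `Row`, not a number, so the printed value looks like `[5]` rather than `5`. A minimal sketch of extracting the numeric value, assuming the same `sourceData` DataFrame and `columnName` variable inside the loop above:

```scala
// collect()(0) yields a Row; first() is equivalent but clearer,
// and getLong(0) pulls out the actual count as a Long.
val uniqueCount: Long = sourceData
  .select(countDistinct(col(columnName)))
  .first()
  .getLong(0)

println(s"$columnName:unique Count:$uniqueCount")
```

This does not by itself explain a count of 0 on yarn, but it makes the output unambiguous when comparing local and cluster runs.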

 


The "nullcount" variable gives me the correct value in both local and cluster mode. But the result is not stored as expected in the "uniqueCount" variable when the Spark job is executed in cluster mode (master=yarn), while it works correctly in local mode.


Please find the output I am getting after executing the Spark code in yarn mode, which is wrong: the unique count is 0 everywhere.

 

Spark Job Result in yarn mode

I don't understand why this is happening. There are lots of links available on the internet that explain how to calculate the distinct count, such as the one below. I have already tried all of them, but I still don't understand why Apache Spark does not produce the correct output.

 

Count Distinct Using Spark Dataset API
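An alternative approach worth trying, instead of launching one Spark job per column, is to compute every profile in a single aggregation pass over the table. A sketch, assuming the same `sourceData` DataFrame as above (the column aliases are my own naming):

```scala
import org.apache.spark.sql.functions._

// Build one aggregate expression per profile per column,
// then run them all in a single job over the table.
val aggs = sourceData.columns.flatMap { c =>
  Seq(
    countDistinct(col(c)).alias(s"${c}_distinct"),
    sum(when(col(c).isNull, 1).otherwise(0)).alias(s"${c}_nulls"),
    min(col(c)).alias(s"${c}_min"),
    max(col(c)).alias(s"${c}_max")
  )
}

// agg(head, tail: _*) matches the agg(Column, Column*) signature
val profile = sourceData.agg(aggs.head, aggs.tail: _*)
profile.show(truncate = false)
```

Besides being faster (one scan instead of N), this also makes it easier to see whether the yarn-mode problem affects all aggregates or only `countDistinct`.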



Source File:

 

id,firstname,lastname,dob,salary
12,satish,Khare,2014-12-11,34000.45
13,null,Pa,2013-12-24,null
14,sam,Paul,null,25000.65
15,marrie,null,2011-09-21,56788990.123
16,hank,Khare,2002-09-27,67780.45
null,walter,hamilton,2001-08-17,650.45
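To sanity-check the expected values outside of Hive, the sample file can be loaded directly and profiled. Note two things: the literal string "null" must be declared as the null marker when reading the CSV, and `countDistinct` ignores NULL values. A sketch (the file path is a hypothetical placeholder):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

val spark = SparkSession.builder().master("local[*]").appName("csv-check").getOrCreate()

// "/tmp/source.csv" is a placeholder path for the sample data above
val df = spark.read
  .option("header", "true")
  .option("nullValue", "null") // treat the literal string "null" as SQL NULL
  .csv("/tmp/source.csv")

df.columns.foreach { c =>
  // countDistinct skips NULLs, so e.g. id should report 5 distinct values, not 6
  val distinct = df.select(countDistinct(df(c))).first().getLong(0)
  println(s"$c: distinct=$distinct")
}
```

Running this in both local and yarn mode against the same file would confirm whether the problem is specific to the HiveWarehouseSession path.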

 


Do I have to change my approach of iterating over the columns to calculate the profile output?

Please point me in the right direction if I am using Spark functionality in the wrong way.