
countDistinct() on a Dataset in Apache Spark behaves very differently in "yarn" cluster mode?


Hi everyone,
Hope you are doing well!

 

I have a requirement to calculate profiles (such as min, max, unique count, null count, duplicate count, etc.) on each column of a Hive table using Apache Spark. The input to the Spark job will be <database> & <table name>.

I am trying to implement the profiles as follows using the Spark Dataset/DataFrame API.

 

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import com.hortonworks.hwc.HiveWarehouseSession


object sparkProfilerClient {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("yarn").setAppName("Data Profiling")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val hive = HiveWarehouseSession.session(sparkSession).build()
    val sourceData = hive.executeQuery("SELECT * FROM <database>.<table>")
    sourceData.printSchema()

    // Iterate over every column of the Hive table and calculate the profiler output
    sourceData.schema.foreach(st => {
      val columnName = st.name
      val nullcount = sourceData.filter(sourceData(columnName).isNull).count()
      // collect()(0) returns a Row wrapping the distinct count
      val uniqueCount = sourceData.select(countDistinct(columnName)).collect()(0)
      //val uniqueCount = sourceData.select(columnName).distinct().collect()
      //sourceData.groupBy(sourceData(columnName)).count().show()

      println(st.name + ":null Count:" + nullcount)
      println(st.name + ":unique Count:" + uniqueCount)
    })
  }
}

 


The "nullcount" variable gives me the correct value in both local and cluster mode. But the result is not stored as expected in the "uniqueCount" variable when the Spark job runs in cluster mode (master=yarn), while it works correctly in local mode.


Please find below the output I get after executing the Spark code in yarn mode, which is wrong: the unique count is 0 everywhere.

 

[Screenshot: Spark Job Result in yarn mode]

I don't understand why this is happening. There are lots of links on the internet explaining how to calculate a distinct count, such as the one below. I have already tried all of them, but I still don't understand why Apache Spark is not producing the correct output.

 

Count Distinct Using Spark Dataset API



Source File:

 

id,firstname,lastname,dob,salary
12,satish,Khare,2014-12-11,34000.45
13,null,Pa,2013-12-24,null
14,sam,Paul,null,25000.65
15,marrie,null,2011-09-21,56788990.123
16,hank,Khare,2002-09-27,67780.45
null,walter,hamilton,2001-08-17,650.45
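For reference, here is what the two profile values should come out to on the sample file above. This is a minimal plain-Scala sketch (no Spark involved, names made up for illustration), assuming the literal string `null` in the CSV really denotes a missing cell (Spark would only treat it as SQL NULL if the file is read with a matching `nullValue` option):

```scala
// Plain-Scala model of the sample file, with missing cells as real nulls.
object ProfileSample {
  // columns: id, firstname, lastname, dob, salary
  val rows: Seq[Seq[String]] = Seq(
    Seq("12", "satish", "Khare", "2014-12-11", "34000.45"),
    Seq("13", null, "Pa", "2013-12-24", null),
    Seq("14", "sam", "Paul", null, "25000.65"),
    Seq("15", "marrie", null, "2011-09-21", "56788990.123"),
    Seq("16", "hank", "Khare", "2002-09-27", "67780.45"),
    Seq(null, "walter", "hamilton", "2001-08-17", "650.45")
  )

  // nullCount: how many cells in the column are null
  def nullCount(col: Int): Int = rows.count(r => r(col) == null)

  // uniqueCount: distinct non-null values, matching countDistinct semantics
  // (Spark's countDistinct ignores nulls)
  def uniqueCount(col: Int): Int =
    rows.map(_(col)).filter(_ != null).distinct.size
}
```

So each column should report a null count of 1, and the unique counts should be 5 for every column except lastname, which has "Khare" twice and should report 4.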

 


Do I have to change my coding approach of iterating over the columns to calculate the profile output?
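To show what I mean by changing the approach: instead of launching one distributed job per column, the rows could be folded over once, keeping a null counter and a set of seen values per column. This is only a sketch in plain Scala (the `ColProfile` name and helpers are invented for illustration; in Spark the equivalent would be a single aggregation with one expression per column):

```scala
// Per-column accumulator: a null counter plus the set of distinct values seen.
case class ColProfile(nulls: Int = 0, values: Set[String] = Set.empty) {
  def add(cell: String): ColProfile =
    if (cell == null) copy(nulls = nulls + 1) else copy(values = values + cell)
  // distinct non-null values, mirroring countDistinct semantics
  def uniqueCount: Int = values.size
}

// Single pass over all rows, updating every column's accumulator per row.
def profile(rows: Seq[Seq[String]], nCols: Int): Seq[ColProfile] =
  rows.foldLeft(Seq.fill(nCols)(ColProfile())) { (acc, row) =>
    acc.zip(row).map { case (p, cell) => p.add(cell) }
  }
```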

Please point me in the right direction if I am using the Spark functionality in the wrong way.

 
