Support Questions


Fetch distinct values of a column in Dataframe using Spark

I am working with Spark 1.6.1 and need to fetch the distinct values of a column using Spark DataFrames. The column contains ~50 million records, and doing a collect() operation slows down further operations on the resulting DataFrame because there is no parallelism. The piece of code below works fine in local mode, but in yarn-cluster mode I get "java.lang.NoClassDefFoundError".

preProcessedDataFrame.distinct().foreach(record => {
  val applicationId = record.getAs[Int]("ApplicationId")
  val selectedApplicationDataFrame = sqlContext.sql("SELECT * FROM tTempTable WHERE ApplicationId = " + applicationId)
  // further processing on selectedApplicationDataFrame
})

Can someone tell me the reason for this error, or suggest a better approach to achieve the same result?



Can you share the full details of the error message? You are probably missing some libraries. If you are reading data from HDFS and running in yarn-cluster mode, your parallelism will by default equal the number of HDFS blocks. As a best practice, avoid the collect operation unless it is a small test dataset; instead use the saveAsTextFile method to write the result dataset to HDFS or a local file.
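To make the suggestion concrete, here is a plain-Scala sketch of that pattern. The data, object name, and output file are invented stand-ins for illustration; the real Spark 1.6 equivalent is shown in the comments.

```scala
// Plain-Scala sketch of the advice above: compute distinct values, then write
// them out instead of collecting everything to the driver. The data and file
// name are invented stand-ins.
object DistinctToFileSketch {
  // Stand-in for the ApplicationId column of the 50M-row DataFrame.
  val applicationIds = Seq(3, 1, 3, 2, 1)

  // distinct preserves first-occurrence order on a Seq.
  def distinctIds: Seq[Int] = applicationIds.distinct

  def main(args: Array[String]): Unit = {
    // In Spark 1.6 the equivalent is roughly:
    //   preProcessedDataFrame.select("ApplicationId").distinct()
    //     .rdd.map(_.get(0).toString).saveAsTextFile("hdfs://.../distinct-ids")
    // which writes one part-file per partition in parallel, instead of
    // funnelling all 50M rows through the driver the way collect() does.
    val out = new java.io.PrintWriter("distinct-application-ids.txt")
    try distinctIds.foreach(id => out.println(id))
    finally out.close()
  }
}
```

The point is the shape of the job: distinct() stays distributed, and the result is persisted rather than pulled back through the driver.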

Below are the exception details. The same piece of code runs in LOCAL mode; on YARN-CLUSTER I see the error below during iteration.

java.lang.NoClassDefFoundError: Could not initialize class$
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$34.apply(RDD.scala:919)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$34.apply(RDD.scala:919)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.executor.Executor$
	at java.util.concurrent.ThreadPoolExecutor.runWorker(
	at java.util.concurrent.ThreadPoolExecutor$

Super Guru
@Narasimhan Kazhiyur

When you run Spark in yarn-cluster mode, your application jar needs to be shipped to the cluster. Do you specify your application jar as the application-jar argument to spark-submit?

If not, please check the following link.

and the following link also, to understand how cluster mode works.

Yes, I am doing that. Below is my spark-submit command.

spark-submit --class com.BreakpointPredictionDriver \
  --master yarn-cluster \
  --num-executors 4 \
  --driver-memory 8g \
  --executor-memory 8g \
  --executor-cores 4 \
  --name BreakPointPrediction \
  --jars /usr/hdp/current/spark-client/lib/datanucleus-core-3.2.10.jar,/usr/hdp/current/spark-client/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/current/spark-client/lib/datanucleus-rdbms-3.2.9.jar \
  hdfs://PROD1/testdata/spark-jobs/bp-preprocessing-assembly-1.0.0.jar \
  --hbaseZookeeperQuorum --hbaseZookeeperPort 2181

Super Guru

@Narasimhan Kazhiyur So the jars you are supplying include

It could be a dependency issue. You can ship one uber jar that includes everything, all dependencies included. I think the issue is with shipping a jar that contains all the classes.

Yes, I have one fat jar that has all the dependencies. The thing is, when I use collect() in the code below, it works on yarn-cluster. But using collect() removes parallelism from further operations, so I don't want to use it.

Without the collect() statement I get the "java.lang.NoClassDefFoundError" exception mentioned above. The code below works fine in local mode without the collect statement.

Please help me understand this behavior.

   preProcessedDataFrame.registerTempTable("tApplication")
   preProcessedDataFrame.distinct().foreach(record => {
      val applicationId = record.getAs[String]("ApplicationId")
      val selectedApplicationDataFrame = sqlContext.sql("SELECT * FROM tApplication WHERE ApplicationId = " + applicationId)
      println("selectedApplicationId: " + applicationId)
      // DO FURTHER PROCESSING on selectedApplicationDataFrame...
   })

Super Guru

Can you please share the code without collect()?

It's the same code as above, without collect():

      preProcessedDataFrame.registerTempTable("tApplicationId")
      preProcessedDataFrame.distinct().foreach(record => {
        val applicationId = record.getAs[String]("ApplicationId")
        val selectedApplicationDataFrame = sqlContext.sql("SELECT * FROM tApplicationId WHERE ApplicationId = " + applicationId)
        println("selectedApplicationId: " + applicationId)
        // DO FURTHER PROCESSING on selectedApplicationDataFrame...
      })

I figured out the cause of the issue I was facing. In the piece of code above I am doing a nested transformation (using sqlContext and a DataFrame inside foreach), which is not supported by Spark. Is there an approach I should follow to avoid this nested transformation?
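For readers hitting the same wall: one common workaround is to keep distinct() on the cluster but bring only the resulting key set (one value per application, not 50M rows) to the driver, then loop over the keys and issue one ordinary driver-side query per key, so no DataFrame operation runs inside another. The following is a plain-Scala sketch of that control flow; `records`, `queryFor`, and `runQuery` are invented stand-ins (in the real job they would be preProcessedDataFrame and sqlContext.sql).

```scala
// Plain-Scala sketch of the "collect only the keys" pattern.
// `records` and `runQuery` are stand-ins: in the real Spark job they would be
// preProcessedDataFrame and sqlContext.sql(...).
object CollectKeysSketch {
  case class Record(applicationId: Int, payload: String)

  // Stand-in for the 50M-row DataFrame.
  val records = Seq(Record(1, "a"), Record(1, "b"), Record(2, "c"), Record(3, "d"))

  // Stand-in for the per-key SQL text the real job would build.
  def queryFor(id: Int): String =
    s"SELECT * FROM tApplication WHERE ApplicationId = $id"

  // Stand-in for sqlContext.sql(queryFor(id)): "runs" the query by filtering.
  def runQuery(id: Int): Seq[Record] = records.filter(_.applicationId == id)

  def main(args: Array[String]): Unit = {
    // Step 1: in Spark this is roughly
    //   preProcessedDataFrame.select("ApplicationId").distinct().collect()
    // -- the distinct() still runs in parallel on the cluster, and only the
    // small key set reaches the driver.
    val applicationIds = records.map(_.applicationId).distinct

    // Step 2: a plain driver-side loop; each query still executes on the
    // cluster, so no DataFrame operation is nested inside another.
    for (id <- applicationIds) {
      val selected = runQuery(id)
      println(s"selectedApplicationId: $id -> ${selected.size} rows")
    }
  }
}
```

Where the per-key processing can be expressed as an aggregation, a single groupBy("ApplicationId") on the DataFrame often avoids the per-key loop entirely.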
