
Fetch distinct values of a column in a DataFrame using Spark

Contributor

I am working with Spark 1.6.1 and have a requirement to fetch the distinct values of a column using Spark DataFrames. The column contains ~50 million records, and doing a collect() slows down further operations on the resulting DataFrame and leaves no parallelism. The piece of code below works fine in local mode, but in yarn-cluster mode I get "java.lang.NoClassDefFoundError".

preProcessedDataFrame.registerTempTable("tTempTable")
preProcessedDataFrame.distinct().foreach(record => {
  val applicationId = record.getAs[Int]("ApplicationId")
  val selectedApplicationDataFrame = sqlContext.sql("SELECT * FROM tTempTable WHERE ApplicationId = " + applicationId)
  selectedApplicationDataFrame.show(20)
  //FURTHER DO SOME MORE CALC BASED ON EACH APPLICATION-ID
})

Can someone tell me the reason for the error, or suggest a better approach to achieve the same result?

9 REPLIES

Rising Star

Can you share the full details of the error message? You are probably missing some libraries. If you are reading data from HDFS and running in yarn-cluster mode, your parallelism will by default equal the number of HDFS blocks. As a best practice, avoid collect() unless it is a small test dataset; instead use the saveAsTextFile method to write the result dataset to HDFS or a local file.
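A minimal sketch of that approach on the Spark 1.6 API; the column name "ApplicationId" and the HDFS paths here are assumptions based on your question, so adjust them to your data:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DistinctApplicationIds {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DistinctApplicationIds"))
    val sqlContext = new SQLContext(sc)

    // Hypothetical input path; replace with however you build preProcessedDataFrame.
    val preProcessedDataFrame = sqlContext.read.parquet("hdfs:///testdata/preprocessed")

    preProcessedDataFrame
      .select("ApplicationId")
      .distinct()          // distinct values of the one column
      .rdd                 // back to an RDD[Row]
      .map(_.mkString)     // one value per output line
      .saveAsTextFile("hdfs:///testdata/distinct-application-ids") // stays parallel, nothing pulled to the driver

    sc.stop()
  }
}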

Contributor

Below are the exception details. The same piece of code above runs in local mode; in yarn-cluster mode I see the error below during the iteration.

java.lang.NoClassDefFoundError: Could not initialize class com.sg.ae.bp.BreakpointPredictionDriver$
	at com.sg.ae.bp.BreakpointPredictionDriver$anonfun$com$sg$ae$bp$BreakpointPredictionDriver$run$1.apply(BreakpointPredictionDriver.scala:126)
	at com.sg.ae.bp.BreakpointPredictionDriver$anonfun$com$sg$ae$bp$BreakpointPredictionDriver$run$1.apply(BreakpointPredictionDriver.scala:124)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$34.apply(RDD.scala:919)
	at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$34.apply(RDD.scala:919)
	at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1881)
	at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1881)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)						

Super Guru
@Narasimhan Kazhiyur

When you run Spark in yarn-cluster mode, your application jar needs to be shipped to the cluster. Do you specify your application jar using the application-jar argument when you run in cluster mode?

If not, please check the following link; it explains how to submit applications and how cluster mode works.

http://spark.apache.org/docs/latest/submitting-applications.html

Contributor

Yes, I am doing that. Below is my spark-submit command.

spark-submit --class com.BreakpointPredictionDriver \
  --master yarn-cluster \
  --num-executors 4 \
  --driver-memory 8g \
  --executor-memory 8g \
  --executor-cores 4 \
  --name BreakPointPrediction \
  --jars /usr/hdp/current/spark-client/lib/datanucleus-core-3.2.10.jar,/usr/hdp/current/spark-client/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/current/spark-client/lib/datanucleus-rdbms-3.2.9.jar \
  hdfs://PROD1/testdata/spark-jobs/bp-preprocessing-assembly-1.0.0.jar \
  --hbaseZookeeperQuorum 127.0.0.1 --hbaseZookeeperPort 2181

Super Guru

@Narasimhan Kazhiyur So do the jars you are supplying include com.sg.ae.bp.BreakpointPredictionDriver?

It could be a dependency issue. You can ship one uber jar that has everything, including all dependencies. I think the issue is with shipping a jar that contains all the required classes.
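If you build the uber jar with sbt-assembly (the assembly-*.jar name in your spark-submit command suggests you do), the Spark artifacts should be marked as provided so only your own dependencies get bundled. A purely illustrative build.sbt sketch; every name and version below is an assumption, not taken from your project:

// build.sbt -- illustrative sketch only
name := "bp-preprocessing"
version := "1.0.0"
scalaVersion := "2.10.6"  // Spark 1.6.x is built against Scala 2.10 by default

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",  // supplied by the cluster at runtime
  "org.apache.spark" %% "spark-sql"  % "1.6.1" % "provided"
  // your application-specific dependencies go here and end up inside the assembly jar
)

// in project/plugins.sbt:
// addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

assemblyJarName in assembly := "bp-preprocessing-assembly-1.0.0.jar"

Running sbt assembly then produces the single jar you pass to spark-submit.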

Contributor

Yes, I have one fat jar that has all the dependencies. The thing is, when I use collect() in the code below it works in yarn-cluster mode. But using collect() removes parallelism from further operations, so I don't want to use it.

Without the collect() statement I get the "java.lang.NoClassDefFoundError" exception mentioned above. The code below works fine in local mode without the collect statement.

Please help me understand this behavior.

preProcessedDataFrame.registerTempTable("tApplication")
preProcessedDataFrame.select(ApplicationId).distinct().collect().foreach(record => {
  val applicationId = record.getAs[String](ApplicationId)
  val selectedApplicationDataFrame = sqlContext.sql("SELECT * FROM tApplication WHERE ApplicationId = " + applicationId)
  logger.info("selectedApplicationId: " + applicationId)
  //DO FURTHER PROCESSING on selectedApplicationDataFrame.....
})

Super Guru

Can you please share the code without collect()?

Contributor

It's the same code as above, without the collect():

preProcessedDataFrame.registerTempTable("tApplicationId")
preProcessedDataFrame.select(ApplicationId).distinct().foreach(record => {
  val applicationId = record.getAs[String](ApplicationId)
  val selectedApplicationDataFrame = sqlContext.sql("SELECT * FROM tApplicationId WHERE ApplicationId = " + applicationId)
  logger.info("selectedApplicationId: " + applicationId)
  //DO FURTHER PROCESSING on selectedApplicationDataFrame.....
})

Contributor

I figured out the cause of the issue I was facing. In the above piece of code I am doing a nested transformation (using sqlContext inside a foreach that runs on the executors), which is not supported by Spark. Is there any approach I should follow to avoid this nested transformation?
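For what it's worth, your collect() version is often the pragmatic fix here: assuming the number of distinct ApplicationIds is modest, only that id list is pulled to the driver, and each sqlContext.sql query inside the loop still runs in parallel on the cluster. If the per-application work can be expressed as an aggregation, you can also avoid the loop entirely with a single groupBy. A sketch under that assumption; the input path and the count aggregation below are placeholders, not your real calculation:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object PerApplicationProcessing {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PerApplicationProcessing"))
    val sqlContext = new SQLContext(sc)

    // Hypothetical input; replace with however preProcessedDataFrame is built.
    val preProcessedDataFrame = sqlContext.read.parquet("hdfs:///testdata/preprocessed")

    // One distributed job instead of a driver-side loop; no sqlContext use inside executor code.
    val perApplication = preProcessedDataFrame
      .groupBy("ApplicationId")
      .count()  // placeholder; substitute your real per-application aggregation via agg(...)

    perApplication.show(20)

    sc.stop()
  }
}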