Fetch distinct values of a column in Dataframe using Spark

I am working with Spark 1.6.1 and need to fetch the distinct values of a column using Spark DataFrames. The column contains ~50 million records, and doing a collect() slows down further operations on the result DataFrame and removes all parallelism. The piece of code below works fine in local mode, but in yarn-cluster mode I get "java.lang.NoClassDefFoundError".

preProcessedDataFrame.registerTempTable("tTempTable")
preProcessedDataFrame.distinct().foreach(record => {
  val applicationId = record.getAs[Int]("ApplicationId")
  val selectedApplicationDataFrame = sqlContext.sql("SELECT * FROM tTempTable WHERE ApplicationId = " + applicationId)
  selectedApplicationDataFrame.show(20)
  //FURTHER DO SOME MORE CALC BASED ON EACH APPLICATION-ID
})

Can someone tell me the reason for the error, or suggest a better approach to achieve the same result?

9 REPLIES

Re: Fetch distinct values of a column in Dataframe using Spark

Contributor

Can you share the full details of the error message? You are probably missing some libraries. If you are reading data from HDFS and running in yarn-cluster mode, your parallelism will by default equal the number of HDFS blocks. As a best practice, you should avoid collect() unless it is a small test dataset; instead use the saveAsTextFile method to write the result dataset to HDFS or a local file.
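To illustrate the saveAsTextFile suggestion, here is a minimal sketch against the Spark 1.6 API (the object and path names are illustrative, not from the original post): the distinct values are written out in parallel, one part-file per partition, so nothing large ever lands on the driver.

```scala
import org.apache.spark.sql.DataFrame

// Sketch: write a column's distinct values to HDFS/local storage in
// parallel instead of collecting them to the driver.
object SaveDistinct {
  def saveDistinctColumn(df: DataFrame, column: String, path: String): Unit = {
    df.select(column)
      .distinct()              // distributed de-duplication
      .rdd
      .map(_.get(0).toString)  // one value per output line
      .saveAsTextFile(path)    // parallel write, one file per partition
  }
}
```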

Re: Fetch distinct values of a column in Dataframe using Spark

Below are the exception details. The same piece of code runs fine in local mode; on yarn-cluster I see the error below during iteration.

java.lang.NoClassDefFoundError: Could not initialize class com.sg.ae.bp.BreakpointPredictionDriver$
	at com.sg.ae.bp.BreakpointPredictionDriver$anonfun$com$sg$ae$bp$BreakpointPredictionDriver$run$1.apply(BreakpointPredictionDriver.scala:126)
	at com.sg.ae.bp.BreakpointPredictionDriver$anonfun$com$sg$ae$bp$BreakpointPredictionDriver$run$1.apply(BreakpointPredictionDriver.scala:124)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$34.apply(RDD.scala:919)
	at org.apache.spark.rdd.RDD$anonfun$foreach$1$anonfun$apply$34.apply(RDD.scala:919)
	at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1881)
	at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1881)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)						

Re: Fetch distinct values of a column in Dataframe using Spark

Super Guru
@Narasimhan Kazhiyur

When you run Spark in yarn-cluster mode, your application jar needs to be shipped to the cluster. When you run in cluster mode, do you specify your application jar with the application-jar option?

If not, please check the following link.

http://spark.apache.org/docs/latest/submitting-applications.html


Re: Fetch distinct values of a column in Dataframe using Spark

Yes, I am doing that. Below is my spark-submit command.

spark-submit --class com.BreakpointPredictionDriver \
  --master yarn-cluster \
  --num-executors 4 \
  --driver-memory 8g \
  --executor-memory 8g \
  --executor-cores 4 \
  --name BreakPointPrediction \
  --jars /usr/hdp/current/spark-client/lib/datanucleus-core-3.2.10.jar,/usr/hdp/current/spark-client/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/current/spark-client/lib/datanucleus-rdbms-3.2.9.jar \
  hdfs://PROD1/testdata/spark-jobs/bp-preprocessing-assembly-1.0.0.jar \
  --hbaseZookeeperQuorum 127.0.0.1 \
  --hbaseZookeeperPort 2181

Re: Fetch distinct values of a column in Dataframe using Spark

Super Guru

@Narasimhan Kazhiyur So do the jars you are supplying include com.sg.ae.bp.BreakpointPredictionDriver?

It could be a dependency issue. You can ship one uber jar that has everything, including all dependencies. I think the issue is with shipping the jar with all of its classes.
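For reference, a common way to build such an uber jar: a sketch assuming an sbt build with the sbt-assembly plugin (the plugin and Spark version numbers below are illustrative, not taken from the original post). Marking Spark itself as "provided" keeps the cluster's Spark out of the assembly, so only your classes and third-party dependencies get shipped.

```scala
// project/plugins.sbt  (illustrative plugin version)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")

// build.sbt: Spark is on the cluster already, so mark it "provided";
// sbt assembly then bundles only your code and third-party jars.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided"
```

Running `sbt assembly` then produces a single fat jar suitable for the spark-submit command above.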

Re: Fetch distinct values of a column in Dataframe using Spark

Yes, I have one fat jar that contains all the dependencies. The thing is, when I use collect() in the code below, it works on yarn-cluster. But collect() removes parallelism from further operations, so I don't want to use it.

Without the collect() call I get the "java.lang.NoClassDefFoundError" exception mentioned above. The code below works fine in local mode without collect().

Please help me understand this behavior.

preProcessedDataFrame.registerTempTable("tApplication")
preProcessedDataFrame.select("ApplicationId").distinct().collect().foreach(record => {
  val applicationId = record.getAs[String]("ApplicationId")
  val selectedApplicationDataFrame = sqlContext.sql("SELECT * FROM tApplication WHERE ApplicationId = '" + applicationId + "'")
  logger.info("selectedApplicationId: " + applicationId)
  // DO FURTHER PROCESSING on selectedApplicationDataFrame...
})

Re: Fetch distinct values of a column in Dataframe using Spark

Super Guru

Can you please share the code without collect()?


Re: Fetch distinct values of a column in Dataframe using Spark

It's the same code as above, without collect():

preProcessedDataFrame.registerTempTable("tApplicationId")
preProcessedDataFrame.select("ApplicationId").distinct().foreach(record => {
  val applicationId = record.getAs[String]("ApplicationId")
  val selectedApplicationDataFrame = sqlContext.sql("SELECT * FROM tApplicationId WHERE ApplicationId = '" + applicationId + "'")
  logger.info("selectedApplicationId: " + applicationId)
  // DO FURTHER PROCESSING on selectedApplicationDataFrame...
})

Re: Fetch distinct values of a column in Dataframe using Spark

I figured out the cause of the issue I was facing. In the above piece of code I am doing a nested transformation (using sqlContext inside a foreach that runs on the executors), which is not supported by Spark. Is there an approach I should follow to avoid this nested transformation?
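One common pattern, sketched below against the Spark 1.6 API and assuming the set of distinct ApplicationIds is small relative to the 50M rows: collect only the distinct keys to the driver and loop over that local array, so sqlContext is only ever used on the driver. This is essentially the working collect() version above; the full rows never reach the driver, and each per-key query still runs as a distributed job. (The object name and the per-key count() are illustrative stand-ins for the real per-application processing.)

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Sketch: collect ONLY the small distinct-key array to the driver,
// then drive one distributed query per key from the driver loop.
object PerKeyProcessing {
  def countsPerApplicationId(sqlContext: SQLContext, df: DataFrame): Map[Int, Long] = {
    df.registerTempTable("tApplication")
    // Distinct keys: tiny compared to the full data, so collect() is safe.
    val ids = df.select("ApplicationId").distinct()
      .rdd.map(_.getAs[Int]("ApplicationId")).collect()
    ids.map { id =>
      // Driver-side loop: sqlContext is legal here, and the query
      // itself still executes in parallel on the cluster.
      val selected = sqlContext.sql(
        s"SELECT * FROM tApplication WHERE ApplicationId = $id")
      id -> selected.count()
    }.toMap
  }
}
```

If per-key processing can be expressed as an aggregation, a single `df.groupBy("ApplicationId")` job avoids the driver loop entirely.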
