
SparkSQL performance issue with collect method


We are currently facing a performance issue in a Spark SQL application written in Scala. The application flow is as follows:

1) The Spark application reads a text file from an input HDFS directory.
2) It creates a data frame on top of the file by programmatically specifying the schema. This data frame is an exact replica of the input file, kept in memory, and has around 18 columns. [var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema)]
3) It creates a filtered data frame from the one constructed in step 2. This data frame contains the unique account numbers, obtained with distinct. [var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()]
4) Using the two data frames from steps 2 and 3, we get all the records that belong to one account number and run some JSON parsing logic on the filtered data. [var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()]
5) Finally, the JSON-parsed data is put into an HBase table.

We are facing performance issues when calling the collect method on the data frames, because collect fetches all the data onto a single node and processes it there, which loses the benefit of parallel processing. In the real scenario we can expect around 10 billion records, so collecting all of them on the driver node might crash the program due to memory or disk space limitations.

I don't think the take method can be used in our case, since it fetches only a limited number of records at a time. We have to get all the unique account numbers from the whole data set, so I am not sure take would suit our requirements.

I would appreciate any help in avoiding the collect calls, and any other best practices to follow. Code snippets, suggestions, or Git links would be very helpful if anyone has faced similar issues.

Code snippet


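    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Build an all-string schema from a space-separated list of column names
    val eqpSchemaString = "accountnumber ....."
    val eqpSchema = StructType(eqpSchemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Read the input file and turn each comma-separated line into a Row
    val eqpRdd = sc.textFile(inputPath)
    val eqpRowRdd = eqpRdd.map(_.split(",")).map(eqpRow => Row(eqpRow(0).trim, eqpRow(1).trim, ....))
    var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema)

    // First collect(): pulls every distinct account number to the driver
    var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()

    distAccNrsDF.foreach { data =>
      // Second collect(): one filter over the full data plus a driver-side collect per account
      var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()
      var result = new JSONObject()
      result.put("jsonSchemaVersion", "1.0")
      val firstRowAcc = filtrEqpDF(0)
      // Json parsing logic
      { ..... ..... }
    }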


Yes, the collect() action does require everything to come to the driver, but there is a better action for you. Try using foreach(), which is like an RDD's map() function in that it is applied to each element on the executors, working through each partition of the underlying dataset independently of the other partitions (so you can run it wide!). It returns nothing, which is probably what you want anyway. Good luck and happy Sparking!
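One way this could look in practice, as a rough sketch rather than a tested fix for the application above: instead of collecting the distinct account numbers and filtering once per account, the records can be grouped by account number on the executors and then handed to foreachPartition. This goes a step beyond plain foreach(), because the original loop body refers to eqpDF, which cannot be used inside a function running on the executors. The object name PerAccountSketch, the helpers buildJson and jsonPerAccount, the sample data, and the assumption that the account number is the first comma-separated field are all hypothetical. The JSON builder is a placeholder for the elided parsing logic and does no escaping, and the HBase write is left as a comment.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PerAccountSketch {

  // Hypothetical stand-in for the JSON logic elided in the question.
  // It does not escape field values; a real JSON library should be used instead.
  def buildJson(account: String, rows: Iterable[Array[String]]): String = {
    val records = rows
      .map(r => r.map(f => "\"" + f + "\"").mkString("[", ",", "]"))
      .mkString("[", ",", "]")
    s"""{"jsonSchemaVersion":"1.0","accountnumber":"$account","records":$records}"""
  }

  // Groups the records by account number (assumed to be field 0) on the
  // executors, so neither the account list nor the rows reach the driver.
  def jsonPerAccount(lines: RDD[String]): RDD[(String, String)] =
    lines
      .map(_.split(",").map(_.trim))
      .keyBy(fields => fields(0))
      .groupByKey()
      .map { case (account, rows) => (account, buildJson(account, rows)) }

  def main(args: Array[String]): Unit = {
    // local[*] is only for trying the sketch out; drop setMaster when using spark-submit.
    val conf = new SparkConf().setAppName("per-account-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Made-up sample input standing in for sc.textFile(inputPath).
    val lines = sc.parallelize(Seq("A1, x, 1", "A2, y, 2", "A1, z, 3"))

    jsonPerAccount(lines).foreachPartition { part =>
      // A real job would open one HBase connection per partition here
      // and write each record with it; printing stands in for that write.
      part.foreach { case (account, json) => println(s"$account -> $json") }
    }

    sc.stop()
  }
}
```

Note that groupByKey holds all rows of one account in memory on a single executor, so this shape only fits if no single account has an extremely large number of records.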
