New Contributor
Posts: 1
Registered: ‎11-30-2017

SparkSQL performance issue with collect method

We are currently facing a performance issue in a Spark SQL application written in Scala. The application flow is as follows.

1) The Spark application reads a text file from an input HDFS directory.
2) It creates a DataFrame on top of the file by programmatically specifying the schema. This DataFrame is an exact in-memory replica of the input file and has around 18 columns. [var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema)]
3) It creates a second result from the DataFrame built in step 2, containing the unique account numbers obtained with distinct, and collects it. [var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()]
4) Using the results of steps 2 and 3, for each account number we fetch all the records that belong to it and run some JSON parsing logic on the filtered data. [var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()]
5) Finally, the parsed JSON data is written to an HBase table.

We are facing performance issues when calling collect on the DataFrames, because collect fetches all the data to a single node (the driver) and the processing then happens there, so we lose the benefit of parallel processing.
In the real scenario we expect around 10 billion records, so collecting all of them to the driver node may well crash the program because of memory or disk space limitations.

I don't think take can be used in our case, since it fetches only a limited number of records at a time. We need all the unique account numbers from the whole data set, so I am not sure take would suit our requirements.

I would appreciate any help in avoiding the collect calls, and any other best practices to follow. Code snippets, suggestions, or Git links would be very helpful if anyone has faced similar issues.


Code snippet
val eqpSchemaString = "accountnumber ....."
val eqpSchema = StructType(eqpSchemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)));

val eqpRdd = sc.textFile(inputPath)
val eqpRowRdd = eqpRdd.map(_.split(",")).map(eqpRow => Row(eqpRow(0).trim, eqpRow(1).trim, ....))

var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema);

var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()

distAccNrsDF.foreach { data =>

var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()

var result = new JSONObject()

result.put("jsonSchemaVersion", "1.0")
val firstRowAcc = filtrEqpDF(0)
//Json parsing logic

Expert Contributor
Posts: 152
Registered: ‎07-01-2015

Re: SparkSQL performance issue with collect method

You should create a DataFrame with DISTINCT and then run a map operation on that DataFrame to write out to HBase. That way nothing goes to the driver.

Of course, Spark will shuffle all the data during the distinct, which is a costly operation. But do that, and then you can issue multiple puts from multiple workers.
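
To make the idea concrete, here is a minimal sketch, assuming the Spark 1.6-era API used in the question (SQLContext, DataFrame). Note that it swaps the distinct-then-map suggested above for a groupBy on accountnumber, on the assumption that the JSON logic needs all rows of one account together; the thread does not confirm that. The object name GroupByAccount, the helper buildJson, the two-column sample schema, and the JSON layout are hypothetical stand-ins for the real 18-column input and parsing logic.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object GroupByAccount {

  // Hypothetical stand-in for the real JSON parsing logic: one document per account.
  // Values are not escaped here; a real job would use a proper JSON library.
  def buildJson(account: String, rows: Iterable[Row]): String = {
    val items = rows
      .map(r => r.getAs[String]("equipment"))
      .toSeq.sorted                        // group order is not deterministic, so sort
      .map(e => "\"" + e + "\"")
      .mkString("[", ",", "]")
    s"""{"jsonSchemaVersion":"1.0","accountnumber":"$account","equipment":$items}"""
  }

  // Groups rows by account on the executors; nothing is collected to the driver.
  def jsonPerAccount(eqpDF: DataFrame): RDD[(String, String)] =
    eqpDF.rdd
      .groupBy(row => row.getAs[String]("accountnumber"))
      .map { case (account, rows) => (account, buildJson(account, rows)) }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("group-by-account").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      // Hypothetical two-column sample in place of the real input file.
      val schema = StructType(
        Seq("accountnumber", "equipment").map(StructField(_, StringType, true)))
      val rows = sc.parallelize(
        Seq(Row("A1", "modem"), Row("A1", "router"), Row("B2", "phone")))
      val eqpDF = sqlContext.createDataFrame(rows, schema)

      // take() is used only to inspect a tiny sample on the driver.
      jsonPerAccount(eqpDF).take(5).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

One caveat with this approach: groupBy still shuffles the full data set, and all rows of a single account have to fit in the memory of one executor task, so a heavily skewed account could become a bottleneck.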

You will need to handle the HBase connection on the executors. Google for it; Ted Malaska has some good examples.
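
For what handling the connection on the executors can look like, here is a rough sketch, assuming the HBase 1.0+ client API (ConnectionFactory, Table, Put.addColumn) and an RDD of (row key, JSON string) pairs. It is not taken from Ted Malaska's examples. The object name HBaseWriter, the table name account_json, and the column family d and qualifier json are hypothetical. The point is that the connection is opened inside foreachPartition, once per partition, rather than on the driver or once per record.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object HBaseWriter {

  // Hypothetical table and column names; replace with the real ones.
  val TableNameStr = "account_json"
  val Family: Array[Byte] = Bytes.toBytes("d")
  val Qualifier: Array[Byte] = Bytes.toBytes("json")

  // Builds one Put keyed by the account number.
  def toPut(account: String, json: String): Put =
    new Put(Bytes.toBytes(account)).addColumn(Family, Qualifier, Bytes.toBytes(json))

  // Writes every (account, json) pair from the executors.
  def write(docs: RDD[(String, String)]): Unit =
    docs.foreachPartition { part =>
      // Created on the executor: HBase connections are not serializable,
      // so they cannot be built on the driver and shipped in the closure.
      val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
      try {
        val table = connection.getTable(TableName.valueOf(TableNameStr))
        try {
          part.foreach { case (account, json) => table.put(toPut(account, json)) }
        } finally {
          table.close()
        }
      } finally {
        connection.close()
      }
    }
}
```

Calling HBaseWriter.write(docs) on the job's final RDD then issues the puts in parallel from all workers. HBaseConfiguration.create() picks up hbase-site.xml from the executor classpath, so that file has to be available there.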