How can I optimize a Spark application when the table has too many columns?

I have a simple Spark application that validates data between a source (DB2 or Oracle) and a target Hive table. The application works fine against tables with around 200-250 columns, but the same code slows down badly and takes far too long to complete when run against a table with 400 columns and 9 million or more records. I have attached the complete code (tried in both yarn-client and yarn-cluster mode). Below is the function that compares the data of the two tables.

def compareTable(sourceTable: String, targetTable: String, applicationName: String,
                 toolPath: String, result: PrintStream) = {
  try {
    // Load JDBC connection settings from the application's properties file
    val dbProperties = new Properties()
    val propFile = "file://" + toolPath + "/" + applicationName + ".properties"
    dbProperties.load(new URL(propFile).openStream())
    val connectionURL = dbProperties.getProperty("connURL")
    val password = dbProperties.getProperty("password")
    val userName = dbProperties.getProperty("userName")
    val driver = dbProperties.getProperty("driver")
    val sourceSchema = dbProperties.getProperty("sourceSchema")
    val targetSchema = dbProperties.getProperty("targetSchema")
    dbProperties.setProperty("user", userName)
    dbProperties.setProperty("password", password)
    dbProperties.setProperty("driver", driver)

    // Read the source table over JDBC and the target table from Hive
    val sourceDF = hiveContext.read.jdbc(connectionURL, sourceTable, dbProperties)
    val hiveQuery = "select * from " + targetSchema + "." + targetTable
    val targetDF = hiveContext.sql(hiveQuery)

    // Compare the schemas of the two tables and get back the column lists to select
    val sourceColumnSet: Array[(String, String)] = sourceDF.dtypes
    val targetColumnSet: Array[(String, String)] = targetDF.dtypes
    val columnSets = compareSchema(targetColumnSet, sourceColumnSet, targetTable)

    sourceDF.registerTempTable("sourceTable")
    targetDF.registerTempTable("targetTable")
    val sourceQuery = "select " + columnSets._1 + " from sourceTable"
    val targetQuery = "select " + columnSets._2 + " from targetTable"
    val SDF = hiveContext.sql(sourceQuery)
    val TDF = hiveContext.sql(targetQuery)

    // Rows present in the target but absent from the source
    val misMatchedRecords = TDF.except(SDF)
    val misMatchedRecordsCount = misMatchedRecords.count
    if (misMatchedRecordsCount > 0) {
      println("Mismatched Records of target table: " + targetTable)
      misMatchedRecords.show()
    } else {
      println("All records are correct in target table: " + targetTable)
    }
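For reference, the two-argument `read.jdbc(url, table, props)` call above pulls the entire source table through a single connection and a single partition, which is a common bottleneck on wide tables. Spark's JDBC reader also has a partitioned overload; a minimal sketch, where the column name `id` and its bounds are assumptions (substitute a numeric, ideally indexed, key from the real schema):

```scala
// Sketch only: partitioned JDBC read. Spark issues numPartitions parallel
// range queries over the stride [lowerBound, upperBound) of the given
// numeric column, instead of streaming all rows through one task.
val sourceDF = hiveContext.read.jdbc(
  connectionURL,
  sourceTable,
  "id",           // assumed numeric/indexed partition column
  0L,             // assumed min(id) in the source table
  9000000L,       // assumed max(id) in the source table
  16,             // number of parallel read partitions
  dbProperties)
```

The bounds only shape the partition stride, so a rough min/max is enough; rows outside the range are still read, just into the edge partitions.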