Reply
New Contributor
Posts: 1
Registered: ‎03-22-2017

How to optimize a Spark application for tables with too many columns?

I have a simple Spark application for data validation between a source such as DB2 or Oracle and a Hive target. My application works fine when run against tables with around 200-250 columns, but the same code starts lagging and takes too much time to complete when run against a table with 400 columns and 9 million records or more. I have attached the complete code (tried in both yarn-client and yarn-cluster mode). Below is the function that compares the data of the two tables.

def compareTable(sourceTable:String,targetTable:String,applicationName:String,toolPath:String,result:PrintStream)={
try{
val dbProperties = new Properties()
val propFile = "file:"+"//"+toolPath+"/"+applicationName+".properties"
dbProperties.load(new URL(propFile).openStream())
val connectionURL = dbProperties.getProperty("connURL")
val password = dbProperties.getProperty("password")
val userName = dbProperties.getProperty("userName")
val driver = dbProperties.getProperty("driver")
val sourceSchema = dbProperties.getProperty("sourceSchema")
val targetSchema = dbProperties.getProperty("targetSchema")
dbProperties.setProperty("user",userName);
dbProperties.setProperty("password",password);
dbProperties.setProperty("driver",driver);
val sourceDF = hiveContext.read.jdbc(connectionURL,sourceTable,dbProperties)
val hiveQuery = "select * from "+ targetSchema+"."+targetTable
val targetDF = hiveContext.sql(hiveQuery)
val sourceColumnSet:Array[(String,String)] = sourceDF.dtypes
val targetColumnSet:Array[(String,String)] = targetDF.dtypes
val columnSets =  compareSchema(targetColumnSet,sourceColumnSet,targetTable) //"This compare schema of tables and return column sets."
sourceDF.registerTempTable("sourceTable")
targetDF.registerTempTable("targetTable")
val sourceQuery = "select "+columnSets._1+" from sourceTable"
val targetQuery = "select "+columnSets._2+" from targetTable"
val SDF = hiveContext.sql(sourceQuery)
val TDF = hiveContext.sql(targetQuery)
val misMatchedRecords = TDF.except(SDF)
val misMatchedRecordsCount = misMatchedRecords.count
if ( misMatchedRecordsCount > 0){
println("Mismatched Records of target table: "+targetTable)
misMatchedRecords.show()
}
else{
println("All records are correct in target table: " + targetTable)
}