
How to fix exception: Container exited with a non-zero exit code 143?


I am trying to move data from a Postgres table to a Hive table on HDFS. To do that, I came up with the following code:


  val conf = new SparkConf().setAppName("Spark-JDBC")
    .set("spark.executor.heartbeatInterval", "120s")
    .set("", "12000s")  // NOTE: the config key is missing here in the original post
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .set("spark.sql.orc.filterPushdown", "true")
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
    .set("spark.kryoserializer.buffer.max", "512m")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .set("spark.yarn.driver.memoryOverhead", "7168")
    .set("spark.yarn.executor.memoryOverhead", "7168")
    .set("spark.sql.shuffle.partitions", "61")
    .set("spark.default.parallelism", "60")
    .set("spark.memory.storageFraction", "0.5")
    .set("spark.memory.fraction", "0.6")
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "16g")
    .set("spark.dynamicAllocation.enabled", "false")
  val spark = SparkSession.builder().config(conf).master("yarn")
    .enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .getOrCreate()

  def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                     dataMapper: Map[String, String], partition_columns: Array[String],
                     spark: SparkSession): DataFrame = {
    val colList               = allColumns.split(",").toList
    val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
    val queryCols             = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
    val execQuery             = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
    val yearDF                ="jdbc")
                                  .option("url", connectionUrl)
                                  .option("dbtable", s"(${execQuery}) as year2017")
                                  .option("user", devUserName).option("password", devPassword)
                                  .option("lowerBound", 1).option("upperBound", 200000)
                                  // partitionColumn/numPartitions options not shown in the post; the log below shows numPartitions=5
                                  .load()
    val totalCols: List[String] = splitColumns ++ textList
    val cdt                     = new ChangeDataTypes(totalCols, dataMapper)
    hiveDataTypes               = cdt.gpDetails()
    val fc                      = prepareHiveTableSchema(hiveDataTypes, partition_columns)
    val allColsOrdered          = yearDF.columns.diff(partition_columns) ++ partition_columns
    val allCols                 = => org.apache.spark.sql.functions.col(colname))
    val resultDF                = _*)
    val stringColumns           = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s =>
    val finalDF                 = stringColumns.foldLeft(resultDF) {
      (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
    }
    finalDF
  }

  val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
  val dataDFPart = dataDF.repartition(30)
  dataDFPart.createOrReplaceTempView("preparedDF")  // temp view referenced by the insert below; not shown in the original post
  spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
  spark.sql("set hive.exec.dynamic.partition=true")
  spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")


The data is inserted into the Hive table, dynamically partitioned based on

prtn_String_columns: source_system_name, period_year, period_num
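For reference, a dynamic-partition insert routes each row to the Hive partition named by its partition-column values. A pure-Scala sketch of the partition path a row maps to (the case class and field names mirror the columns above, not any real API):

```scala
// Sketch: dynamic partitioning routes each row to a partition directory
// derived from its partition-column values (columns from this post).
case class Rec(source_system_name: String, period_year: String, period_num: String)

def partitionPath(r: Rec): String =
  s"source_system_name=${r.source_system_name}/period_year=${r.period_year}/period_num=${r.period_num}"
```

So every row with source_system_name='location2' lands in the same Hive partition, regardless of how many Spark partitions the dataframe has.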

Spark-submit used:

SPARK_MAJOR_VERSION=2 spark-submit --master yarn --deploy-mode cluster \
  --conf spark.ui.port=4090 \
  --driver-class-path /home/username/jars/postgresql-42.1.4.jar \
  --jars /home/username/jars/postgresql-42.1.4.jar \
  --num-executors 2 --executor-cores 3 --executor-memory 60g \
  --driver-memory 40g --driver-cores 3 \
  --keytab /home/username/username.keytab --principal username@DEV.COM \
  --files /usr/hdp/current/spark2-client/conf/hive-site.xml, \
  --name Splinter \
  --conf spark.executor.extraClassPath=/home/username/jars/postgresql-42.1.4.jar \
  --class com.partition.source.YearPartition splinter_2.11-0.1.jar

The job fails with the error messages:

java.lang.OutOfMemoryError: GC overhead limit exceeded
Container exited with a non-zero exit code 143.
Killed by external signal

I see in the logs that the read is being executed properly with the given number of partitions as below:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=5]
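As a side note, those 5 read partitions come from lowerBound/upperBound/numPartitions: the JDBC source turns them into per-partition WHERE clauses. A simplified pure-Scala sketch of that stride logic (the column name "id" is a placeholder for the actual partitionColumn):

```scala
// Simplified sketch of how Spark's JDBC source splits a read into numPartitions
// WHERE clauses from (lowerBound, upperBound). Not Spark's exact code, but the
// same stride idea: each partition scans a contiguous range of the split column.
def jdbcPredicates(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  val stride = upper / numPartitions - lower / numPartitions  // rows per partition range
  (0 until numPartitions).map { i =>
    val lo = lower + i * stride
    val hi = lo + stride
    if (i == 0) s"$column < $hi or $column is null"           // first range also catches NULLs
    else if (i == numPartitions - 1) s"$column >= $lo"        // last range is open-ended
    else s"$column >= $lo AND $column < $hi"
  }
}
```

Note the strides only balance the read if the split column is evenly distributed; they say nothing about how rows are distributed across source_system_name values.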

Below is the state of executors in stages:

[executor status screenshot]

After doing some analysis, I found that the data I am reading has three source_system_names: location1, location2, location3. Among these, location2 has a huge amount of data, and the job fails when trying to insert that particular partition into the Hive table. The data is partitioned properly; the job fails only while inserting the data into the Hive table, at the line:

'sql at YearPartition.scala:154', which is: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

as shown in the executor image. This is because the data for the partition 'location2' is so huge that the memory given is not sufficient, and it all sits in a single Spark partition. I tried to repartition the dataframe dataDF using

val dataDFPart = dataDF.repartition(30)
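A round-robin repartition(30) like this spreads rows evenly across tasks but does nothing about one hot key dominating the write. One common mitigation is key salting. A pure-Scala simulation of the idea (not my actual job code; the salt count of 30 is an arbitrary assumption):

```scala
import scala.util.Random

// Sketch of key salting: assign each row of a hot key (e.g. "location2") a random
// salt bucket, so grouping/repartitioning by (key, salt) spreads the hot key's rows
// over many buckets instead of one. Returns row counts per (key, salt) bucket.
def saltCounts(keys: Seq[String], numSalts: Int, seed: Long = 42L): Map[(String, Int), Int] = {
  val rnd = new Random(seed)
  keys.map(k => (k, rnd.nextInt(numSalts)))                  // tag each row with a salt
      .groupBy(identity)                                     // bucket by (key, salt)
      .map { case (bucket, rows) => bucket -> rows.size }    // rows per bucket
}
```

In Spark terms this would correspond to something like `dataDF.withColumn("salt", (rand() * 30).cast("int")).repartition(col("source_system_name"), col("salt"))` before the insert, so the 'location2' rows are split across many tasks while still landing in the same Hive partition.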

Could anyone let me know how I can repartition the dataframe in this case so that it is inserted into the Hive table properly?