
Spark Exception: Task Not Serializable

I am getting an error for the following code snippet:

import scala.util.{Failure, Try}
import scala.util.control.NonFatal

import org.apache.spark.{Logging, TaskKilledException}

object SparkTaskTry extends Logging {
  /**
   * Extends the normal Try constructor to allow TaskKilledExceptions to propagate.
   */
  def apply[T](r: => T): Try[T] =
    try scala.util.Success(r) catch {
      case e: TaskKilledException => throw e
      case NonFatal(e) =>
        logInfo("Caught and Ignored Exception: " + e.toString)
        e.printStackTrace()
        Failure(e)
    }
}
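
For context, the helper just wraps a by-name block and turns non-fatal exceptions into a Failure; a standalone illustration (assuming the object above is in scope) would be:

val ok: Try[Int]  = SparkTaskTry("42".toInt)    // Success(42)
val bad: Try[Int] = SparkTaskTry("oops".toInt)  // exception is logged, then returned as Failure(...)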

override def buildScan(
    requiredColumns: Array[String],
    filters: Array[Filter],
    inputFiles: Array[FileStatus],
    broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
  val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
  val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
  val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
  val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
  val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec

  // When merging schemas is enabled and the column of the given filter does not exist,
  // Parquet emits an exception which is an issue of Parquet (PARQUET-389).
  val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown

  // Parquet row group size. We will use this value as the value for
  // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the values
  // of these flags are smaller than the parquet row group size.
  val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)

  // Create the function to set variable Parquet confs at both driver and executor side.
  val initLocalJobFuncOpt =
    ParquetRelation.initializeLocalJobFunc(
      requiredColumns,
      filters,
      dataSchema,
      parquetBlockSize,
      useMetadataCache,
      safeParquetFilterPushDown,
      assumeBinaryIsString,
      assumeInt96IsTimestamp,
      followParquetFormatSpec) _

  // Create the function to set input paths at the driver side.
  val setInputPaths =
    ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _

  Utils.withDummyCallSite(sqlContext.sparkContext) {
    new RDD[Try[InternalRow]](sqlContext.sparkContext, Nil) with Logging {

      override def getPartitions: Array[SparkPartition] = internalRDD.getPartitions

      override def getPreferredLocations(split: SparkPartition): Seq[String] =
        internalRDD.getPreferredLocations(split)

      override def checkpoint() {
        // Do nothing. Hadoop RDD should not be checkpointed.
      }

      override def persist(storageLevel: StorageLevel): this.type = {
        super.persist(storageLevel)
      }

      val internalRDD: SqlNewHadoopRDD[InternalRow] = new SqlNewHadoopRDD(
        sc = sqlContext.sparkContext,
        broadcastedConf = broadcastedConf,
        initDriverSideJobFuncOpt = Some(setInputPaths),
        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
        inputFormatClass = if (isSplittable) {
          classOf[ParquetInputFormat[InternalRow]]
        } else {
          classOf[ParquetRowInputFormatIndivisible]
        },
        valueClass = classOf[InternalRow]) {

        val cacheMetadata = useMetadataCache

        @transient val cachedStatuses = inputFiles.map { f =>
          // In order to encode the authority of a Path containing special characters such as '/'
          // (which does happen in some S3N credentials), we need to use the string returned by the
          // URI of the path to create a new Path.
          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
          new FileStatus(
            f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime,
            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
        }.toSeq

        private def escapePathUserInfo(path: Path): Path = {
          val uri = path.toUri
          new Path(new URI(
            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
            uri.getQuery, uri.getFragment))
        }

        // Overridden so we can inject our own cached file statuses.
        override def getPartitions: Array[SparkPartition] = {
          val inputFormat = new ParquetInputFormat[InternalRow] {
            override def listStatus(jobContext: JobContext): JList[FileStatus] = {
              if (cacheMetadata) cachedStatuses else super.listStatus(jobContext)
            }
          }

          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
          val rawSplits = inputFormat.getSplits(jobContext)

          Array.tabulate[SparkPartition](rawSplits.size) { i =>
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
          }
        }
      }

      override def compute(part: SparkPartition, context: TaskContext):
          InterruptibleIterator[Try[InternalRow]] = {
        val iter: Iterator[InternalRow] = internalRDD.constructIter(part, context)
        val tryIter = new Iterator[Try[InternalRow]] {
          override def next(): Try[InternalRow] = {
            val readAttempt = SparkTaskTry(iter.next())
            readAttempt
          }

          override def hasNext: Boolean = {
            SparkTaskTry[Boolean](iter.hasNext) match {
              case scala.util.Success(r) => r
              case _ => false
            }
          }
        }
        new InterruptibleIterator[Try[InternalRow]](context, tryIter)
      }

    }.filter(_.isSuccess).map(_.get)
      .asInstanceOf[RDD[Row]]  // type erasure hack to pass RDD[InternalRow] as RDD[Row]
  }
}

Error stack trace:
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation 
Serialization stack: 
        - object not serializable (class: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation, value: ParquetRelation) 
        - field (class: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1, name: $outer, type: class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation) 
        - object (class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1, <function0>) 
        - field (class: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2, name: $outer, type: class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1) 
        - object (class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2, $anonfun$buildInternalScan$1$$anon$2[2] at ) 
        - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD) 
        - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@47293a0b) 
        - writeObject data (class: scala.collection.immutable.$colon$colon) 
        - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@47293a0b)) 
        - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq) 
        - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[4] at ) 
        - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: prev, type: class org.apache.spark.rdd.RDD) 
        - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[5] at ) 
        - field (class: org.apache.spark.sql.execution.PhysicalRDD, name: rdd, type: class org.apache.spark.rdd.RDD) 
        - object (class org.apache.spark.sql.execution.PhysicalRDD, Scan ParquetRelation[_1#0] InputPaths: hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet 
) 
        - field (class: org.apache.spark.sql.execution.ConvertToSafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan) 
        - object (class org.apache.spark.sql.execution.ConvertToSafe, ConvertToSafe 
+- Scan ParquetRelation[_1#0] InputPaths: hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet 
) 
        - field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe) 
        - object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, <function1>) 
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
        ... 78 more 

Please help!


1 REPLY

Whenever I see that message in my own code, it means I'm referencing some object inside a Spark closure such as a map or a filter, but because the object doesn't implement java.io.Serializable, it can't be sent over the wire to the executors. I don't see the obvious problem here, but I suspect it will be that function 'r'.
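
For what it's worth, here is a minimal sketch of the usual shape of this problem and the usual workaround. The class and field names below are made up for illustration; this is not your ParquetRelation code, just the same capture pattern:

import org.apache.spark.{SparkConf, SparkContext}

// Plays the role of the non-serializable enclosing class (like ParquetRelation in the stack trace).
class Helper(sc: SparkContext) {
  val suffix = "-row"

  // Fails with "Task not serializable": `suffix` is read through `this`, so the whole
  // Helper (including its SparkContext) is dragged into the map closure.
  def broken(): Array[String] =
    sc.parallelize(1 to 3).map(i => s"$i$suffix").collect()

  // Works: copy the member into a local val first, so the closure only captures a String.
  def fixed(): Array[String] = {
    val localSuffix = suffix
    sc.parallelize(1 to 3).map(i => s"$i$localSuffix").collect()
  }
}

object NotSerializableDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
    val helper = new Helper(sc)
    println(helper.fixed().mkString(", "))  // 1-row, 2-row, 3-row
    // helper.broken()                      // org.apache.spark.SparkException: Task not serializable
    sc.stop()
  }
}

If something similar is happening in your snippet, pulling the values the anonymous RDD needs (anything it currently reads from the enclosing relation) into local vals inside buildScan before constructing the RDD is usually enough to break the reference chain shown in the serialization stack.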