New Contributor
Posts: 11
Registered: ‎07-21-2015
Accepted Solution

Spark Exception: Task Not Serializable

Hi guys,

I am trying to create a thread that builds a DataFrame from a text file and shows its content, using the code below. It compiles successfully, but throws a Task Not Serializable exception when I run it. Any suggestions on how to make it work?

 

Thanks for your time in advance.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import sys.process._

class cc extends Runnable {

  case class DimC(ID: Int, Name: String, City: String, EffectiveFrom: Int, EffectiveTo: Int)

  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    var fileName = "DimCustomer.txt"

    val fDimCustomer = sc.textFile("DimCustomer.txt")

    println("----->>>>>>IN\n")
    var dimCustomer1 = fDimCustomer.map(_.split(','))
      .map(r => DimC(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt))
      .toDF

    dimCustomer1.registerTempTable("Cust_1")

    val customers = sqlContext.sql("select * from Cust_1")

    customers.show()
  }
}

object pp extends App {
  val cp = new cc()
  val rThread = new Thread(cp)
  rThread.start()
}

Cloudera Employee
Posts: 481
Registered: ‎08-11-2014

Re: Spark Exception: Task Not Serializable

You would really have to show more of the error: what exactly is not serializable? Typically that will point you to the problem. Something in the closure of your function is being dragged along when it is serialized, and that something is not serializable.
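To illustrate what "dragged along" means here (a generic sketch, not the poster's code; the class and field names are made up): when a closure touches any field or method of an enclosing class, it implicitly captures `this`, so Spark must serialize the whole instance:

```scala
import org.apache.spark.rdd.RDD

class Holder {                      // note: Holder is NOT Serializable
  val suffix = "!"                  // an instance field

  def appendAll(rdd: RDD[String]): RDD[String] =
    rdd.map(s => s + suffix)        // `s + suffix` is really `s + this.suffix`,
                                    // so the closure captures `this` (a Holder)
                                    // and Spark throws "Task not serializable"
}
```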

New Contributor

Re: Spark Exception: Task Not Serializable

Thanks,

This is the full error:
Exception in thread "Thread-31" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1628)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at cc.run(sqlcntxt.scala:37)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: cc
Serialization stack:
- object not serializable (class: cc, value: cc@4b86d77)
- field (class: cc$$anonfun$2, name: $outer, type: class cc)
- object (class cc$$anonfun$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 5 more

Cloudera Employee

Re: Spark Exception: Task Not Serializable

Yes, that shows the problem directly. Your function has a reference to the instance of the outer class cc, and that instance is not serializable. You'll probably have to locate where your function uses the outer class and remove that reference, or else make the outer class cc serializable.
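The two fixes suggested above can be sketched like this (generic example with made-up names, assuming the closure only needs a single field of the enclosing class):

```scala
import org.apache.spark.rdd.RDD

// Fix 1 (sketch): copy the needed field into a local val, so the
// closure captures only that value, not the enclosing instance.
class Holder {
  val suffix = "!"

  def appendAll(rdd: RDD[String]): RDD[String] = {
    val localSuffix = suffix        // a plain String, which is serializable
    rdd.map(s => s + localSuffix)   // closure no longer references `this`
  }
}

// Fix 2 (sketch): make the enclosing class itself serializable,
// so capturing `this` is harmless.
class SerializableHolder extends Serializable {
  val suffix = "!"

  def appendAll(rdd: RDD[String]): RDD[String] =
    rdd.map(s => s + suffix)        // `this` is captured but serializes fine
}
```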

New Contributor

Re: Spark Exception: Task Not Serializable

I don't think that was the problem. I changed the code as below and it worked.

The issue was in the toDF method:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkConf
import sys.process._

class cc extends Runnable {

  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    var fileName = "DimCustomer.txt"

    val fDimCustomer = sc.textFile("DimCustomer.txt")

    val schemaString = "ID Name City EffectiveFrom EffectiveTo"

    val schema =
      StructType(List(
        StructField("ID", IntegerType, true),
        StructField("Name", StringType, true),
        StructField("City", StringType, true),
        StructField("EffectiveFrom", IntegerType, true),
        StructField("EffectiveTo", IntegerType, true)
      ))

    println("----->>>>>>sdsdsd2222\n")
    var dimCustomerRDD = fDimCustomer.map(_.split(','))
      .map(r => Row(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt))

    var customerDataFrame = sqlContext.createDataFrame(dimCustomerRDD, schema)

    customerDataFrame.registerTempTable("Cust_1")

    val customers = sqlContext.sql("select * from Cust_1")

    customers.show()
    println("+")
  }
}

object pp extends App {
  val cp = new cc()
  val rThread = new Thread(cp)
  rThread.start()
}

 

 

 

 
