Spark Exception: Task Not Serializable

Contributor

Hi guys,

I am trying to create a thread that builds a DataFrame from a text file and shows its contents, using the following code. It compiles successfully, but throws a Task not serializable exception when I run it. Any suggestions on how to make it work?

Thanks for your time in advance.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import sys.process._

class cc extends Runnable {

  case class DimC(ID: Int, Name: String, City: String, EffectiveFrom: Int, EffectiveTo: Int)

  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    var fileName = "DimCustomer.txt"

    val fDimCustomer = sc.textFile("DimCustomer.txt")

    println("----->>>>>>IN\n")
    var dimCustomer1 = fDimCustomer.map(_.split(',')).map(r => DimC(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt)).toDF

    dimCustomer1.registerTempTable("Cust_1")

    val customers = sqlContext.sql("select * from Cust_1")

    customers.show()
  }
}

object pp extends App {
  val cp = new cc()
  val rThread = new Thread(cp)
  rThread.start()
}

4 REPLIES

Master Collaborator

You would really have to show more of the error: what, exactly, is not serializable? Typically that will point you to the problem. Something in the closure of your function is being dragged along when it is serialized, and that something is not serializable.
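
To illustrate the usual shape of this (a generic sketch, not your code; the class and field names here are made up), along with the standard local-copy workaround:

import org.apache.spark.{SparkConf, SparkContext}

class Driver { // note: not Serializable
  val suffix = "-x" // instance field

  def broken(sc: SparkContext) =
    // s + suffix really means s + this.suffix, so the closure captures
    // `this`, i.e. the whole Driver instance: Task not serializable
    sc.parallelize(Seq("a", "b")).map(s => s + suffix)

  def fixed(sc: SparkContext) = {
    val localSuffix = suffix // copy the field into a local val first
    // the closure now captures only the String, which is serializable
    sc.parallelize(Seq("a", "b")).map(s => s + localSuffix)
  }
}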

Contributor

Thanks,

This is the full error:
Exception in thread "Thread-31" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1628)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at cc.run(sqlcntxt.scala:37)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: cc
Serialization stack:
- object not serializable (class: cc, value: cc@4b86d77)
- field (class: cc$$anonfun$2, name: $outer, type: class cc)
- object (class cc$$anonfun$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 5 more

Master Collaborator

Yes, that shows the problem directly. Your function has a reference to the instance of the outer class cc, and that is not serializable. You'll probably have to locate how your function is using the outer class and remove that, or else the outer class cc has to be made serializable.
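
If you go the second route and make cc itself serializable, the wrinkle is that a SparkContext can never be serialized, so it has to be kept out of the payload. Roughly this shape (a sketch against the posted code, not something I have run):

import org.apache.spark.{SparkConf, SparkContext}

class cc extends Runnable with Serializable {
  // @transient keeps the non-serializable SparkContext out of whatever
  // gets shipped to executors when a closure captures this instance
  @transient lazy val sc = new SparkContext(new SparkConf().setAppName("LoadDW"))

  override def run(): Unit = {
    // ... body as before; serializing a cc instance can now succeed
    // because its only non-serializable field is marked @transient
  }
}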

ACCEPTED SOLUTION

Contributor

I don't think that was the problem; I changed the code as below and it worked.

The issue was in the toDF method:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkConf
import sys.process._

class cc extends Runnable {

  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    var fileName = "DimCustomer.txt"

    val fDimCustomer = sc.textFile("DimCustomer.txt")

    val schemaString = "ID Name City EffectiveFrom EffectiveTo"

    // explicit schema instead of the case-class/toDF route
    val schema =
      StructType(List(
        StructField("ID", IntegerType, true),
        StructField("Name", StringType, true),
        StructField("City", StringType, true),
        StructField("EffectiveFrom", IntegerType, true),
        StructField("EffectiveTo", IntegerType, true)
      ))

    println("----->>>>>>sdsdsd2222\n")
    var dimCustomerRDD = fDimCustomer.map(_.split(',')).map(r => Row(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt))

    var customerDataFrame = sqlContext.createDataFrame(dimCustomerRDD, schema)

    customerDataFrame.registerTempTable("Cust_1")

    val customers = sqlContext.sql("select * from Cust_1")

    customers.show()
    println("+")
  }
}

object pp extends App {
  val cp = new cc()
  val rThread = new Thread(cp)
  rThread.start()
}