- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
Spark Exception: Task Not Serializable
- Labels:
-
Apache Spark
Created on ‎07-25-2015 07:40 AM - edited ‎09-16-2022 02:35 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi guys,
I am trying to create a thread that creates a DataFrame from a text file and shows its content using the following code. It compiles successfully, but throws a Task Not Serializable exception when I run it. Any suggestions on how to make it work?
Thanks for your time in advance.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import sys.process._
// FIX: the case class used by toDF must live at top level (or in a companion
// object), NOT inside `cc`. A case class nested in a class keeps an implicit
// `$outer` reference to its enclosing instance, so the lambdas passed to `map`
// drag the whole non-serializable `cc` (holding a SparkContext) into each
// task — which is exactly the "Task not serializable: cc" failure reported.
case class DimC(ID: Int, Name: String, City: String, EffectiveFrom: Int, EffectiveTo: Int)

/** Loads DimCustomer.txt into a DataFrame on a background thread and shows it. */
class cc extends Runnable {
  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    // Immutable binding, and actually used for the read below (the original
    // declared a `var` and then hardcoded the path a second time).
    val fileName = "DimCustomer.txt"
    val fDimCustomer = sc.textFile(fileName)
    println("----->>>>>>IN\n")

    // Split each CSV line into fields and map onto the top-level case class;
    // because DimC no longer closes over `cc`, these closures serialize cleanly.
    val dimCustomer1 = fDimCustomer
      .map(_.split(','))
      .map(r => DimC(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt))
      .toDF

    dimCustomer1.registerTempTable("Cust_1")
    val customers = sqlContext.sql("select * from Cust_1")
    customers.show()
  }
}
object pp {
  /**
   * Explicit main entry point: the `App` trait's delayed initialization is a
   * known pitfall (fields may not be initialized when accessed from other
   * threads), which matters here because we hand the object's work to a Thread.
   */
  def main(args: Array[String]): Unit = {
    val rThread = new Thread(new cc())
    rThread.start()
    // Wait for the worker explicitly so shutdown ordering is deterministic.
    rThread.join()
  }
}
Created ‎07-26-2015 02:20 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I don't think that was the problem, I changed the code as below and it worked.
The issue was in the toDF method:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkConf
import sys.process._
/**
 * Loads DimCustomer.txt with an explicit schema (Row + StructType instead of
 * toDF on a nested case class, which was the source of the original
 * Task-not-serializable failure) and shows the result.
 */
class cc extends Runnable {
  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    // Immutable binding, actually used below (original declared it as an
    // unused `var` and hardcoded the path again in textFile).
    val fileName = "DimCustomer.txt"
    val fDimCustomer = sc.textFile(fileName)

    // Removed the unused `schemaString` local — the schema is spelled out
    // column by column because the types are not uniform. All columns nullable.
    val schema = StructType(List(
      StructField("ID", IntegerType, true),
      StructField("Name", StringType, true),
      StructField("City", StringType, true),
      StructField("EffectiveFrom", IntegerType, true),
      StructField("EffectiveTo", IntegerType, true)
    ))

    println("----->>>>>>sdsdsd2222\n")

    // Parse each CSV line into a generic Row matching the schema above.
    val dimCustomerRDD = fDimCustomer
      .map(_.split(','))
      .map(r => Row(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt))

    val customerDataFrame = sqlContext.createDataFrame(dimCustomerRDD, schema)
    customerDataFrame.registerTempTable("Cust_1")
    val customers = sqlContext.sql("select * from Cust_1")
    customers.show()
    println("+")
  }
}
object pp extends App {
  // Hand the runnable to a worker thread and kick it off.
  val cp = new cc()
  val rThread = new Thread(cp)
  rThread.start()
}
Created ‎07-25-2015 08:32 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
You would really have to show more of the error, like, what is not serializable? typically that will point you to the problem. Something in the closure of your function is being dragged along when it's serialized, and it is not serializable.
Created ‎07-25-2015 05:14 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Thanks,
This is the full error:
Exception in thread "Thread-31" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1628)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at cc.run(sqlcntxt.scala:37)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: cc
Serialization stack:
- object not serializable (class: cc, value: cc@4b86d77)
- field (class: cc$$anonfun$2, name: $outer, type: class cc)
- object (class cc$$anonfun$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 5 more
Created ‎07-26-2015 12:42 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Yes, that shows the problem directly. Your function has a reference to the instance of the outer class cc, and that is not serializable. You'll probably have to locate how your function is using the outer class and remove that. Or else the outer class cc has to be made serializable.
Created ‎07-26-2015 02:20 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I don't think that was the problem, I changed the code as below and it worked.
The issue was in toDF method:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkConf
import sys.process._
/**
 * Runnable that builds a DataFrame from DimCustomer.txt using an explicit
 * Row/StructType schema, registers it as a temp table, and prints it.
 */
class cc extends Runnable {
  // Spark wiring shared by the worker thread.
  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    var fileName = "DimCustomer.txt"
    val fDimCustomer = sc.textFile("DimCustomer.txt")

    val schemaString = "ID Name City EffectiveFrom EffectiveTo"

    // Column-by-column schema; every field is nullable.
    val fields = List(
      StructField("ID", IntegerType, true),
      StructField("Name", StringType, true),
      StructField("City", StringType, true),
      StructField("EffectiveFrom", IntegerType, true),
      StructField("EffectiveTo", IntegerType, true)
    )
    val schema = StructType(fields)

    println("----->>>>>>sdsdsd2222\n")

    // Turn each CSV line into a Row matching the schema.
    var dimCustomerRDD = fDimCustomer.map(_.split(',')).map { cols =>
      Row(cols(0).toInt, cols(1), cols(2), cols(3).toInt, cols(4).toInt)
    }

    var customerDataFrame = sqlContext.createDataFrame(dimCustomerRDD, schema)
    customerDataFrame.registerTempTable("Cust_1")

    val customers = sqlContext.sql("select * from Cust_1")
    customers.show()
    println("+")
  }
}
object pp extends App {
  // Run the loader on its own (non-daemon) thread; the JVM stays alive
  // until it finishes.
  val cp = new cc()
  val rThread = new Thread(cp)
  rThread.start()
}
