Created on 07-25-2015 07:40 AM - edited 09-16-2022 02:35 AM
Hi guys,
I am trying to create a thread that builds a DataFrame from a text file and shows its content, using the following code. It compiles successfully, but it throws a Task not serializable exception when I run it. Any suggestion on how to make it work?
Thanks in advance for your time.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import sys.process._
class cc extends Runnable {

  case class DimC(ID: Int, Name: String, City: String, EffectiveFrom: Int, EffectiveTo: Int)

  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    val fileName = "DimCustomer.txt"
    val fDimCustomer = sc.textFile(fileName)

    println("----->>>>>>IN\n")

    val dimCustomer1 = fDimCustomer
      .map(_.split(','))
      .map(r => DimC(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt))
      .toDF

    dimCustomer1.registerTempTable("Cust_1")
    val customers = sqlContext.sql("select * from Cust_1")
    customers.show()
  }
}

object pp extends App {
  val cp = new cc()
  val rThread = new Thread(cp)
  rThread.start()
}
Created 07-25-2015 08:32 AM
You would really have to show more of the error: what, exactly, is not serializable? Typically that will point you to the problem. Something in the closure of your function is being dragged along when it is serialized, and that something is not serializable.
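As an illustration (a minimal sketch with hypothetical names, not the poster's exact code), this is the pattern Spark complains about: a function defined inside a non-serializable class can capture a hidden reference to the enclosing instance, so Spark tries to serialize the whole class along with the task.

import org.apache.spark.{SparkConf, SparkContext}

class Holder {                  // hypothetical class; does not extend Serializable
  case class Record(id: Int)    // inner class: every instance holds an outer reference

  val sc = new SparkContext(new SparkConf().setAppName("demo"))

  def run(): Unit = {
    // Constructing Record here needs the enclosing Holder instance,
    // so the closure captures `this` and task serialization fails
    // with NotSerializableException: Holder.
    sc.parallelize(1 to 3).map(i => Record(i)).collect()
  }
}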
Created 07-25-2015 05:14 PM
Thanks,
This is the full error:
Exception in thread "Thread-31" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1628)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at cc.run(sqlcntxt.scala:37)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: cc
Serialization stack:
- object not serializable (class: cc, value: cc@4b86d77)
- field (class: cc$$anonfun$2, name: $outer, type: class cc)
- object (class cc$$anonfun$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 5 more
Created 07-26-2015 12:42 AM
Yes, that shows the problem directly. Your function holds a reference to the instance of the outer class cc, and that is not serializable. You will probably have to locate how your function is using the outer class and remove that, or else make the outer class cc serializable.
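To make that concrete, here is a sketch of the first option (untested against the original setup): moving the case class out of cc means constructing it no longer requires the enclosing instance, so the closure stops capturing cc and toDF works as-is.

import org.apache.spark.{SparkConf, SparkContext}

// The case class now lives at the top level instead of inside cc.
case class DimC(ID: Int, Name: String, City: String, EffectiveFrom: Int, EffectiveTo: Int)

class cc extends Runnable {
  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    val fDimCustomer = sc.textFile("DimCustomer.txt")
    // DimC is top-level, so this closure captures nothing from cc.
    fDimCustomer
      .map(_.split(','))
      .map(r => DimC(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt))
      .toDF()
      .registerTempTable("Cust_1")
    sqlContext.sql("select * from Cust_1").show()
  }
}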
Created 07-26-2015 02:20 AM
I don't think that was the problem. I changed the code as below and it worked; the issue was in the toDF method:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkConf
import sys.process._
class cc extends Runnable {

  val conf = new SparkConf().setAppName("LoadDW")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  override def run(): Unit = {
    val fileName = "DimCustomer.txt"
    val fDimCustomer = sc.textFile(fileName)

    // Build the schema explicitly instead of relying on a case class.
    val schema =
      StructType(List(
        StructField("ID", IntegerType, true),
        StructField("Name", StringType, true),
        StructField("City", StringType, true),
        StructField("EffectiveFrom", IntegerType, true),
        StructField("EffectiveTo", IntegerType, true)
      ))

    // Map each line to a generic Row; no case class is involved.
    val dimCustomerRDD = fDimCustomer
      .map(_.split(','))
      .map(r => Row(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt))

    val customerDataFrame = sqlContext.createDataFrame(dimCustomerRDD, schema)
    customerDataFrame.registerTempTable("Cust_1")
    val customers = sqlContext.sql("select * from Cust_1")
    customers.show()
  }
}

object pp extends App {
  val cp = new cc()
  val rThread = new Thread(cp)
  rThread.start()
}
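Both observations are consistent, for what it's worth: with toDF, the mapper constructs the case class DimC, which was nested inside cc, and in Scala an inner-class instance carries a reference to its enclosing object, so the closure dragged the non-serializable cc along with it. Building plain Row objects against an explicit schema keeps the closure free of any reference to cc, which is why this version works.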