
java.io.NotSerializableException in Thread for parallel processing

Hi colleagues, I'm trying to process a DataFrame that is queried in small batches to pull data from Kudu. The problem is that inside each batch's work, the map transformation throws an error when it references a variable defined outside the 'new Thread'. How can I solve this? This happens with Spark 2.

Caused by: java.io.NotSerializableException: mx.com.gsalinas.contabilidad.cloudera.spark.icc.interpretador.bo.ProcessesRecordsBo$$anonfun$procesa_tablas_detino$1$$anonfun$apply$2$$anon$1
Serialization stack:
- object not serializable (class: mx.com.gsalinas.contabilidad.cloudera.spark.icc.interpretador.bo.ProcessesRecordsBo$$anonfun$procesa_tablas_detino$1$$anonfun$apply$2$$anon$1, value: Thread[kudu job 0,5,main])
- field (class: mx.com.gsalinas.contabilidad.cloudera.spark.icc.interpretador.bo.ProcessesRecordsBo$$anonfun$procesa_tablas_detino$1$$anonfun$apply$2$$anon$1$$anonfun$45, name: $outer, type: class mx.com.gsalinas.contabilidad.cloudera.spark.icc.interpretador.bo.ProcessesRecordsBo$$anonfun$procesa_tablas_detino$1$$anonfun$apply$2$$anon$1)
- object (class mx.com.gsalinas.contabilidad.cloudera.spark.icc.interpretador.bo.ProcessesRecordsBo$$anonfun$procesa_tablas_detino$1$$anonfun$apply$2$$anon$1$$anonfun$45, <function1>)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
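From the serialization stack, the map lambda (anonfun$45) holds a $outer reference to the anonymous Thread subclass, and that Thread instance is what cannot be serialized. My understanding (I may be wrong) is that because the broadcast 'h' is a local of the enclosing method, the compiler routes the lambda's access to it through the Thread's $outer, so Spark ends up trying to ship the whole Thread with the closure. A minimal sketch of the same pattern, with hypothetical names and a standalone SparkSession just for illustration (Spark 2 / Scala 2.11):

import org.apache.spark.sql.SparkSession

object ThreadClosureRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("repro").getOrCreate()
    import spark.implicits._

    // Local of main(), analogous to my broadcast `h` below
    val h = spark.sparkContext.broadcast("Hello")

    new Thread {
      override def run(): Unit = {
        // The lambda only uses `h`, but since `h` lives in main() and the lambda is
        // defined inside this anonymous Thread subclass, it appears to reach `h`
        // through $outer, so Spark tries to serialize the Thread itself.
        spark.range(10).map(_ => h.value.length).count() // java.io.NotSerializableException
      }
    }.run()

    spark.stop()
  }
}

My actual code: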

val h = sparkContext.broadcast("Hello")

batches.foreach(
  batch_suc => {
    threadPool_kudu_jobs.execute(
      new Thread {
        override def run {
          this.setName(s"kudu job ${kudu_job}")

          ProcessesRecordsBo.log.warn(s"${this.getName} :: Inicia batch de ${batch_suc.length} sucursales")

          var tot_kudu_tb: DataFrame = null
          val ds_kudu_tb_all = InterpretadorSingletonResourceManager.spark.read.options(
            kuduOptions
          ).kudu

          val campos_mem: Map[String, Int]
            = df_pre_proc.schema.zipWithIndex.map(
              campo_index => {
                campo_index._1.name -> campo_index._2
              }
            ).toMap

          val campos_hd: Map[String, Int]
            = ds_kudu_tb_all.schema.zip(
              Stream from campos_mem.size
            ).map(
              campo_index => {
                campo_index._1.name -> campo_index._2
              }
            ).toMap

          for (pedidos_sucursal <- batch_suc) {
            val fipais: Long = pedidos_sucursal.getAs("fipais")
            val ficanal: Long = pedidos_sucursal.getAs("ficanal")
            val fisucursal: Long = pedidos_sucursal.getAs("fisucursal")
            val arr_pedidos: mutable.WrappedArray[Long] = pedidos_sucursal.getAs("arr_finopedido")
            val no_pedidos: Long = pedidos_sucursal.getAs("count_pedidos")

            val predicado_kudu
              = $"fipais".===(fipais)
                .and($"ficanal".===(ficanal))
                .and($"fisucursal".===(fisucursal))
                .and($"finopedido".isin(arr_pedidos: _*))

            if (tot_kudu_tb == null) {
              tot_kudu_tb = ds_kudu_tb_all.where(predicado_kudu)
            } else {
              tot_kudu_tb = tot_kudu_tb.union(ds_kudu_tb_all.where(predicado_kudu))
            }
          }

          var df_joined_mem_hd: DataFrame
            = df_pre_proc.as(
              "df_mem"
            ).join(
              tot_kudu_tb.as(
                "df_hd"
              ),
              column_llave_join,
              "left_outer"
            )

          val df_procesado
            = df_joined_mem_hd.map(
              input_row => {
                println(h.value) // <---- Throws Exception
                1
              }
            )

          df_procesado.count()

          ProcessesRecordsBo.log.warn(s"${this.getName} :: Inicia escritura de parte de tabla '${tabla_prep._2._2.fcnombre}' en kudu")
          //df_procesado.write.options(kuduOptions).mode(SaveMode.Append).kudu
          ProcessesRecordsBo.log.warn(s"${this.getName} :: Finaliza escritura de parte de tabla '${tabla_prep._2._2.fcnombre}' en kudu")
          ProcessesRecordsBo.log.warn(s"${this.getName} :: Termina batch de ${batch_suc.length}")
          kudu_job += 1
        }
      }
    )
  }
)

threadPool_kudu_jobs.shutdown()
val finished = threadPool_kudu_jobs.awaitTermination(1, TimeUnit.HOURS)
ProcessesRecordsBo.log.warn(s"Termina jobs de kudu: finished: '${finished}'")