Thread for parallel processing

Hi colleagues, I'm trying to process a DataFrame that is queried in small
batches to get data from Kudu. The problem is that when the work runs in a
batch, the map transformation throws an error as soon as it refers to a
variable defined outside the 'new Thread'. How can I solve this? This happens
with Spark 2.

Caused by:
Serialization stack:
- object not serializable (class:$$anonfun$procesa_tablas_detino$1$$anonfun$apply$2$$anon$1, value: Thread[kudu job 0,5,main])
- field (class:$$anonfun$procesa_tablas_detino$1$$anonfun$apply$2$$anon$1$$anonfun$45, name: $outer, type: class$$anonfun$procesa_tablas_detino$1$$anonfun$apply$2$$anon$1)
- object (class$$anonfun$procesa_tablas_detino$1$$anonfun$apply$2$$anon$1$$anonfun$45, <function1>)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
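Reading the stack from the top down, the function passed to Spark has an `$outer` field, and that field is the anonymous `Thread` subclass, which is not serializable. The sketch below tries to reproduce that capture with plain Java serialization and no Spark at all. It assumes Scala 2.12 or later; the names `ClosureCaptureSketch`, `serializes`, `check`, `greeting` and `localGreeting` are made up for illustration and do not come from the original job.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicReference

object ClosureCaptureSketch {
  // Returns true when Java serialization accepts the object,
  // false when it hits something that is not serializable.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Builds two functions inside an anonymous Thread and reports, in order,
  // whether each one can be serialized.
  def check(): (Boolean, Boolean) = {
    // Hypothetical stand-in for a value defined outside the thread.
    val greeting: String = sys.props.getOrElse("greeting", "Hello")
    val results = new AtomicReference[(Boolean, Boolean)]()

    val worker = new Thread {
      override def run(): Unit = {
        // `greeting` is stored as a field of this anonymous Thread, so the
        // function reaches it through the Thread instance and captures it.
        val capturesThread: String => String = row => s"$greeting $row"

        // Copying to a local val first means the function only captures
        // the String itself.
        val localGreeting = greeting
        val capturesLocal: String => String = row => s"$localGreeting $row"

        results.set((serializes(capturesThread), serializes(capturesLocal)))
      }
    }
    worker.start()
    worker.join()
    results.get()
  }

  def main(args: Array[String]): Unit =
    println(check())
}
```

The point of the comparison is that the only difference between the two functions is where the referenced value lives: on the enclosing `Thread` object or in a local variable of `run`.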

val h = sparkContext.broadcast("Hello")

batch_suc => {
new Thread {
override def run {
this.setName(s"kudu job ${kudu_job}")

ProcessesRecordsBo.log.warn(s"${this.getName} :: Inicia batch de ${batch_suc.length} sucursales")

var tot_kudu_tb: DataFrame = null
val ds_kudu_tb_all =

val campos_mem: Map[String, Int]
campo_index => { -> campo_index._2

val campos_hd: Map[String, Int]
Stream from campos_mem.size
campo_index => { -> campo_index._2

for (pedidos_sucursal <- batch_suc) {
val fipais: Long = pedidos_sucursal.getAs("fipais")
val ficanal: Long = pedidos_sucursal.getAs("ficanal")
val fisucursal: Long = pedidos_sucursal.getAs("fisucursal")
val arr_pedidos: mutable.WrappedArray[Long] = pedidos_sucursal.getAs("arr_finopedido")
val no_pedidos: Long = pedidos_sucursal.getAs("count_pedidos")

val predicado_kudu
= $"fipais".===(fipais)
.and($"finopedido".isin(arr_pedidos: _*))

if (tot_kudu_tb == null) {
tot_kudu_tb = ds_kudu_tb_all.where(predicado_kudu)
} else {
tot_kudu_tb = tot_kudu_tb.union(ds_kudu_tb_all.where(predicado_kudu))

var df_joined_mem_hd: DataFrame

val df_procesado
input_row => {
println(h.value) // <---- Throws Exception


ProcessesRecordsBo.log.warn(s"${this.getName} :: Inicia escritura de parte de tabla '${tabla_prep._2._2.fcnombre}' en kudu")
ProcessesRecordsBo.log.warn(s"${this.getName} :: Finaliza escritura de parte de tabla '${tabla_prep._2._2.fcnombre}' en kudu")
ProcessesRecordsBo.log.warn(s"${this.getName} :: Termina batch de ${batch_suc.length}")
kudu_job += 1

val finshed = threadPool_kudu_jobs.awaitTermination(1, TimeUnit.HOURS)
ProcessesRecordsBo.log.warn(s"Termina jobs de kudu: finshed: '${finshed}'")
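One common way around this kind of error is to copy every outside value the transformation needs into a local `val` inside the thread body, so the function passed to `map` no longer reaches through the thread object. The following is a minimal sketch of that idea, not the original job: it assumes a local `SparkSession`, replaces the Kudu reads with small in-memory batches of ids, and uses hypothetical names (`ThreadedBatchesSketch`, `processBatches`, `hLocal`). A fixed thread pool is used on the assumption that `threadPool_kudu_jobs` in the post is an `ExecutorService`.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import org.apache.spark.sql.SparkSession

object ThreadedBatchesSketch {
  // Runs one Spark job per batch on a small thread pool and returns
  // the processed rows of all batches.
  def processBatches(spark: SparkSession, batches: Seq[Seq[Long]]): Seq[String] = {
    import spark.implicits._

    val h = spark.sparkContext.broadcast("Hello")
    val pool = Executors.newFixedThreadPool(2)

    val futures = batches.map { batch =>
      pool.submit(new Callable[Seq[String]] {
        override def call(): Seq[String] = {
          // Local copy of the broadcast handle: the function given to
          // `map` captures `hLocal`, not this Callable instance.
          val hLocal = h
          batch.toDS()
            .map(id => s"${hLocal.value} $id")
            .collect()
            .toSeq
        }
      })
    }

    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.HOURS)
    futures.flatMap(_.get())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local mode, for illustration only
      .appName("threaded-batches-sketch")
      .getOrCreate()
    try {
      processBatches(spark, Seq(Seq(1L, 2L), Seq(3L, 4L))).foreach(println)
    } finally {
      spark.stop()
    }
  }
}
```

The same rule would apply to anything else used inside the transformation, such as `this.getName` or lookup maps like `campos_mem`: read them into local values before the `map` call and refer only to those locals inside it.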