
Spark Streaming + Flume + Kudu frozen stage


Hi, I'm using Spark Streaming to import data from Flume into Kudu. I have multiple Flume sinks connected to Spark, but the job only works when I set up a single Flume connection (one FlumeUtils stream); otherwise the stage gets frozen.

 

Frozen Stage example

[Stage 29:>                                                         (0 + 0) / 2]

 

This is the detail of a frozen stage; it is stuck starting a receiver:

org.apache.spark.SparkContext.submitJob(SparkContext.scala:1983)
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.org$apache$spark$streaming$scheduler$ReceiverTracker$ReceiverTrackerEndpoint$$startReceiver(ReceiverTracker.scala:557)
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receive$1$$anonfun$applyOrElse$1.apply(ReceiverTracker.scala:439)
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receive$1$$anonfun$applyOrElse$1.apply(ReceiverTracker.scala:435)
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receive$1.applyOrElse(ReceiverTracker.scala:435)
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
akka.actor.ActorCell.invoke(ActorCell.scala:456)
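
To be concrete about what I mean by multiple connections: each FlumeUtils.createPollingStream call creates its own receiver. Here is a stripped-down sketch of the two-stream case (the ports are placeholders; the ssc.union step is one way I've seen the streams combined, not what my full code below does):

// Two polling streams means two receivers (placeholder ports)
val stream1 = FlumeUtils.createPollingStream(ssc, "cloudera-2", 12345)
val stream2 = FlumeUtils.createPollingStream(ssc, "cloudera-2", 12346)

// Union them into one DStream so a single set of output operations covers both;
// my actual code below instead gives every stream its own outputs
val combined = ssc.union(Seq(stream1, stream2))
combined.count().map(cnt => "Received " + cnt + " flume events.").print()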

And this is my code:

 

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume._

import org.kududb.client.{KuduClient, Operation, PartialRow}
import org.kududb.client.SessionConfiguration.FlushMode

// Reuse the spark-shell SparkContext (sc); 2-second batches
val ssc = new StreamingContext(sc, Seconds(2))

// Attach one Flume polling stream on the given port and write its events to a Kudu table
def streaminglogs(port: Int, tableName: String): Unit = {
        println("before FlumeUtils")
        val lines = FlumeUtils.createPollingStream(ssc, "cloudera-2", port)
        lines.count().map(cnt => "Received " + cnt + " flume events.").print()
        // Decode each Flume event body as a UTF-8 string
        val body = lines.map { e => new String(e.event.getBody().array(), "UTF-8") }


        // Kudu client lives on the driver; the foreachRDD body below also runs on the driver
        val kuduClient = new KuduClient.KuduClientBuilder("cloudera-4").build()

        body.foreachRDD(rdd => {
                // One Kudu session per batch
                val table = kuduClient.openTable(tableName)
                val session = kuduClient.newSession()
                session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)

                val rddsize = rdd.count.toInt
                println(rddsize + " " + tableName)

                if (rddsize != 0) {
                        // Parse the batch as JSON and pull the rows back to the driver
                        val df = sqlContext.read.json(rdd)
                        val schm = df.schema
                        val dfarr = df.collect
                        for (n <- 0 to rddsize - 1) {
                                // One Kudu insert per event, keyed by a nanosecond timestamp
                                val update = table.newInsert()
                                val row = update.getRow
                                row.addString("primary_key", System.nanoTime.toString)

                                // Copy every non-null JSON field into the string column of the same name
                                for (i <- 0 to schm.size - 1) {
                                        val col = schm.fields(i).name
                                        val valor = dfarr(n).get(i)
                                        if (valor != null) {
                                                row.addString(col, valor.toString)
                                        }
                                }
                                session.apply(update)
                        }
                }
                session.close()
        })
}

//val puertos = List(12345,12346,12347,12348,12349,12350,12351,12352,12353,12354,12355,12356,12357,12358,12359)
//val tableNames = List("cyt_sepsyslog","euroamerica_sepsyslog","itau_sepsyslog","akainix_sepsyslog","akainix_smgsyslog","ccu_smgsyslog","itau_smgsyslog","movistar_bluecoatsyslog","itau_bluecoatsyslog","itau_bluecoat","akainix_monitoreo","ccu_monitoreo","cyt_monitoreo","itau_monitoreo","transbank_monitoreo")

val puertos = List(12345)
val tableNames = List("cyt_sepsyslog")

// Start one stream per (port, table) pair
var j = 0
for (port <- puertos) {
        val tableName = tableNames(j)
        streaminglogs(port, tableName)
        j += 1
}

ssc.start()

I don't know what the problem is: it throws no error, and the Spark logs don't say anything.
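
The one thing I still want to rule out is receiver scheduling: the Spark Streaming guide says the application must have more cores than receivers, because each receiver occupies one core as a long-running task. This is the sanity check I plan to run from the shell (a sketch; spark.cores.max only applies to the standalone master and may be unset elsewhere):

// With N polling streams there are N receivers, each pinning one executor core,
// so batch tasks can only run if at least one core is left over
val numReceivers = puertos.size
val maxCores = sc.getConf.getInt("spark.cores.max", -1) // -1 when the property is unset
if (maxCores != -1 && maxCores <= numReceivers) {
  println(s"Only $maxCores cores for $numReceivers receivers; batches may never be scheduled")
}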

Please help me
