Not able to view the contents of the RDD in Spark Streaming

Hi,

Below is my sample code. It does not print any of the contents of the RDD. I'd appreciate your help.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.util.matching.Regex

object MyTest {
  def main(args: Array[String]): Unit = {

    println("Before Spark Streaming context")
    // 30-second batches on a local master
    val ssc = new StreamingContext("local[*]", "MyTest", Seconds(30))
    println("Compiled successfully and ran!!")

    // val kafkaParams = Map[String,String]("metadata.broker.list" -> "ey9omprna006.vzbi.com:6667")
    // val kafkaParams = Map[String,String]("metadata.broker.list" -> "ey9omprna006.vzbi.com:6667","zookeeper.connect" -> "127.0.0.1:2181")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "ey9omprna006.vzbi.com:6667",
      "zookeeper.connect" -> "127.0.0.1:2181",
      "group.id" -> "spark-streaming-test",
      "zookeeper.connection.timeout.ms" -> "1000")
    println("The value passed to kafka: " + kafkaParams)
    val topics = List("testlal6").toSet

    // Direct Kafka stream; keep only the message value of each (key, value) pair
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)
    // Printing a DStream shows the object reference, not the records it will contain
    println("The value coming as line: " + lines)
    val alarmMsgs = lines.filter(s => s.contains("Received alarm from mediation service"))
    println("The value coming as alarms: " + alarmMsgs)
    val regexPattern = new Regex("date[a-zA-Z0-9;=\\-\\s:\\._]*")
    // flatMap over the Option skips messages without a match instead of throwing on .get
    val actualAlarms = alarmMsgs.flatMap(alarm => regexPattern.findFirstIn(alarm).toList)
    println(" alarms: " + actualAlarms)

    actualAlarms.foreachRDD((rdd, time) => {
      // Runs on the executors; in local mode the output goes to the same console
      rdd.foreach(println)
      val split = rdd.flatMap(line => line.split(";"))

      println("-------------------------------------------------------------------------------")
      rdd.saveAsTextFile("sample.txt")
      // collect() already returns Array[String]; adding .flatten would print one character per line
      split.collect().foreach(println)

      println("===============================================================================")
    })

    // ssc.checkpoint("/data05/vpns_poc/chkpoint")
    ssc.start()
    ssc.awaitTermination()
  }
}
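One way to narrow this down is to run the same per-message logic (filter, regex extraction, split on ";") on an ordinary Scala collection, with no Kafka or Spark involved. This is only a sketch under assumptions: the object name `AlarmParsingSketch`, the helper `extractFields`, and the sample messages are hypothetical, since the post does not show the real Kafka payload. If it prints the expected fields, the transformations are probably not the problem and the empty output more likely comes from the stream side (for example, no messages arriving during the batches).

```scala
import scala.util.matching.Regex

object AlarmParsingSketch {
  // Same pattern as in MyTest
  val regexPattern = new Regex("date[a-zA-Z0-9;=\\-\\s:\\._]*")

  // Mirrors filter -> regex extraction -> split(";") from the streaming job.
  // findFirstIn returns an Option, so .toList drops non-matching messages.
  def extractFields(messages: Seq[String]): Seq[String] =
    messages
      .filter(s => s.contains("Received alarm from mediation service"))
      .flatMap(alarm => regexPattern.findFirstIn(alarm).toList)
      .flatMap(line => line.split(";").toList)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample messages, not taken from the real topic
    val messages = Seq(
      "Received alarm from mediation service: date=2016-05-01 10:15:00;severity=MAJOR;node=NE1",
      "Received alarm from mediation service: no timestamp here",
      "Heartbeat OK"
    )
    // prints date=2016-05-01 10:15:00, severity=MAJOR and node=NE1, one per line
    extractFields(messages).foreach(println)
  }
}
```

With these samples, the first message yields three fields, the second is dropped because it has no `date` field (where the original `findFirstIn alarm get` would throw `NoSuchElementException`), and the third never passes the filter.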

1 Reply

The StreamingContext is being passed the master "local[*]", which is not going to work. Try "local[2]" or a higher thread count. I had the same problem early on in my Spark Streaming work.
