Hi,
Below is my sample code: and it is not printing any contents in RDD.Appriciate your help
object MyTest {
def main( args: Array[String] ) {
println("Before Spark Streaming context")
val ssc = new StreamingContext("local[*]", "MyTest", Seconds(30))
println("Compiled successully and ran!!")
// val kafkaParams = Map[String,String]("metadata.broker.list" -> "ey9omprna006.vzbi.com:6667")
// val kafkaParams = Map[String,String]("metadata.broker.list" -> "ey9omprna006.vzbi.com:6667","zookeeper.connect" -> "127.0.0.1:2181")
val kafkaParams = Map[String,String]("metadata.broker.list" -> "ey9omprna006.vzbi.com:6667","zookeeper.connect" -> "127.0.0.1:2181",
"group.id" -> "spark-streaming-test","zookeeper.connection.timeout.ms" -> "1000")
println("The value paseed to kafka: " + kafkaParams)
val topics = List("testlal6").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
println("The value coming as line: " + lines)
val alarmMsgs = lines.filter( s => s.contains("Received alarm from mediation service"))
println("The value coming as alarms: " + alarmMsgs)
val regexPattern = new Regex("date[a-zA-Z0-9;=\\-\\s:\\._]*")
val actualAlarms = alarmMsgs.map( alarm => regexPattern findFirstIn alarm get)
println(" alarms: " + actualAlarms)
actualAlarms.foreachRDD((rdd, time) => {
rdd.foreach(println)
val split = rdd.flatMap(line => line.split(";"))
println("-------------------------------------------------------------------------------")
rdd.saveAsTextFile("sample.txt")
split.collect().flatten.foreach(println)
println("===============================================================================")
})
// ssc.checkpoint("/data05/vpns_poc/chkpoint")
ssc.start()
ssc.awaitTermination()
}
}