Created 10-27-2016 04:30 PM
Spark Scala Code
// Batch
val file = sc.textFile("hdfs://isi.xyz.com:8020/user/test/AsRun.txt")
val testdataframe = file.map(x => x.split("\\|"))
testdataframe.take(5)

// Streaming Code
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._

object RatingsMatch {
  def main(args: Array[String]) {
    // set app name
    val sparkConf = new SparkConf().setAppName("RatingsMatch")
    val conf = new SparkContext(sparkConf)
    val ssc = new StreamingContext(conf, Seconds(240))
    val file = ssc.textFileStream(args(0))

    //file.foreachRDD(rdd => rdd.map(x => x.split("\\|")).foreach(println))
    //val myfilemap = file.map(x => x.split(","))
    //myfilemap.print()

    val myfilemap = file.transform(rdd => { rdd.map(x => x.split("\\|")) })
    myfilemap.print()

    // As Run Schema
    //myfilemap.foreachRDD { rdd =>
    //  rdd.foreach.toArray(println)
    //}

    ssc.start()
    ssc.awaitTermination()
  }
}
I am trying to set up a Spark Streaming job. I've been able to get the cookie-cutter word count sample to run, and now I am trying it with our data. I can split and map the text file from Zeppelin or from the CLI using the batch engine. However, when I do the same on the DStream I get the output pasted below the code. Any thoughts? I've tried a handful of approaches with Streaming, including dstream.map, foreachRDD, and dstream.transform. I thought it might have been the regular expression used for parsing, so I tried changing it to a ",", but I still get the same results.
[Ljava.lang.String;@c080470
[Ljava.lang.String;@1d6b8b9
[Ljava.lang.String;@2876a606
[Ljava.lang.String;@7fe36aa3
[Ljava.lang.String;@3304daab
[Ljava.lang.String;@723bf02
[Ljava.lang.String;@1af86f76
[Ljava.lang.String;@7eaab8f8
[Ljava.lang.String;@6b6ee404
[Ljava.lang.String;@3304daab
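For context, that output is just java.lang.Object's default toString for an Array[String]: println prints the object reference, while the spark-shell/Zeppelin REPL renders results with ScalaRunTime.stringOf, which is why the batch take(5) looked fine. A minimal sketch reproducing the symptom (plain Scala, no Spark needed; the object name is made up for illustration):

// println uses Object.toString, so an Array prints as a JVM reference like
// [Ljava.lang.String;@c080470; the REPL's own display shows the contents.
object PrintArraySymptom {
  def main(args: Array[String]): Unit = {
    val row: Array[String] = "a|b|c".split("\\|")
    println(row)                                      // [Ljava.lang.String;@<hashcode>
    println(row.mkString("|"))                        // a|b|c
    println(scala.runtime.ScalaRunTime.stringOf(row)) // Array(a, b, c)
  }
}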
Created 10-27-2016 04:50 PM
Okay, silly mistake: println on an Array[String] only prints the object reference, so the fields need to be joined (or iterated) to print the actual contents.
myfilemap.foreachRDD(rdd =>
  if (!rdd.isEmpty()) {
    // println on an Array prints only the JVM reference;
    // join the fields with mkString to see the parsed contents
    rdd.collect().foreach(arr => println(arr.mkString("|")))
  }
)
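One caveat worth noting (my addition, not from the original reply): rdd.collect() ships every record in the batch to the driver, which is fine for a small debug file but can exhaust driver memory at scale. A sketch of a driver-safer alternative that caps the output per batch (the limit of 20 is an arbitrary debugging choice):

// Print at most 20 parsed rows per batch instead of collecting the whole RDD.
myfilemap.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.take(20).foreach(arr => println(arr.mkString("|")))
  }
}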