Created 10-27-2016 04:30 PM
Spark Scala Code
// Batch: read the file from HDFS and split each line on the pipe delimiter
val file = sc.textFile("hdfs://isi.xyz.com:8020/user/test/AsRun.txt")
// Note: this yields an RDD[Array[String]], not a DataFrame
val testdataframe = file.map(x => x.split("\\|"))
// The shell/Zeppelin REPL renders the returned Array[Array[String]] readably
testdataframe.take(5)
//Streaming Code
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
object RatingsMatch {
  def main(args: Array[String]) {
    // Set the app name and create a streaming context with a 240-second batch interval
    val sparkConf = new SparkConf().setAppName("RatingsMatch")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(240))

    // Watch the directory passed as the first argument for new files
    val file = ssc.textFileStream(args(0))

    // Earlier attempts:
    //file.foreachRDD(rdd => rdd.map(x => x.split("\\|")).foreach(println))
    //val myfilemap = file.map(x => x.split(","))
    //myfilemap.print()

    // Split each line on the pipe delimiter
    val myfilemap = file.transform(rdd => rdd.map(x => x.split("\\|")))
    myfilemap.print()

    // As Run schema
    //myfilemap.foreachRDD { rdd =>
    //  rdd.foreach.toArray(println)
    //}

    ssc.start()
    ssc.awaitTermination()
  }
}
I am trying to set up a Spark Streaming job. I’ve been able to get the cookie-cutter sample word count to run, and now I am trying it with our data. I can split and map the text file from Zeppelin or the CLI using the batch engine. However, when I do the same with the DStream I get the output pasted below the code. Any thoughts? I’ve tried a handful of approaches with Streaming: dstream.map, foreachRDD, and dstream.transform. I thought the problem might be the regular expression used to split, so I changed the delimiter to a “,”, but I still get the same results.
[Ljava.lang.String;@c080470
[Ljava.lang.String;@1d6b8b9
[Ljava.lang.String;@2876a606
[Ljava.lang.String;@7fe36aa3
[Ljava.lang.String;@3304daab
[Ljava.lang.String;@723bf02
[Ljava.lang.String;@1af86f76
[Ljava.lang.String;@7eaab8f8
[Ljava.lang.String;@6b6ee404
[Ljava.lang.String;@71af9dc4
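For context, that output is not mangled data: [Ljava.lang.String;@... is simply the JVM's default toString for an Array[String], which Scala's Array does not override, so print() and println fall back to it. A minimal illustration in plain Scala (the sample string is hypothetical):

val fields = "a|b|c".split("\\|")
println(fields)               // prints something like [Ljava.lang.String;@1b6d3586
println(fields.mkString("|")) // prints a|b|c

This is also why the batch version looks fine in Zeppelin or the shell: the REPL pretty-prints the arrays returned by take(5), while the streaming print() relies on toString.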
Created 10-27-2016 04:50 PM
Okay, silly mistake:
myfilemap.foreachRDD(rdd => if (!rdd.isEmpty()) {
  // Collect each non-empty batch to the driver and print its records
  rdd.collect().foreach(println)
})
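One caveat: each record here is still an Array[String], so println on it will show the same [Ljava.lang.String;@... form. If the goal is to see the actual fields, joining the array back into a string works; a minimal sketch, assuming the original pipe delimiter should be restored:

myfilemap.foreachRDD(rdd => if (!rdd.isEmpty()) {
  // mkString renders the split fields instead of the array's default toString
  rdd.collect().foreach(fields => println(fields.mkString("|")))
})

Collecting to the driver like this is only sensible for small test batches.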