Member since: 03-08-2016
Posts: 33
Kudos Received: 0
Solutions: 2
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 14408 | 04-25-2016 05:45 AM |
|  | 23487 | 04-07-2016 04:13 AM |
05-11-2016
03:56 AM
Yes it's working now, thanks for your help.
05-11-2016
03:15 AM
Thanks, but I have another error, with the sink machine hostname. I call:
FlumeUtils.createPollingStream(ssc,198.168.1.31,8020)
and the error is: overloaded method value createPollingStream with alternatives.
The official site shows:
val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
How can I pass the sink machine hostname?
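(Editor's sketch, not from the original post: createPollingStream takes the sink hostname as a String and the port as an Int, so the overload error is consistent with the bare numeric literal 198.168.1.31 not matching any parameter type; quoting the address should let the call resolve, assuming the host and port themselves are correct:)

val flumeStream = FlumeUtils.createPollingStream(ssc, "198.168.1.31", 8020)  // hostname passed as a String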
05-11-2016
01:20 AM
Hi, I'm using Spark Streaming to analyse data arriving from Flume, but I have an error with FlumeUtils; it says: not found: value FlumeUtils. This is my code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val flumeStream = FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) // not found value : FlumeUtils
    ............
    ssc.start()
    ssc.awaitTermination()
  }
}
And this is the pom.xml dependency section:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
Thanks in advance for your reply!
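(Editor's note, hedged: FlumeUtils is in the org.apache.spark.streaming.flume package, which is shipped by the spark-streaming-flume_2.10 artifact; the pom above only includes spark-streaming-flume-sink_2.10, the agent-side sink, so a missing spark-streaming-flume dependency would explain "not found: value FlumeUtils". A minimal usage sketch, assuming that artifact is added at the same version as the Spark dependencies, with a placeholder host and port:)

import org.apache.spark.streaming.flume._  // brings FlumeUtils into scope

val flumeStream = FlumeUtils.createPollingStream(ssc, "sink-host", 9999)  // placeholder host and port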
Labels:
- Apache Flume
- Apache Spark
04-25-2016
05:45 AM
I have found the solution:
var addedRDD : org.apache.spark.rdd.RDD[(String,Int)] = sc.emptyRDD
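(Editor's sketch, based on the question below: the explicit type annotation makes sc.emptyRDD be inferred as RDD[(String, Int)] instead of RDD[Nothing], so the accumulator can be unioned with the batch RDDs and reassigned inside foreachRDD:)

import org.apache.spark.rdd.RDD

var addedRDD: RDD[(String, Int)] = sc.emptyRDD  // without the annotation, RDD[Nothing] is inferred
test.foreachRDD { rdd =>
  addedRDD = addedRDD union rdd  // both sides are now RDD[(String, Int)]
  addedRDD.cache()
}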
04-25-2016
04:53 AM
I want to union the RDDs that I receive in streaming. This is my code:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.20:8020/user/sparkStreaming/input")
var test = file.map(x => (x.split(";")(0)+";"+x.split(";")(1), 1)).reduceByKey((x,y) => x+y)
var addedRDD = sc.emptyRDD
test.foreachRDD { rdd =>
  addedRDD = addedRDD union rdd
  addedRDD.cache()
}
but I have this error:
type mismatch; found : org.apache.spark.rdd.RDD[(String, Int)] required: org.apache.spark.rdd.RDD[Nothing]
And when I try to create an empty RDD with a given type, I get this error:
type mismatch; found : org.apache.spark.rdd.RDD[(String, Int)] required: org.apache.spark.rdd.EmptyRDD[(String, Int)]
How can I fix this problem? Thanks in advance!
Labels:
- Apache Spark
- HDFS
04-07-2016
04:13 AM
I don't know why, but I re-ran it and it works. However, I have an empty _SUCCESS file in the directory file1. Here is the complete code:
def main(args: Array[String]) {
  val conf = new SparkConf()
    .setAppName("File Count")
    .setMaster("local[2]")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(1))
  val file = ssc.textFileStream("/root/file/test/file")
  file.foreachRDD(t => {
    val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
    test.saveAsTextFile("/root/file/file1")
  })
  ssc.start()
  ssc.awaitTermination()
}
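(Editor's note, hedged: textFileStream only picks up files that appear in the input directory after the stream starts, so a batch with nothing new produces an empty RDD, and saving an empty RDD still creates the output directory containing little more than a _SUCCESS marker; a later non-empty batch may then fail because the directory already exists. A common pattern, sketched here against the loop above, is to skip empty batches and write each batch to its own directory:)

file.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    val counts = rdd.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
    // one output directory per batch, so saveAsTextFile never hits an existing path
    counts.saveAsTextFile(s"/root/file/file1-${time.milliseconds}")
  }
}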
04-07-2016
03:34 AM
16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
at com.org.file.filecount.FileCount.main(FileCount.scala)
There is a mismatch between the versions of the dependencies and the runtime, so I changed them to:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.1</version>
</dependency>
And I am getting an error like the following:
16/04/07 11:23:56 WARN FileInputDStream: Error finding new files
java.io.IOException: Incomplete HDFS URI, no host: "/root/file/test"
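(Editor's note, hedged: the "Incomplete HDFS URI, no host" message usually means the path is being resolved against an hdfs:// filesystem whose URI carries no namenode host. Two illustrative fixes, one using the namenode address that appears elsewhere in this thread and one forcing the local filesystem; neither URI is from the original post:)

// fully qualified HDFS URI with namenode host and port
val fromHdfs = ssc.textFileStream("hdfs://192.168.1.20:8020/root/file/test")
// or explicitly read from the local filesystem
val fromLocal = ssc.textFileStream("file:///root/file/test")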
04-06-2016
06:20 AM
I tried it, and I get:
16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
at com.org.file.filecount.FileCount.main(FileCount.scala)
04-06-2016
04:59 AM
This is my code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerStageCompleted
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FileCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val file = ssc.textFileStream("/root/file/test/f3")
    file.foreachRDD(t => {
      val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
      test.saveAsTextFile("/root/file/file1")
    })
    sc.stop()
  }
}
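(Editor's note: this first version never starts the streaming context; it stops the SparkContext right after defining the stream, so no batches ever run. The later, working version in this thread ends with the usual start/await pair instead of sc.stop():)

ssc.start()             // actually start the streaming computation
ssc.awaitTermination()  // keep the driver alive while batches are processed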