Created on 05-11-2016 01:20 AM - edited 09-16-2022 03:18 AM
Hi i'm using spark streaming to analyse data arrived from flume. But i have an error with FlumeUtils, he says : not found value : FlumeUtils
This is my code :
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val flumeStream = FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) // not found value : FlumeUtils
............
ssc.start()
ssc.awaitTermination()
}
}and this is pom.xml dependency:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>thanks in advance for your reply !!!
Created 05-11-2016 03:47 AM
You are messing with createPollingStream method.
Give 198.168.1.31 as sink address as below and it should work.
FlumeUtils.createPollingStream(ssc,"198.168.1.31",8020)
Created 05-11-2016 01:52 AM
Add below dependency as well:
groupId = org.apache.spark artifactId = spark-streaming-flume_2.10 version = 1.6.1
See here for pull based configuration.
Created 05-11-2016 03:15 AM
thanks, but i have an other error with sink machine hostname.
I do :
FlumeUtils.createPollingStream(ssc,198.168.1.31,8020)
the error is : overloaded method value createPollingStream with alternatives
In the official site :
val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
How can i enter the sink machine hostname ?
Created 05-11-2016 03:47 AM
You are messing with createPollingStream method.
Give 198.168.1.31 as sink address as below and it should work.
FlumeUtils.createPollingStream(ssc,"198.168.1.31",8020)
Created 05-11-2016 03:56 AM
Yes it's working now, thanks for your help.