Created on 05-11-2016 01:20 AM - edited 09-16-2022 03:18 AM
Hi, I'm using Spark Streaming to analyse data arriving from Flume, but I get an error on FlumeUtils. The compiler says: not found: value FlumeUtils
This is my code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{ Seconds, StreamingContext }

object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val flumeStream = FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) // not found value : FlumeUtils
    ............
    ssc.start()
    ssc.awaitTermination()
  }
}
And these are the dependencies in my pom.xml:
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-sink_2.10</artifactId>
  <version>1.5.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.5.0</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.3.2</version>
</dependency>
Thanks in advance for your reply!
Created 05-11-2016 01:52 AM
Add the following dependency as well:
groupId = org.apache.spark
artifactId = spark-streaming-flume_2.10
version = 1.6.1
See here for the pull-based configuration.
Created 05-11-2016 03:15 AM
Thanks, but I now have another error, this time with the sink machine hostname.
I call:
FlumeUtils.createPollingStream(ssc,198.168.1.31,8020)
The error is: overloaded method value createPollingStream with alternatives
The official documentation shows:
val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
How should I pass the sink machine hostname?
Created 05-11-2016 03:47 AM
You are calling createPollingStream with the wrong argument type.
Pass 198.168.1.31 as a quoted string for the sink address, as below, and it should work.
FlumeUtils.createPollingStream(ssc,"198.168.1.31",8020)
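For context, here is a minimal sketch of how the corrected call could sit in a complete program. It assumes the spark-streaming-flume_2.10 artifact mentioned earlier in the thread is on the classpath and that a Spark sink is listening on 198.168.1.31:8020. The object name FlumePollingSketch and the event-count output are illustrative only, not taken from the original code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical object name, used only for this sketch.
object FlumePollingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]") // at least 2 threads: one for the receiver, one for processing

    // Build the streaming context directly from the conf, with 10-second batches.
    val ssc = new StreamingContext(conf, Seconds(10))

    // The hostname is a String literal and the port is an Int.
    val flumeStream = FlumeUtils.createPollingStream(ssc, "198.168.1.31", 8020)

    // Illustrative processing step: print how many Flume events arrived in each batch.
    flumeStream.count().map(n => s"Received $n Flume events").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With the address left unquoted, the argument is not a String, which would explain why the compiler could not pick any of the createPollingStream overloads.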
Created 05-11-2016 03:56 AM
Yes, it's working now. Thanks for your help.