Integration of Spark Streaming with Flume

Hi, I'm using Spark Streaming to analyse data arriving from Flume, but I get an error with FlumeUtils: not found: value FlumeUtils

This is my code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val flumeStream = FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) // not found value : FlumeUtils
    ............
    ssc.start()
    ssc.awaitTermination()
  }
}

And these are the dependencies in my pom.xml:

<dependency>
	<groupId>org.scala-lang</groupId>
	<artifactId>scala-library</artifactId>
	<version>2.10.4</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume-sink_2.10</artifactId>
	<version>1.5.0</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming_2.10</artifactId>
	<version>1.5.0</version>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-lang3</artifactId>
	<version>3.3.2</version>
</dependency>

Thanks in advance for your reply!

Rising Star

Add the following dependency as well:

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume_2.10</artifactId>
	<version>1.6.1</version>
</dependency>

See here for the pull-based configuration.

Thanks, but now I get another error with the sink machine hostname.

I call:

FlumeUtils.createPollingStream(ssc,198.168.1.31,8020)

The error is: overloaded method value createPollingStream with alternatives

The official documentation shows:

val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])

How do I pass the sink machine hostname?

Rising Star

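You are calling the createPollingStream method with the wrong argument type: the sink hostname must be a String, not a bare number.

Pass 198.168.1.31 as a quoted String for the sink address, as below, and it should work.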

FlumeUtils.createPollingStream(ssc,"198.168.1.31",8020)
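For reference, here is a minimal sketch of the question's program with the fix applied. It assumes that spark-streaming-flume_2.10 (matching your Spark version) is on the classpath and that a Flume agent running the Spark sink is listening at 198.168.1.31:8020. The original processing step was elided, so the count-and-print line is only a hypothetical placeholder.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object WordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: the polling receiver occupies one core, leaving one for processing.
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // The sink hostname is passed as a String literal and the port as an Int.
    val flumeStream = FlumeUtils.createPollingStream(ssc, "198.168.1.31", 8020)

    // Hypothetical placeholder for the elided processing:
    // print the number of Flume events received in each 10-second batch.
    flumeStream.count().map(cnt => s"Received $cnt flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The unquoted address fails because createPollingStream is overloaded (one variant takes a hostname String and port Int, another takes a sequence of socket addresses), and a bare 198.168.1.31 matches neither.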

 

Yes, it's working now. Thanks for your help!