Integration of Spark Streaming with Flume

Hi, I'm using Spark Streaming to analyse data arriving from Flume, but I get a compile error on FlumeUtils. The compiler says: not found: value FlumeUtils

This is my code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val flumeStream = FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) // not found value : FlumeUtils
    ............
    ssc.start()
    ssc.awaitTermination()
  }
}

And these are the dependencies in my pom.xml:

<dependency>
	<groupId>org.scala-lang</groupId>
	<artifactId>scala-library</artifactId>
	<version>2.10.4</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume-sink_2.10</artifactId>
	<version>1.5.0</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming_2.10</artifactId>
	<version>1.5.0</version>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-lang3</artifactId>
	<version>3.3.2</version>
</dependency>

Thanks in advance for your reply!

1 ACCEPTED SOLUTION

Rising Star

You are calling the createPollingStream method with the wrong argument type: the sink hostname must be a String.

Pass 198.168.1.31 in quotes as the sink address, as below, and it should work.

FlumeUtils.createPollingStream(ssc,"198.168.1.31",8020)
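
For context, here is a minimal sketch of how the corrected call might sit in a complete program. It assumes the spark-streaming-flume artifact is on the classpath and that a Flume agent with a Spark sink is listening on the address from the question. The object name FlumePollingSketch, the values sinkHost and sinkPort, and the event-count output are illustrative choices, not part of the original post.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical object name, for illustration only.
object FlumePollingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Flume Polling Sketch")
      .setMaster("local[2]") // at least 2 threads: one receiver, one for processing

    val ssc = new StreamingContext(conf, Seconds(10))

    // The hostname parameter is a String and the port is an Int.
    // Address and port are the ones given in the question.
    val sinkHost = "198.168.1.31"
    val sinkPort = 8020
    val flumeStream = FlumeUtils.createPollingStream(ssc, sinkHost, sinkPort)

    // Print how many Flume events arrived in each 10-second batch.
    flumeStream.count().map(cnt => s"Received $cnt flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keeping the host and port in named values makes the expected types (String and Int) visible at the call site.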

 

4 REPLIES

Rising Star

Add the following dependency as well:

 groupId = org.apache.spark
 artifactId = spark-streaming-flume_2.10
 version = 1.6.1

See the Spark Streaming + Flume integration guide for the pull-based configuration.
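
In pom.xml form, these coordinates would look roughly like the fragment below. FlumeUtils lives in the spark-streaming-flume artifact, not in spark-streaming-flume-sink, which is why the import in the question did not resolve. The coordinates are taken from this reply; the closing comment about aligning versions is a suggestion on my part, not something stated in the thread.

```xml
<!-- Provides org.apache.spark.streaming.flume.FlumeUtils -->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<!-- Suggestion (assumption): keep every Spark artifact on the same version,
     e.g. 1.6.1, since the pom in the question mixes 1.5.0 and 1.6.1. -->
```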

Thanks, but now I have another error, this time with the sink machine hostname.

I call:

FlumeUtils.createPollingStream(ssc,198.168.1.31,8020)

The error is: overloaded method value createPollingStream with alternatives

The official documentation shows:

val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])

How should I pass the sink machine hostname?

Yes, it's working now. Thanks for your help.