Support Questions

Find answers, ask questions, and share your expertise
Announcements
Welcome to the upgraded Community! Read this blog to see What’s New!

Integration of Spark Streaming with Flume

avatar
Contributor

Hi i'm using spark streaming to analyse data arrived from flume. But i have an error with FlumeUtils, he says : not found value : FlumeUtils

This is my code :

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val flumeStream = FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) // not found value : FlumeUtils
    ............
       ssc.start()
    ssc.awaitTermination()
  }
}

and this is pom.xml dependency:

<dependency>
	<groupId>org.scala-lang</groupId>
	<artifactId>scala-library</artifactId>
	<version>2.10.4</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume-sink_2.10</artifactId>
	<version>1.5.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.5.0</version>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-lang3</artifactId>
	<version>3.3.2</version>
</dependency>

thanks in advance for your reply !!!

1 ACCEPTED SOLUTION

avatar
Contributor

You are messing with createPollingStream method.

 

Give 198.168.1.31 as sink address as below and it should work.

 

FlumeUtils.createPollingStream(ssc,"198.168.1.31",8020)

 

View solution in original post

4 REPLIES 4

avatar
Contributor

Add below dependency as well:

 

 groupId = org.apache.spark
 artifactId = spark-streaming-flume_2.10
 version = 1.6.1

 

See here for pull based configuration. 

avatar
Contributor

thanks, but i have an other error with sink machine hostname.

I do  :

FlumeUtils.createPollingStream(ssc,198.168.1.31,8020)

the error is : overloaded method value createPollingStream with alternatives

In the official site : 

val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])

How can i enter the sink machine hostname ?

avatar
Contributor

You are messing with createPollingStream method.

 

Give 198.168.1.31 as sink address as below and it should work.

 

FlumeUtils.createPollingStream(ssc,"198.168.1.31",8020)

 

avatar
Contributor

Yes it's working now, thanks for your help.

 

Labels