Integration of Spark Streaming with Flume

Contributor

Hi, I'm using Spark Streaming to analyse data arriving from Flume, but I get an error on FlumeUtils. The compiler says: not found: value FlumeUtils

This is my code :

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val flumeStream = FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) // not found value : FlumeUtils
    ............
    ssc.start()
    ssc.awaitTermination()
  }
}

And these are the dependencies in my pom.xml:

<dependency>
	<groupId>org.scala-lang</groupId>
	<artifactId>scala-library</artifactId>
	<version>2.10.4</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume-sink_2.10</artifactId>
	<version>1.5.0</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming_2.10</artifactId>
	<version>1.5.0</version>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-lang3</artifactId>
	<version>3.3.2</version>
</dependency>

Thanks in advance for your reply!

1 ACCEPTED SOLUTION

Contributor

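You are calling the createPollingStream method incorrectly. The sink hostname parameter is a String, so an unquoted 198.168.1.31 does not match any of the overloads.

Pass 198.168.1.31 as a quoted string, as below, and it should work.

FlumeUtils.createPollingStream(ssc,"198.168.1.31",8020)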
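
For context, here is a minimal sketch of how the corrected call might sit in the full program from the question. It assumes the spark-streaming-flume_2.10 artifact is on the classpath and that a Flume agent with the Spark sink is listening on the address shown. The object name `FlumeEventCount` and the counting step are illustrative only; they stand in for the processing elided in the original post.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical object name; the question used WordCount.
object FlumeEventCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]") // one thread for the receiver, one for processing

    // A StreamingContext can be built directly from the SparkConf.
    val ssc = new StreamingContext(conf, Seconds(10))

    // The hostname is a String, so the address must be quoted.
    val flumeStream = FlumeUtils.createPollingStream(ssc, "198.168.1.31", 8020)

    // Illustrative processing step (an assumption, not from the original post):
    // print how many Flume events arrived in each 10-second batch.
    flumeStream.count().map(n => s"Received $n Flume events").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The unquoted form fails because `198.168.1.31` is not a valid Scala expression of type String, which is why the compiler cannot pick a matching overload.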

 


4 REPLIES

Contributor

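Add the dependency below as well. FlumeUtils is provided by the spark-streaming-flume artifact; spark-streaming-flume-sink only contains the custom sink that runs inside the Flume agent.

 groupId = org.apache.spark
 artifactId = spark-streaming-flume_2.10
 version = 1.6.1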
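
In the pom.xml format used in the question, these coordinates might be written as follows. This is a sketch using only the values given above. The question mixes Spark 1.6.1 and 1.5.0 artifacts, and it is generally advisable to keep all Spark artifacts on the same version.

```xml
<!-- Provides org.apache.spark.streaming.flume.FlumeUtils -->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
```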

 

See here for pull based configuration. 

Contributor

Thanks, but now I have another error with the sink machine hostname.

I call:

FlumeUtils.createPollingStream(ssc,198.168.1.31,8020)

The error is: overloaded method value createPollingStream with alternatives

The official site shows:

val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])

How should I enter the sink machine hostname?

Contributor

Yes, it's working now. Thanks for your help.

 
