Integration of Spark Streaming with Flume

SOLVED

Contributor

Hi, I'm using Spark Streaming to analyse data arriving from Flume, but I get an error with FlumeUtils: not found: value FlumeUtils

This is my code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val flumeStream = FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) // error: not found: value FlumeUtils
    ............
    ssc.start()
    ssc.awaitTermination()
  }
}

And these are the dependencies in my pom.xml:

<dependency>
	<groupId>org.scala-lang</groupId>
	<artifactId>scala-library</artifactId>
	<version>2.10.4</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.10</artifactId>
	<version>1.6.1</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume-sink_2.10</artifactId>
	<version>1.5.0</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming_2.10</artifactId>
	<version>1.5.0</version>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-lang3</artifactId>
	<version>3.3.2</version>
</dependency>

Thanks in advance for your reply!

Re: Integration of Spark Streaming with Flume

Contributor

Add the following dependency as well:

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume_2.10</artifactId>
	<version>1.6.1</version>
</dependency>

See here for the pull-based configuration.

Re: Integration of Spark Streaming with Flume

Contributor

Thanks, but now I have another error with the sink machine hostname.

I call:

FlumeUtils.createPollingStream(ssc,198.168.1.31,8020)

The error is: overloaded method value createPollingStream with alternatives

The official site shows:

val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])

How should I pass the sink machine hostname?

Re: Integration of Spark Streaming with Flume

Contributor

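You are calling the createPollingStream method with the wrong argument type. The sink hostname must be a String, and an unquoted 198.168.1.31 is not a String literal, so the compiler cannot match the call to any of the overloads.

Pass the sink address 198.168.1.31 as a quoted string, as below, and it should work:

FlumeUtils.createPollingStream(ssc,"198.168.1.31",8020)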
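For reference, here is a minimal sketch of the question's program with both fixes from this thread applied: spark-streaming-flume on the classpath and the sink host passed as a String. It assumes Spark 1.6.1 for Scala 2.10, with spark-core, spark-streaming and spark-streaming-flume all at that same version, and a Flume agent already running the Spark sink on 198.168.1.31:8020 (per the Spark Flume integration guide, the spark-streaming-flume-sink jar goes on the Flume agent's classpath rather than the application's). The object name FlumePollingWordCount and the word-count processing are hypothetical stand-ins for the part the question elides.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical example object. Assumes spark-core_2.10, spark-streaming_2.10 and
// spark-streaming-flume_2.10 (all 1.6.1) are on the classpath, and that a Flume
// agent running the Spark sink listens on SinkHost:SinkPort.
object FlumePollingWordCount {
  // Sink address from the thread; the hostname is a String, not a bare literal.
  val SinkHost: String = "198.168.1.31"
  val SinkPort: Int = 8020

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Flume Polling Word Count")
      .setMaster("local[2]") // at least 2 threads: one for the receiver, one for processing

    val ssc = new StreamingContext(conf, Seconds(10))

    // Pull-based receiver: Spark polls the Spark sink running inside the Flume agent.
    val flumeStream = FlumeUtils.createPollingStream(
      ssc, SinkHost, SinkPort, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Each SparkFlumeEvent wraps an AvroFlumeEvent; decode its body as UTF-8 text.
    val lines = flumeStream.map { e =>
      val body = e.event.getBody
      val bytes = new Array[Byte](body.remaining())
      body.duplicate().get(bytes) // copy without moving the original buffer's position
      new String(bytes, "UTF-8")
    }

    // Illustrative processing only: count words per 10-second batch.
    val counts = lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Building the StreamingContext directly from the SparkConf is equivalent to the question's approach of creating a SparkContext first; either works.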

Re: Integration of Spark Streaming with Flume

Contributor

Yes it's working now, thanks for your help.