umb
Explorer
Posts: 8
Registered: ‎11-07-2014

spark-submit for spark streaming-flume application not working

I am trying to ingest data into Spark Streaming from Flume.

 

Here is my Flume conf with 2 sinks (one for HDFS, one avro sink, which I will read from Spark):

##TARGET AGENT ##
## configuration file location:  /etc/flume-ng/conf
## START Agent: flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent.conf -n collector
 
#http://flume.apache.org/FlumeUserGuide.html#avro-source
collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1 mc2
 
## Channels ##
## Source writes to 2 channels, one for each sink
collector.channels = mc1 mc2
 
#http://flume.apache.org/FlumeUserGuide.html#memory-channel
 
collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 100

collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 100
 
## Sinks ##
collector.sinks = speed batch 

## Send To AvroSink for Speed Layer
collector.sinks.speed.type = avro
collector.sinks.speed.channel = mc1
collector.sinks.speed.hostname = localhost
collector.sinks.speed.port = 45451
 
## Write to HDFS : Batch Layer
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
collector.sinks.batch.type = hdfs
collector.sinks.batch.channel = mc2
collector.sinks.batch.hdfs.path = /user/root/flume-channel/%{log_type}/%y%m%d
collector.sinks.batch.hdfs.fileType = DataStream
collector.sinks.batch.hdfs.writeFormat = Text
collector.sinks.batch.hdfs.rollSize = 0
collector.sinks.batch.hdfs.rollCount = 10000
collector.sinks.batch.hdfs.rollInterval = 600
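
The collector listens for avro events on 0.0.0.0:4545; a quick way to push test events into that source and confirm data is actually arriving is something along these lines (the input file path is just a placeholder):

flume-ng avro-client -H localhost -p 4545 -F /tmp/sample-events.log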

I wrote a Flume event count based on the spark-streaming examples:

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel

object FlumeWC {
  def main(args: Array[String]) {

    //StreamingExamples.setStreamingLogLevels()
    val batchIntervals = Milliseconds(2000)
    val sparkConf = new SparkConf().setAppName("Flume Word Count")
    val ssc = new StreamingContext(sparkConf, batchIntervals)

    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 45451, StorageLevel.MEMORY_ONLY_SER_2)
    val result = stream.count().map(cnt => "Received " + cnt + " flume events. ")
    print("SomeTEXT")
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

 Here is my sbt build file:

name := "FlumeEventCount"

organization := "LogPoint"

version := "0.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "1.1.0"
    exclude("org.apache.spark", "spark-streaming_2.10")
)

assemblyMergeStrategy in assembly := {
    case PathList(ps @ _*) if ps.last endsWith ".RSA" => MergeStrategy.first 
    case PathList("javax", "servlet","log","filter", xs @ _*)         => MergeStrategy.first
    case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
    case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
    case x if x.startsWith("META-INF/MANIFEST.MF") => MergeStrategy.discard
    case x if x.startsWith("plugin.properties") => MergeStrategy.last
    case x => MergeStrategy.first
}
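
The merge strategy above relies on the sbt-assembly plugin; the fat jar used in the spark-submit commands below (FlumeEventCount-assembly-0.1.jar) is the output of sbt assembly. My project/assembly.sbt is along these lines (the plugin version is from memory and may differ):

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")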

When I build the source and package it into a jar and then run spark-submit, nothing happens.
I have tried the following two commands for spark-submit; I have also included the output:

 

vagrant@vm-cluster-node1:~/FlumeWC$ spark-submit --class FlumeWC --master yarn-cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 target/scala-2.10/FlumeEventCount-assembly-0.1.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/jars/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/05 08:02:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


 

vagrant@vm-cluster-node1:~/FlumeWC$ spark-submit --class FlumeWC --master spark://localhost:7077 --driver-memory 512m --executor-memory 256m --executor-cores 1 target/scala-2.10/FlumeEventCount-assembly-0.1.jar 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/jars/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/05 08:02:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/05 08:02:55 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@localhost:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@localhost:7077]
14/12/05 08:02:55 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@localhost:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@localhost:7077]
14/12/05 08:02:55 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@localhost:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@localhost:7077]
14/12/05 08:02:55 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@localhost:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@localhost:7077]
SomeTEXT
14/12/05 08:03:10 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

I am stuck here. What am I doing wrong? Is it the Flume conf file or my Scala code? I have used Approach 1 for Spark-Flume integration from here: https://spark.apache.org/docs/1.1.0/streaming-flume-integration.html.
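
For reference, Approach 1 (push-based) in that guide uses FlumeUtils.createStream, where Spark runs an avro receiver that a plain Flume avro sink can point at, while my code above calls createPollingStream, which the guide describes as the pull-based Approach 2 and which expects Flume's SparkSink rather than a plain avro sink. A minimal sketch of the Approach 1 call, reusing the host/port of my "speed" sink, would be:

// push-based receiver; "localhost" / 45451 are the values from my avro sink above
val pushStream = FlumeUtils.createStream(ssc, "localhost", 45451)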

Contributor
Posts: 56
Registered: ‎02-09-2015

Re: spark-submit for spark streaming-flume application not working

Did you find a solution for this problem?
