
Spark Streaming error while receiving data from Flume

I am using Spark Streaming (the event count example) with Flume as the source.

This is my Flume config file:

FileFileAgent.sources = File
FileFileAgent.channels = MemChannel
FileFileAgent.sinks = spark

# Configuring the source
FileFileAgent.sources.File.type = spooldir
FileFileAgent.sources.File.spoolDir = /usr/lib/flume/spooldir
#FileFileAgent.sources.File.fileHeader = true

# Describing/Configuring the sink
FileAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
FileAgent.sinks.spark.hostname = 192.168.1.31
FileAgent.sinks.spark.port = 18088
FileAgent.sinks.spark.channel = MemoryChannel

# Describing/Configuring the channel
FileFileAgent.channels.MemChannel.type = memory
FileFileAgent.channels.MemChannel.capacity = 10000
FileFileAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
FileFileAgent.sources.File.channels = MemChannel
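
For reference, this is the source -> channel -> sink wiring I am aiming for, sketched with one consistent agent name (FileAgent everywhere, reusing the spool directory, host, port, and channel name from my config above). I am not sure my actual file matches this sketch:

FileAgent.sources = File
FileAgent.channels = MemChannel
FileAgent.sinks = spark

FileAgent.sources.File.type = spooldir
FileAgent.sources.File.spoolDir = /usr/lib/flume/spooldir
FileAgent.sources.File.channels = MemChannel

FileAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
FileAgent.sinks.spark.hostname = 192.168.1.31
FileAgent.sinks.spark.port = 18088
FileAgent.sinks.spark.channel = MemChannel

FileAgent.channels.MemChannel.type = memory
FileAgent.channels.MemChannel.capacity = 10000
FileAgent.channels.MemChannel.transactionCapacity = 100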

When I run the agent from the command line:

 bin/flume-ng agent --conf conf --conf-file flumeSpark.conf --name FileAgent -Dflume.root.logger=INFO,console

I get these warnings:

16/05/12 10:44:52 WARN conf.FlumeConfiguration: Agent configuration for 'FileAgent' does not contain any channels. Marking it as invalid.
16/05/12 10:44:52 WARN conf.FlumeConfiguration: Agent configuration invalid for agent 'FileAgent'. It will be removed.
16/05/12 10:44:52 WARN conf.FlumeConfiguration: no context for sinkspark
16/05/12 10:44:52 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [FileFileAgent]
16/05/12 10:44:52 WARN node.AbstractConfigurationProvider: No configuration found for this host:FileAgent
16/05/12 10:44:52 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

And when I run my Spark application:

    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)

    // 10-second batch interval; poll events from the Flume SparkSink at 192.168.1.31:18088
    val ssc = new StreamingContext(sc, Seconds(10))
    val flumeStream = FlumeUtils.createPollingStream(ssc, "192.168.1.31", 18088)

    // print how many Flume events were received in each batch
    flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()
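
(For context, a minimal sketch of the sbt dependencies I believe are needed for FlumeUtils.createPollingStream; the Spark version shown is just a placeholder for whatever matches the installed Spark, and the Flume agent side also needs the spark-streaming-flume-sink jar on its classpath for the SparkSink class to load.)

// build.sbt (sketch, versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.1",
  "org.apache.spark" %% "spark-streaming"       % "1.6.1",
  // provides org.apache.spark.streaming.flume.FlumeUtils (polling receiver)
  "org.apache.spark" %% "spark-streaming-flume" % "1.6.1"
)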

I get this warning and stack trace:

16/05/12 10:46:59 WARN FlumeBatchFetcher: Error while receiving data from Flume
java.io.IOException: NettyTransceiver closed
	at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:363)
	at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:61)
	at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:580)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
	at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375)
	at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:58)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
	at org.jboss.netty.channel.Channels.close(Channels.java:812)
	at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
	at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:166)
	at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
	at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Any help, please? How can I get my application to run? I don't know how to resolve this problem.