Spark Streaming error while receiving data from Flume

I am running the Spark Streaming event count example with Flume as the source.

This is my config file:

FileFileAgent.sources = File
FileFileAgent.channels = MemChannel
FileFileAgent.sinks = spark

#configuring the source
FileFileAgent.sources.File.type = spooldir
FileFileAgent.sources.File.spoolDir = /usr/lib/flume/spooldir
#FileFileAgent.sources.File.fileHeader = true

# Describing/Configuring the sink
FileAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
FileAgent.sinks.spark.hostname = 192.168.1.31
FileAgent.sinks.spark.port = 18088
FileAgent.sinks.spark.channel = MemoryChannel

# Describing/Configuring the channel
FileFileAgent.channels.MemChannel.type = memory
FileFileAgent.channels.MemChannel.capacity = 10000
FileFileAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
FileFileAgent.sources.File.channels = MemChannel

When I start the agent with this command:

 bin/flume-ng agent --conf conf --conf-file flumeSpark.conf --name FileAgent -Dflume.root.logger=INFO,console

I get these warnings:

16/05/12 10:44:52 WARN conf.FlumeConfiguration: Agent configuration for 'FileAgent' does not contain any channels. Marking it as invalid.
16/05/12 10:44:52 WARN conf.FlumeConfiguration: Agent configuration invalid for agent 'FileAgent'. It will be removed.
16/05/12 10:44:52 WARN conf.FlumeConfiguration: no context for sinkspark
16/05/12 10:44:52 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [FileFileAgent]
16/05/12 10:44:52 WARN node.AbstractConfigurationProvider: No configuration found for this host:FileAgent
16/05/12 10:44:52 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }
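
Reading these warnings against the config file: the properties are split across two prefixes, FileFileAgent and FileAgent, while the agent is started with --name FileAgent, and the sink's channel is set to MemoryChannel although the only declared channel is MemChannel. That would be consistent with the empty "sourceRunners:{} sinkRunners:{} channels:{}" line. A minimal sketch of a single-prefix version follows, assuming FileAgent is the intended agent name and MemChannel the intended channel; all other values are copied from the file above, and this is not confirmed to resolve the Spark-side error.

```properties
# Sketch only: every key uses the agent name passed to --name (assumed: FileAgent)
FileAgent.sources = File
FileAgent.channels = MemChannel
FileAgent.sinks = spark

# Source: spooling directory, bound to the declared channel
FileAgent.sources.File.type = spooldir
FileAgent.sources.File.spoolDir = /usr/lib/flume/spooldir
FileAgent.sources.File.channels = MemChannel

# Channel
FileAgent.channels.MemChannel.type = memory
FileAgent.channels.MemChannel.capacity = 10000
FileAgent.channels.MemChannel.transactionCapacity = 100

# Sink: the channel value must match a name listed in FileAgent.channels
FileAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
FileAgent.sinks.spark.hostname = 192.168.1.31
FileAgent.sinks.spark.port = 18088
FileAgent.sinks.spark.channel = MemChannel
```

With one prefix throughout, the "Post-validation flume configuration contains configuration for agents" line should list the same name that is given to --name.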

And when I run my Spark application:

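    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)

    // 10-second batches
    val ssc = new StreamingContext(sc, Seconds(10))
    // Pull events from the Flume SparkSink listening on this host and port
    val flumeStream = FlumeUtils.createPollingStream(ssc, "192.168.1.31", 18088)

    // Print the number of Flume events received in each batch
    flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()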

I get this warning:

16/05/12 10:46:59 WARN FlumeBatchFetcher: Error while receiving data from Flume
java.io.IOException: NettyTransceiver closed
	at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:363)
	at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:61)
	at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:580)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
	at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375)
	at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:58)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
	at org.jboss.netty.channel.Channels.close(Channels.java:812)
	at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
	at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:166)
	at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
	at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Any help, please? I don't know how to resolve this problem. How can I get my application to run?