
Error while running Flume agent


Hi guys,

While running my Flume agents I'm getting an error.

Here is my error:
Avro source source1: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: channel1}
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:386)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:91)
at org.apache.avro.ipc.Responder.respond(Responder.java:151)
at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:560)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:787)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:560)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:555)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
... 30 more

And here are my config files.

My local config:

agent.sources = localsource
agent.channels = memoryChannel
agent.sinks = avro_Sink

agent.sources.localsource.type = spooldir
#agent.sources.localsource.shell = /bin/bash -c
agent.sources.localsource.spoolDir = /home/dwh/teja/Flumedata/
agent.sources.localsource.fileHeader = true

# The channel can be defined as follows.
agent.sources.localsource.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.avro_Sink.type = avro
agent.sinks.avro_Sink.hostname=192.168.4.110
agent.sinks.avro_Sink.port= 8021

agent.sinks.avro_Sink.avro.batchSize = 100
agent.sinks.avro_Sink.avro.rollCount = 0
agent.sinks.avro_Sink.avro.rollSize = 73060831
agent.sinks.avro_Sink.avro.rollInterval = 0

agent.sources.localsource.interceptors = search-replace
agent.sources.localsource.interceptors.search-replace.type = search_replace

# Replace ### or ## delimiters in the event body with |.
agent.sources.localsource.interceptors.search-replace.searchPattern = ###|##
agent.sources.localsource.interceptors.search-replace.replaceString = |

#Specify the channel the sink should use
agent.sinks.avro_Sink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000


My server config file:

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = avro
tier1.sources.source1.bind= 192.168.4.110
tier1.sources.source1.port= 8021
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type= memory
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.path = hdfs://192.168.4.110:8021/user/hadoop/flumelogs/
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.writeFormat= Text
tier1.sinks.sink1.hdfs.batchSize = 100
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.rollSize = 73060831
tier1.sinks.sink1.hdfs.rollInterval = 0

tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

 

Please help if anyone is familiar with this.

Thanks in advance.

ACCEPTED SOLUTION

Hi guys,

It's working fine now. I changed the IP address in the sink path and it's writing.

The path hdfs://192.168.4.110:8020/user/hadoop/flumelogs/ used a data node IP, and I changed it to the master node IP:

hdfs://192.168.4.112:8020/user/hadoop/flumelogs/

Now it works fine. My thinking is that Flume can't write directly to a data node.
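
For anyone hitting the same issue: the hdfs.path must use the NameNode's fs.defaultFS URI, not a DataNode address. Assuming the Hadoop client is installed on the agent host, the correct URI can be confirmed with hdfs getconf -confKey fs.defaultFS, and the sink then pointed at that host and port, for example:

tier1.sinks.sink1.hdfs.path = hdfs://192.168.4.112:8020/user/hadoop/flumelogs/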

 


As noted in your other similar post, one of two things happened:

 

1. Your single sink is not keeping up with the source; you need to add more sinks to pull from the same channel (see the sketch below this list).

 

or

 

2. You had an error in your sink that caused it to stop delivering to HDFS. You should review the logs for the first error (prior to the channel-full exceptions). Oftentimes restarting will resolve the issue. Adding additional sinks will help as well, because the failure of one sink won't prevent the other sinks from pulling events off the channel.
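
For illustration, here is a minimal sketch of a second HDFS sink draining the same channel. The sink name, prefix, and values are placeholders rather than anything from the original config:

tier1.sinks = sink1 sink2
tier1.sinks.sink2.type = hdfs
tier1.sinks.sink2.channel = channel1
# A unique prefix per sink avoids filename collisions in HDFS
tier1.sinks.sink2.hdfs.filePrefix = FlumeData2
# The remaining hdfs.* settings would mirror sink1

Each sink runs on its own SinkRunner thread (visible in the agent log), so two sinks can drain the channel roughly twice as fast as one.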

 

-pd

Hi, thanks a lot for your reply.

As you said, I want to add more sinks to that channel. Is that possible, and if so, how? Also, my requirement is to write all logs into one single file in HDFS; if I use multiple sinks, is it still possible to write everything into one file?

Thanks in advance


Hi,

As you suggested, I checked the log for the first error.

This is it:

 

Avro source source1: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: channel1}
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:386)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:91)
at org.apache.avro.ipc.Responder.respond(Responder.java:151)
at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:560)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:787)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:560)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:555)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
... 30 more

 

This is the full error. I also checked the charts, and the channel is reaching 100% full. What should I do now to clear this?

Thanks in advance.

 

Add 3 more HDFS sinks, all using the same channel. Be sure to set hdfs.filePrefix to a unique value per sink to avoid filename collisions. Hopefully that will deliver the events fast enough to keep up.

-pd

Hi, thanks a lot for your reply.

As you said, I want to define two more sinks, so that I would have sink1, sink2, and sink3.

For example, this is my current config file:

# Please paste flume.conf here. Example:
# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1


tier1.sources.source1.type = avro
tier1.sources.source1.bind= 192.168.4.110
tier1.sources.source1.port= 8021
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type= memory
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.path = hdfs://192.168.4.110:8021/user/hadoop/flumelogs/
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.writeFormat= Text
tier1.sinks.sink1.hdfs.batchSize = 100
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.rollSize = 73060835
tier1.sinks.sink1.hdfs.rollInterval = 0

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

So I want to add two more like this:

tier1.sinks.sink2.type = hdfs
tier1.sinks.sink2.channel = channel1
tier1.sinks.sink2.hdfs.path = hdfs://192.168.4.110:8021/user/hadoop/flumelogs/
tier1.sinks.sink2.hdfs.fileType = DataStream
tier1.sinks.sink2.hdfs.writeFormat= Text
tier1.sinks.sink2.hdfs.batchSize = 100
tier1.sinks.sink2.hdfs.rollCount = 0
tier1.sinks.sink2.hdfs.rollSize = 73060835
tier1.sinks.sink2.hdfs.rollInterval = 0

tier1.sinks.sink3.type = hdfs
tier1.sinks.sink3.channel = channel1
tier1.sinks.sink3.hdfs.path = hdfs://192.168.4.110:8021/user/hadoop/flumelogs/
tier1.sinks.sink3.hdfs.fileType = DataStream
tier1.sinks.sink3.hdfs.writeFormat= Text
tier1.sinks.sink3.hdfs.batchSize = 100
tier1.sinks.sink3.hdfs.rollCount = 0
tier1.sinks.sink3.hdfs.rollSize = 73060835
tier1.sinks.sink3.hdfs.rollInterval = 0

Thanks in advance.


Hi, as you said, I tried it like this:

# Please paste flume.conf here. Example:
# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1 sink2 sink3


tier1.sources.source1.type = avro
tier1.sources.source1.bind= 192.168.4.110
tier1.sources.source1.port= 8021
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type= memory
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.path = hdfs://192.168.4.110:8021/user/hadoop/flumelogs/
tier1.sinks.sink1.hdfs.filePrefix = Flumedata
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.writeFormat= Text
tier1.sinks.sink1.hdfs.batchSize = 100
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.rollSize = 73060835
tier1.sinks.sink1.hdfs.rollInterval = 0

tier1.sinks.sink2.type = hdfs
tier1.sinks.sink2.channel = channel1
tier1.sinks.sink2.hdfs.path = hdfs://192.168.4.110:8021/user/hadoop/flumelogs/
tier1.sinks.sink2.hdfs.filePrefix = Flumedata1
tier1.sinks.sink2.hdfs.fileType = DataStream
tier1.sinks.sink2.hdfs.writeFormat= Text
tier1.sinks.sink2.hdfs.batchSize = 100
tier1.sinks.sink2.hdfs.rollCount = 0
tier1.sinks.sink2.hdfs.rollSize = 73060835
tier1.sinks.sink2.hdfs.rollInterval = 0

tier1.sinks.sink3.type = hdfs
tier1.sinks.sink3.channel = channel1
tier1.sinks.sink3.hdfs.path = hdfs://192.168.4.110:8021/user/hadoop/flumelogs/
tier1.sinks.sink3.hdfs.filePrefix = Flumedata2
tier1.sinks.sink3.hdfs.fileType = DataStream
tier1.sinks.sink3.hdfs.writeFormat= Text
tier1.sinks.sink3.hdfs.batchSize = 100
tier1.sinks.sink3.hdfs.rollCount = 0
tier1.sinks.sink3.hdfs.rollSize = 73060835
tier1.sinks.sink3.hdfs.rollInterval = 0

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

Still facing the same error. Here is the agent log:

org.apache.flume.node.PollingPropertiesFileConfigurationProvider Configuration provider starting
org.apache.flume.node.PollingPropertiesFileConfigurationProvider Reloading configuration file:/data/var/run/cloudera-scm-agent/process/1660-flume-AGENT/flume.conf
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Added sinks: sink1 sink2 sink3 Agent: tier1
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink3
org.apache.flume.conf.FlumeConfiguration Processing:sink2
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Processing:sink1
org.apache.flume.conf.FlumeConfiguration Post-validation flume configuration contains configuration for agents: [tier1]
org.apache.flume.node.AbstractConfigurationProvider Creating channels
org.apache.flume.channel.DefaultChannelFactory Creating instance of channel channel1 type memory
org.apache.flume.node.AbstractConfigurationProvider Created channel channel1
org.apache.flume.source.DefaultSourceFactory Creating instance of source source1, type avro
org.apache.flume.sink.DefaultSinkFactory Creating instance of sink: sink1, type: hdfs
org.apache.flume.sink.DefaultSinkFactory Creating instance of sink: sink2, type: hdfs
org.apache.flume.sink.DefaultSinkFactory Creating instance of sink: sink3, type: hdfs
org.apache.flume.node.AbstractConfigurationProvider Channel channel1 connected to [source1, sink1, sink2, sink3]
org.apache.flume.node.Application Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:Avro source source1: { bindAddress: 192.168.4.110, port: 8021 } }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@672f151d counterGroup:{ name:null counters:{} } }, sink2=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@441357d7 counterGroup:{ name:null counters:{} } }, sink3=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@51ec072b counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
org.apache.flume.node.Application Starting Channel channel1
org.apache.flume.instrumentation.MonitoredCounterGroup Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
org.apache.flume.instrumentation.MonitoredCounterGroup Component type: CHANNEL, name: channel1 started
org.apache.flume.node.Application Starting Sink sink1
org.apache.flume.node.Application Starting Sink sink2
org.apache.flume.node.Application Starting Sink sink3
org.apache.flume.node.Application Starting Source source1
org.apache.flume.source.AvroSource Starting Avro source source1: { bindAddress: 192.168.4.110, port: 8021 }...
org.apache.flume.instrumentation.MonitoredCounterGroup Monitored counter group for type: SINK, name: sink2: Successfully registered new MBean.
org.apache.flume.instrumentation.MonitoredCounterGroup Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
org.apache.flume.instrumentation.MonitoredCounterGroup Component type: SINK, name: sink1 started
org.apache.flume.instrumentation.MonitoredCounterGroup Component type: SINK, name: sink2 started
org.apache.flume.instrumentation.MonitoredCounterGroup Monitored counter group for type: SINK, name: sink3: Successfully registered new MBean.
org.apache.flume.instrumentation.MonitoredCounterGroup Component type: SINK, name: sink3 started
org.mortbay.log Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
org.mortbay.log jetty-6.1.26.cloudera.4
org.mortbay.log Started SelectChannelConnector@0.0.0.0:41414
org.apache.flume.instrumentation.MonitoredCounterGroup Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
org.apache.flume.instrumentation.MonitoredCounterGroup Component type: SOURCE, name: source1 started
org.apache.flume.source.AvroSource Avro source source1 started.
org.apache.avro.ipc.NettyServer [id: 0x077365c8, /192.168.6.118:60703 => /192.168.4.110:8021] OPEN
org.apache.avro.ipc.NettyServer [id: 0x077365c8, /192.168.6.118:60703 => /192.168.4.110:8021] BOUND: /192.168.4.110:8021
org.apache.avro.ipc.NettyServer [id: 0x077365c8, /192.168.6.118:60703 => /192.168.4.110:8021] CONNECTED: /192.168.6.118:60703
org.apache.flume.sink.hdfs.HDFSDataStream Serializer = TEXT, UseRawLocalFileSystem = false
org.apache.flume.sink.hdfs.HDFSDataStream Serializer = TEXT, UseRawLocalFileSystem = false
org.apache.flume.sink.hdfs.HDFSDataStream Serializer = TEXT, UseRawLocalFileSystem = false
org.apache.flume.sink.hdfs.BucketWriter Creating hdfs://192.168.4.110:8021/user/hadoop/flumelogs//Flumedata.1463466791008.tmp
org.apache.avro.ipc.NettyServer [id: 0x7a41116a, /192.168.4.110:38630 => /192.168.4.110:8021] OPEN
org.apache.avro.ipc.NettyServer [id: 0x7a41116a, /192.168.4.110:38630 => /192.168.4.110:8021] BOUND: /192.168.4.110:8021
org.apache.avro.ipc.NettyServer [id: 0x7a41116a, /192.168.4.110:38630 => /192.168.4.110:8021] CONNECTED: /192.168.4.110:38630
org.apache.avro.ipc.NettyServer [id: 0x7a41116a, /192.168.4.110:38630 => /192.168.4.110:8021] DISCONNECTED
org.apache.avro.ipc.NettyServer Unexpected exception from downstream.
org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 134352896 items! Connection closed.
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:167)
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:482)
at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:365)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:396)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:336)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:81)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
at org.jboss.netty.channel.Channels.close(Channels.java:812)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:166)
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)

org.apache.flume.sink.hdfs.HDFSEventSink HDFS IO error
java.io.EOFException: End of File Exception between local host is: "HadoopF02.hadoopslave1.com/192.168.4.110"; destination host is: "HadoopF02.hadoopslave1.com":8021; : java.io.EOFException; For more details see: http://wiki.apache.org/hadoop/EOFException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
at org.apache.hadoop.ipc.Client.call(Client.java:1472)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at $Proxy21.create(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at $Proxy22.create(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1738)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1662)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1587)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:775)
at org.apache.flume.sink.hdfs.HDFSDataStream.doOpen(HDFSDataStream.java:86)
at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:113)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:246)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1071)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:966)

org.apache.flume.sink.hdfs.BucketWriter Creating hdfs://192.168.4.110:8021/user/hadoop/flumelogs//Flumedata1.1463466791008.tmp

org.apache.avro.ipc.NettyServer [id: 0x7a41116a, /192.168.4.110:38630 => /192.168.4.110:8021] UNBOUND
org.apache.avro.ipc.NettyServer [id: 0x7a41116a, /192.168.4.110:38630 => /192.168.4.110:8021] CLOSED
org.apache.avro.ipc.NettyServer Connection to /192.168.4.110:38630 disconnected.
org.apache.avro.ipc.NettyServer Unexpected exception from downstream.
org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 150994944 items! Connection closed.
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:167)
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)

It continues like this.

Also, this Flume agent is running on a data node. Is that a problem?

Please help. Thanks in advance.


This error:

org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 150994944 items! Connection closed.

 

This is usually caused by something upstream trying to send non-Avro data to the Avro source.

 

In your source config, you are specifying the Avro source with the same port as the HDFS NameNode port:

 

tier1.sources.source1.type = avro
tier1.sources.source1.bind= 192.168.4.110
tier1.sources.source1.port= 8021
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type= memory
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.path = hdfs://192.168.4.110:8021/user/hadoop/flumelogs/

 

I believe that will cause issues in your configuration, as the sink will try to connect to the Avro source port, thinking it is the NameNode port. If your NameNode port is indeed 8021, then you need to change your Avro source port to something different; see the sketch below.
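
For illustration only, a de-conflicted sketch. Port 4545 for the Avro source is an arbitrary assumption, and 8020 assumes the default NameNode RPC port; confirm yours with hdfs getconf -confKey fs.defaultFS:

# Avro source moved off the NameNode port (4545 is an assumed free port)
tier1.sources.source1.port = 4545
# HDFS sink pointed at the NameNode RPC port (8020 is the common default)
tier1.sinks.sink1.hdfs.path = hdfs://192.168.4.110:8020/user/hadoop/flumelogs/

# The upstream agent's Avro sink must be updated to match the new source port
agent.sinks.avro_Sink.port = 4545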

 

-pd


Hi, I tried as you suggested.

I changed the port number in the HDFS path to
hdfs://192.168.4.110:8020/user/hadoop/flumelogs/
but I'm facing the same issue.
I'm thinking the Flume sink is not writing fast enough due to a network issue or low bandwidth. Is that correct?

 

And I have one more doubt: I installed Flume on a data node. Is that causing any problem?


Thanks in advance.
