Created on 01-05-2017 07:09 PM - edited 09-16-2022 03:53 AM
Hi All, @pdvorak
I am using Cloudera 5.9 on a 3-node cluster. I have to stream the RSS feed of a news channel to HDFS.
I have Java code that pulls the RSS feed, and 3 Flume agents. Two of them have an Exec source, which tails the file generated by the Java code, and an Avro sink. The third has an Avro source and an HDFS sink. When I start Flume on all nodes, the agent with the Avro source and HDFS sink fails with the following error:
hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: Callable timed out after 180000 ms on file: hdfs://10.0.10.4:8020/flume/events/FlumeData.1483670786529.tmp
I googled the error and increased testAgent.sinks.testSink.hdfs.callTimeout to 180000 (the default is 10000), as suggested in https://issues.apache.org/jira/browse/FLUME-2429.
I also increased two HDFS properties, dfs.socket.timeout and dfs.datanode.socket.write.timeout, from 3000 to 30000.
The error is still there and nothing is being written to HDFS.
My flume.conf on this node is:
agent.sources = avro-collection-source
agent.channels = memoryChannel
agent.sinks = hdfs-sink
# For each one of the sources, the type is defined
agent.sources.avro-collection-source.type = avro
agent.sources.avro-collection-source.bind = 10.0.0.6
agent.sources.avro-collection-source.port = 60000
# The channel can be defined as follows.
agent.sources.avro-collection-source.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = hdfs://10.0.10.4:8020/flume/events
agent.sinks.hdfs-sink.hdfs.callTimeout = 180000
#Specify the channel the sink should use
agent.sinks.hdfs-sink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
The flume.conf on the other 2 nodes is:
agent.sources = reader
agent.channels = memoryChannel
agent.sinks = avro-forward-sink
# For each one of the sources, the type is defined
agent.sources.reader.type = exec
agent.sources.reader.command = tail -f /var/log/flume-ng/source.txt
agent.sources.reader.logStdErr = true
agent.sources.reader.restart = true
# The channel can be defined as follows.
agent.sources.reader.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.avro-forward-sink.type = avro
agent.sinks.avro-forward-sink.hostname = 10.0.0.6
agent.sinks.avro-forward-sink.port = 60000
#Specify the channel the sink should use
agent.sinks.avro-forward-sink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1004
Error log:
17/01/05 20:46:11 INFO node.Application: Starting Sink hdfs-sink
17/01/05 20:46:11 INFO node.Application: Starting Source avro-collection-source
17/01/05 20:46:11 INFO source.AvroSource: Starting Avro source avro-collection-source: { bindAddress: 10.0.0.6, port: 60000 }...
17/01/05 20:46:11 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink: Successfully registered new MBean.
17/01/05 20:46:11 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink started
17/01/05 20:46:11 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: avro-collection-source: Successfully registered new MBean.
17/01/05 20:46:11 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avro-collection-source started
17/01/05 20:46:11 INFO source.AvroSource: Avro source avro-collection-source started.
17/01/05 20:46:20 INFO ipc.NettyServer: [id: 0x8ed94161, /10.0.0.5:51797 => /10.0.0.6:60000] OPEN
17/01/05 20:46:20 INFO ipc.NettyServer: [id: 0x8ed94161, /10.0.0.5:51797 => /10.0.0.6:60000] BOUND: /10.0.0.6:60000
17/01/05 20:46:20 INFO ipc.NettyServer: [id: 0x8ed94161, /10.0.0.5:51797 => /10.0.0.6:60000] CONNECTED: /10.0.0.5:51797
17/01/05 20:46:26 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
17/01/05 20:46:27 INFO hdfs.BucketWriter: Creating hdfs://10.0.10.4:8020/flume/events/FlumeData.1483670786526.tmp
17/01/05 20:46:49 INFO ipc.Client: Retrying connect to server: 10.0.10.4/10.0.10.4:8020. Already tried 0 time(s); maxRetries=45
17/01/05 20:47:09 INFO ipc.Client: Retrying connect to server: 10.0.10.4/10.0.10.4:8020. Already tried 1 time(s); maxRetries=45
17/01/05 20:47:29 INFO ipc.Client: Retrying connect to server: 10.0.10.4/10.0.10.4:8020. Already tried 2 time(s); maxRetries=45
17/01/05 20:47:49 INFO ipc.Client: Retrying connect to server: 10.0.10.4/10.0.10.4:8020. Already tried 3 time(s); maxRetries=45
17/01/05 20:48:09 INFO ipc.Client: Retrying connect to server: 10.0.10.4/10.0.10.4:8020. Already tried 4 time(s); maxRetries=45
17/01/05 20:48:29 INFO ipc.Client: Retrying connect to server: 10.0.10.4/10.0.10.4:8020. Already tried 5 time(s); maxRetries=45
17/01/05 20:48:49 INFO ipc.Client: Retrying connect to server: 10.0.10.4/10.0.10.4:8020. Already tried 6 time(s); maxRetries=45
17/01/05 20:49:09 INFO ipc.Client: Retrying connect to server: 10.0.10.4/10.0.10.4:8020. Already tried 7 time(s); maxRetries=45
17/01/05 20:49:27 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: Callable timed out after 180000 ms on file: hdfs://10.0.10.4:8020/flume/events/FlumeData.1483670786526.tmp
at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:693)
at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:235)
at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:514)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:201)
at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:686)
... 6 more
Can anyone help me resolve this? I have no idea why this is happening.
Thanks,
Shilpa
Created 01-06-2017 03:49 PM
Hi @pdvorak,
Thanks for your comments.
The answer to all three questions is yes. iptables is turned off, and I can ping my NN and traverse HDFS.
The problem was that the IP I gave for the HDFS sink was the private IP. Once I changed it to the public IP, it started streaming the data.
So, the issue is resolved. 🙂
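For reference, the fix amounts to one line in the HDFS sink section of the flume.conf shown earlier. This is only a sketch: <namenode-public-ip> is a placeholder for the NameNode's public address, which is not given in this thread, and the port and path are assumed to stay the same.

```properties
# Before: NameNode addressed by its private IP
# (the sink kept retrying the connection until hdfs.callTimeout expired)
# agent.sinks.hdfs-sink.hdfs.path = hdfs://10.0.10.4:8020/flume/events

# After: placeholder for the public IP (hypothetical value, substitute your own)
agent.sinks.hdfs-sink.hdfs.path = hdfs://<namenode-public-ip>:8020/flume/events
```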
Created 01-06-2017 12:11 PM