
Flume 1.4.0 - Continuous loop writing to hdfs

New Contributor

Hi all,

We have a problem using flume-ng 1.4.0 (CDH 4.4.0) to ingest files (1 MB to 7 MB) into HDFS.

We've configured an agent on a remote box that monitors a spooling directory and, when files become available, sends them via Avro to the Flume agent on the cluster.

When an event is received, a temp file is written to HDFS (for instance assic.1384440732750.tmp) and then renamed (the .tmp suffix dropped); this process loops indefinitely, incrementing the numeric part of the file name (assic.1384440732751.tmp, assic.1384440732752.tmp, ...).

The Flume log and the HDFS folder grow without bound, so the agent has to be stopped; we then have to remove all the generated files from the HDFS folder.
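For context, our reading of the HDFS sink documentation (our understanding, not something confirmed in the logs) is that the sink opens a .tmp file, writes events into it, and renames it as soon as a roll condition fires. Since our cluster config below sets no roll properties, the documented Flume 1.4.0 defaults should apply, whichever fires first:

# Documented Flume 1.4.0 HDFS sink roll defaults (none are set in our config):
hdfs.rollInterval = 30    # roll the current file after 30 seconds
hdfs.rollSize     = 1024  # roll after 1024 bytes have been written
hdfs.rollCount    = 10    # roll after 10 events have been written

Those defaults would explain many small files, but not an endless loop from a finite input.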

 

Here is the flume.conf on the remote agent:

 

agent.sources = ftpUploadDir
agent.channels = uploadedFileChannel
agent.sinks = toClusterAvroSink

# Specify the channel that sources and sink should use
agent.sinks.toClusterAvroSink.channel = uploadedFileChannel
agent.sources.ftpUploadDir.channels = uploadedFileChannel

####### Sources ####################################
# For each one of the sources, the type is defined
agent.sources.ftpUploadDir.type = spooldir
agent.sources.ftpUploadDir.spoolDir = /mypath/flumespool
agent.sources.ftpUploadDir.fileSuffix = .COMPLETED

####### Interceptors ###############################
agent.sources.ftpUploadDir.interceptors = itime ihost
# http://flume.apache.org/FlumeUserGuide.html#timestamp-interceptor
agent.sources.ftpUploadDir.interceptors.itime.type = timestamp
agent.sources.ftpUploadDir.interceptors.itime.preserveExisting = true
# http://flume.apache.org/FlumeUserGuide.html#host-interceptor
agent.sources.ftpUploadDir.interceptors.ihost.type = host
agent.sources.ftpUploadDir.interceptors.ihost.useIP = false
agent.sources.ftpUploadDir.interceptors.ihost.hostHeader = ftpHost

####### Sinks #######################################
# Each sink's type must be defined
agent.sinks.toClusterAvroSink.type = avro
agent.sinks.toClusterAvroSink.hostname = hadoop-snn.cselt.it
agent.sinks.toClusterAvroSink.port = 11111

####### Channels ####################################
# Each channel's type is defined.
agent.channels.uploadedFileChannel.type = file

# Other config values specific to each type of channel (sink or source)
# can be defined as well; the commented line below is left over from a
# memory-channel template and does not apply to the file channel above.
#agent.channels.memoryChannel.capacity = 100000

 

 

Here is the flume.conf on the cluster agent:

 

# Sources, channels, and sinks are defined per
# agent name, in this case 'hdfsAgent'.
hdfsAgent.sources  = avroSource
hdfsAgent.channels = avroHdfsChannel
hdfsAgent.sinks    = hdfsSink

####### Sources ####################################
hdfsAgent.sources.avroSource.type     = avro
hdfsAgent.sources.avroSource.bind     = 0.0.0.0
hdfsAgent.sources.avroSource.port     = 11111

####### Sinks #######################################
hdfsAgent.sinks.hdfsSink.type           = hdfs
hdfsAgent.sinks.hdfsSink.hdfs.filePrefix   = assic
hdfsAgent.sinks.hdfsSink.hdfs.inUseSuffix  = .tmp
hdfsAgent.sinks.hdfsSink.hdfs.path         = hdfs://myPath/flume/from
hdfsAgent.sinks.hdfsSink.hdfs.fileType     = DataStream
hdfsAgent.sinks.hdfsSink.hdfs.writeFormat  = Text
hdfsAgent.sinks.hdfsSink.hdfs.callTimeout = 2000

#hdfsAgent.sinks.hdfsSink.hdfs.batchSize    = 10
#hdfsAgent.sinks.hdfsSink.hdfs.rollSize     = 134217728
#hdfsAgent.sinks.hdfsSink.hdfs.rollSize     = 52428800
#hdfsAgent.sinks.hdfsSink.hdfs.rollCount    = 10
#hdfsAgent.sinks.hdfsSink.hdfs.rollInterval = 43200

####### Channels ####################################
hdfsAgent.channels.avroHdfsChannel.type   = memory
#hdfsAgent.channels.avroHdfsChannel.type   = file
#Specify the channel that sources and sink should use
hdfsAgent.sources.avroSource.channels = avroHdfsChannel
hdfsAgent.sinks.hdfsSink.channel      = avroHdfsChannel

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
hdfsAgent.channels.avroHdfsChannel.capacity = 100000
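(Note: all of the roll properties above are commented out, so the sink should be falling back to the defaults mentioned earlier. A sketch of what we intend to try, assuming the small-file churn comes from those defaults: roll on size only, disabling count- and time-based rolls.)

hdfsAgent.sinks.hdfsSink.hdfs.rollSize     = 134217728  # roll at 128 MB
hdfsAgent.sinks.hdfsSink.hdfs.rollCount    = 0          # 0 disables count-based rolling
hdfsAgent.sinks.hdfsSink.hdfs.rollInterval = 0          # 0 disables time-based rolling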

 


Here is a sample of the flume.log:

 

INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:219)  - Creating hdfs://myPath/flume/from/assic.1384440732750.tmp
INFO  [hdfs-hdfsSink-call-runner-0] (org.apache.flume.sink.hdfs.BucketWriter$7.call:487)  - Renaming hdfs://myPath/flume/from/assic.1384440732750.tmp to hdfs://myPath/flume/from/assic.1384440732750
INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:219)  - Creating hdfs://myPath/flume/from/assic.1384440732751.tmp
INFO  [hdfs-hdfsSink-call-runner-5] (org.apache.flume.sink.hdfs.BucketWriter$7.call:487)  - Renaming hdfs://myPath/flume/from/assic.1384440732751.tmp to hdfs://myPath/flume/from/assic.1384440732751
[...]


Any help is greatly appreciated.

Thanks for your time

Rob


Re: Flume 1.4.0 - Continuous loop writing to hdfs

Cloudera Employee

I am not entirely sure I understand the problem. The files you see on HDFS contain the data that is being sent, so as more and more data comes in, the HDFS sink will create new files to write it.

Re: Flume 1.4.0 - Continuous loop writing to hdfs

New Contributor

Sorry, I had to stay focused on other activities for a while.

Regarding the problem: it shows up even with a single event (i.e., I tried with a file containing a single log line).

All the files created on HDFS have the same content.
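One guess on our side, which we have not verified: hdfs.callTimeout is set to 2000 ms in our config, while the Flume default is 10000 ms. If an HDFS create/write/rename call exceeds that timeout, the sink fails the transaction and the channel redelivers the same events, which would match the identical file contents we see. We may try restoring the default:

hdfsAgent.sinks.hdfsSink.hdfs.callTimeout = 10000  # Flume default; 2000 ms may be too short for our cluster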

KR

R.