
Flume creates small files in HDFS instead of larger ones when consuming from a Kafka channel

Hello all!

I've been setting up Flume to move data previously stored in Kafka into HDFS, using Kafka as the channel and HDFS as the sink. The problem is that Flume creates a lot of small files instead of one large file that rolls on schedule (the goal was a roll every three hours, although the rollInterval below is 3600 seconds, i.e. hourly) or when the file reaches 1 GB. The configuration, minus the confidential data, is as follows:

Global:

test1.sources = none_source
test1.channels = kafka_topic1
test1.sinks = hdfs_topic1

Source:

# Placeholder source: intended to produce no events (output is redirected
# to /dev/null); the agent's real job is draining the Kafka channel to HDFS.
test1.sources.none_source.type = exec
test1.sources.none_source.command = /usr/bin/vmstat 1 > /dev/null
test1.sources.none_source.channels = kafka_topic1

Channel:

test1.channels.kafka_topic1.type = org.apache.flume.channel.kafka.KafkaChannel
test1.channels.kafka_topic1.kafka.bootstrap.servers = srvifsidsp01.agbar.ga.local:6667,srvifsidsp02.agbar.ga.local:6667,srvifsidsp03.agbar.ga.local:6667,srvifsidsp04.agbar.ga.local:6667,srvifsidsp05.agbar.ga.local:6667
test1.channels.kafka_topic1.kafka.topic = agbxxxmcafee0
test1.channels.kafka_topic1.kafka.consumer.group.id = flume_consumer_agbxxxmcafee0_0
test1.channels.kafka_topic1.parseAsFlumeEvent = false
test1.channels.kafka_topic1.migrateZookeeperOffsets = false
test1.channels.kafka_topic1.pollTimeout = 500 
test1.channels.kafka_topic1.kafka.consumer.auto.offset.reset = latest
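
One pairing worth noting: since parseAsFlumeEvent = false means records come off the topic as raw bytes with no Flume headers, there is no timestamp header on the events, and the date escapes in the sink's path only resolve because useLocalTimeStamp is enabled there. Restated as a sketch (same keys as elsewhere in this post, nothing new):

# These two settings have to agree: no Flume headers from the channel means
# the HDFS sink must stamp events itself for %Y%m%d/%H%M to resolve.
test1.channels.kafka_topic1.parseAsFlumeEvent = false
test1.sinks.hdfs_topic1.hdfs.useLocalTimeStamp = true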

Sink:

test1.sinks.hdfs_topic1.channel = kafka_topic1
test1.sinks.hdfs_topic1.type = hdfs
test1.sinks.hdfs_topic1.hdfs.path = /path1/%Y%m%d
test1.sinks.hdfs_topic1.hdfs.filePrefix = name1-%Y%m%d-%H%M
test1.sinks.hdfs_topic1.hdfs.inUsePrefix = _
test1.sinks.hdfs_topic1.hdfs.inUseSuffix = .tmp
test1.sinks.hdfs_topic1.hdfs.rollInterval = 3600
test1.sinks.hdfs_topic1.hdfs.rollSize = 1073741824
test1.sinks.hdfs_topic1.hdfs.rollCount = 0
test1.sinks.hdfs_topic1.hdfs.idleTimeout = 0
test1.sinks.hdfs_topic1.hdfs.batchSize = 10000
test1.sinks.hdfs_topic1.hdfs.codeC = gzip
test1.sinks.hdfs_topic1.hdfs.fileType = CompressedStream
test1.sinks.hdfs_topic1.hdfs.maxOpenFiles = 5000
test1.sinks.hdfs_topic1.hdfs.callTimeout = 10000
test1.sinks.hdfs_topic1.hdfs.threadsPoolSize = 10
test1.sinks.hdfs_topic1.hdfs.rollTimerPoolSize = 1
test1.sinks.hdfs_topic1.hdfs.round = true
test1.sinks.hdfs_topic1.hdfs.roundValue = 60
test1.sinks.hdfs_topic1.hdfs.roundUnit = minute
test1.sinks.hdfs_topic1.hdfs.timeZone = Europe/Madrid
test1.sinks.hdfs_topic1.hdfs.useLocalTimeStamp = true
test1.sinks.hdfs_topic1.hdfs.closeTries = 0
test1.sinks.hdfs_topic1.hdfs.retryInterval = 180
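
A unit note on the time-based roll: hdfs.rollInterval is in seconds, so the 3600 above closes a file every hour. If the three-hour roll mentioned at the top were the target, it would be the illustrative line below (shown only to make the arithmetic explicit, not as a confirmed fix for the small files):

# Illustrative only: a three-hour time-based roll (3 * 3600 = 10800 seconds)
test1.sinks.hdfs_topic1.hdfs.rollInterval = 10800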

 

I have tried modifying many of the parameters and adding new ones, but nothing seems to work. Any ideas on how to avoid the creation of these very small files would be appreciated. Thanks for the help!
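
For reference, the kinds of tweaks tried so far look like the lines below (the specific values are purely illustrative, and none of them changed the behaviour):

# Illustrative examples of the parameters varied while testing:
test1.sinks.hdfs_topic1.hdfs.callTimeout = 60000
test1.sinks.hdfs_topic1.hdfs.batchSize = 100000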
