Flume ftp source

New Contributor

I am trying to connect Flume to an FTP source, following the project at https://github.com/keedio/flume-ftp-source.

I downloaded the FTP source jar for Flume from that site, followed the instructions given there, and tried to connect to the FTP server.

The source picks up every file in the home directory: INFO source.Source: Actual dir: /home/xxx files: 31.

I want it to read only one file, which is in a folder on the Desktop, so I set the working.directory property. It still fetches all the records in the home directory.

It also throws errors intermittently while pushing data to HDFS:

18/05/23 15:56:57 ERROR source.Source: ChannelException
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: MemChannel}
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
    at org.keedio.flume.source.ftp.source.Source.processMessage(Source.java:404)
    at org.keedio.flume.source.ftp.source.Source.readStream(Source.java:376)
    at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:248)
    at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196)
    at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196)
    at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196)
    at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196)
    at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196)
    at org.keedio.flume.source.ftp.source.Source.process(Source.java:102)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Heap space limit of 55297reached. Please increase heap space allocated to the channel as the sinks may not be keeping up with the sources
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
    ... 11 more
18/05/23 15:56:57 INFO hdfs.BucketWriter: Creating xxx/topics/Specified_flume_ftp_data/FlumeData.1527071214656.tmp
18/05/23 15:56:57 INFO hdfs.BucketWriter: Closing xxx/topics/Specified_flume_ftp_data/FlumeData.1527071214656.tmp
18/05/23 15:56:57 INFO hdfs.BucketWriter: Renaming xxx/topics/Specified_flume_ftp_data/FlumeData.1527071214656.tmp to xxx/topics/Specified_flume_ftp_data/FlumeData.1527071214656

Which property do I need to set so that the source reads only one particular directory?

My properties file:

# Naming the components on the current agent.
FtpAgent.sources = ftp1
FtpAgent.channels = MemChannel
FtpAgent.sinks = HDFS
# Describing/Configuring the source
FtpAgent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
FtpAgent.sources.ftp1.client.source = ftp
FtpAgent.sources.ftp1.name.server = 192.168.1.1
FtpAgent.sources.ftp1.user = xxxx
FtpAgent.sources.ftp1.password = xxxx
FtpAgent.sources.ftp1.port = 21
FtpAgent.sources.ftp1.flushlines = false
FtpAgent.sources.ftp1.chunk.size = 1024
FtpAgent.sources.fpt1.run.discover.delay=5000
FtpAgent.sources.fpt1.working.directory = /home/xxx/Desktop/ftp_sample

# Describing/Configuring the sink
FtpAgent.sinks.HDFS.type = hdfs
FtpAgent.sinks.HDFS.hdfs.path = hdfs://xxx/topics/Specified_flume_ftp_data
FtpAgent.sinks.HDFS.hdfs.fileType = DataStream
FtpAgent.sinks.HDFS.hdfs.writeFormat = Text
FtpAgent.sinks.HDFS.hdfs.batchSize = 1000
FtpAgent.sinks.HDFS.hdfs.rollSize = 100
FtpAgent.sinks.HDFS.hdfs.rollCount = 100000
FtpAgent.sinks.hdfs.serializer=Text
# Describing/Configuring the channel
FtpAgent.channels.MemChannel.type = memory
FtpAgent.channels.MemChannel.capacity = 100000
FtpAgent.channels.MemChannel.transactionCapacity = 1000
FtpAgent.channels.MemChannel.byteCapacity = 6912212
# Binding the source and sink to the channel
FtpAgent.sources.ftp1.channels = MemChannel
FtpAgent.sinks.HDFS.channel = MemChannel

I start the agent with the following command:

bin/flume-ng agent --conf ./conf/ -f conf/flume_ftp_source.conf -Dflume.root.logger=DEBUG,console -n FtpAgent

Rising Star

@sangeetha sivakumar

Hi, maybe you can use a regex pattern to match only the specific files you want to read, with this property:

filter.pattern
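
A minimal sketch of how that might look, assuming the property is set on the source like the other options in the question and that the file of interest is a CSV. The pattern below is a made-up example, not something taken from the question. The backslash is doubled because a properties file treats a single backslash as an escape character.

```properties
# Hypothetical pattern: only pick up files whose names end in .csv
FtpAgent.sources.ftp1.filter.pattern = .+\\.csv
```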

For the memory issue, have you tried increasing the byte capacity of the channel? Currently I can see it is set to:

FtpAgent.channels.MemChannel.byteCapacity = 6912212
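
The "Heap space limit of 55297" figure in the error appears to follow from this setting. As far as I can tell from the memory channel's behaviour, it reserves byteCapacityBufferPercentage (20 by default) for headers and counts the remainder in 100-byte slots. The sketch below works through that arithmetic in comments and shows a larger value; 134217728 (128 MiB) is an arbitrary illustration, not a recommendation, and has to fit inside the JVM heap given to the agent.

```properties
# Value from the question:
#   6912212 bytes * (1 - 20/100) = 5529769.6 bytes usable for event bodies
#   5529769.6 / 100 bytes per slot = 55297 slots  (the number in the error)
#
# Assumed replacement value, chosen only for illustration (128 MiB):
FtpAgent.channels.MemChannel.byteCapacity = 134217728
# Default shown explicitly; share of byteCapacity held back for event headers
FtpAgent.channels.MemChannel.byteCapacityBufferPercentage = 20
```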

Thanks!

New Contributor

@dbains that was my mistake: the two lines below contain fpt instead of ftp.

  1. FtpAgent.sources.fpt1.run.discover.delay=5000
  2. FtpAgent.sources.fpt1.working.directory =/home/xxx/Desktop/ftp_sample
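
With the source name spelled ftp1, matching the name declared in FtpAgent.sources, the two lines read as below. Flume keys each property by component name, so settings written under the misspelled fpt1 would not have been applied to ftp1, which would explain why the source fell back to the home directory.

```properties
# The source name must match the one declared in "FtpAgent.sources = ftp1"
FtpAgent.sources.ftp1.run.discover.delay = 5000
FtpAgent.sources.ftp1.working.directory = /home/xxx/Desktop/ftp_sample
```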

By how much can I increase the channel capacity?

Rising Star

@sangeetha sivakumar As far as I know, the byte capacity defaults to 80% of the maximum memory available to the JVM (that is, 80% of the -Xmx value), so it cannot usefully be set higher than the heap the Flume process actually has. The following link has more details on this: http://flume.apache.org/FlumeUserGuide.html#memory-channel
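
One way to apply this, sketched under the assumption that the agent's heap is already large enough for the expected backlog: leave byteCapacity out of the channel definition so the channel falls back to its computed default, and size the heap itself through the JVM's -Xmx option (commonly set via JAVA_OPTS in conf/flume-env.sh). The other values are the ones from the question.

```properties
FtpAgent.channels.MemChannel.type = memory
FtpAgent.channels.MemChannel.capacity = 100000
FtpAgent.channels.MemChannel.transactionCapacity = 1000
# byteCapacity intentionally not set: the memory channel then uses a
# computed default of 80% of the maximum JVM heap (-Xmx)
```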

Thanks!

Rising Star

@sangeetha sivakumar Did it help?