Created 05-23-2018 11:17 AM
I am trying to connect flume with ftp source, i had referred this community https://github.com/keedio/flume-ftp-source.
I have downloaded the ftp source jar for flume in this site and followed those instructions which is mentioned. Tried to connect with FTP.
Here it is pointing to all the files which is present in the home directory, INFO source.Source: Actual dir: /home/xxx files: 31.
I want to point only one file which is present in desktop, for that i have given working.directory property. But it is fetching all the records which is present in home directory.
And inbetween it is throwing some errors while pushing data to hdfs.
18/05/23 15:56:57 ERROR source.Source: ChannelException org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: MemChannel} at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275) at org.keedio.flume.source.ftp.source.Source.processMessage(Source.java:404) at org.keedio.flume.source.ftp.source.Source.readStream(Source.java:376) at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:248) at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196) at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196) at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196) at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196) at org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:196) at org.keedio.flume.source.ftp.source.Source.process(Source.java:102) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Heap space limit of 55297reached. Please increase heap space allocated to the channel as the sinks may not be keeping up with the sources at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126) at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151) at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267) ... 11 more 18/05/23 15:56:57 INFO hdfs.BucketWriter: Creating xxx/topics/Specified_flume_ftp_data/FlumeData.1527071214656.tmp 18/05/23 15:56:57 INFO hdfs.BucketWriter: Closing xxx/topics/Specified_flume_ftp_data/FlumeData.1527071214656.tmp 18/05/23 15:56:57 INFO hdfs.BucketWriter: Renaming xxx/topics/Specified_flume_ftp_data/FlumeData.1527071214656.tmp to xxx/topics/Specified_flume_ftp_data/FlumeData.1527071214656
What property need to be added to point only particular directory?
My property file
# Naming the components on the current agent. FtpAgent.sources = ftp1 FtpAgent.channels = MemChannel FtpAgent.sinks = HDFS # Describing/Configuring the source FtpAgent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source FtpAgent.sources.ftp1.client.source = ftp FtpAgent.sources.ftp1.name.server = 192.168.1.1 FtpAgent.sources.ftp1.user = xxxx FtpAgent.sources.ftp1.password = xxxx FtpAgent.sources.ftp1.port = 21 FtpAgent.sources.ftp1.flushlines = false FtpAgent.sources.ftp1.chunk.size = 1024 FtpAgent.sources.fpt1.run.discover.delay=5000 FtpAgent.sources.fpt1.working.directory = /home/xxx/Desktop/ftp_sample # Describing/Configuring the sink FtpAgent.sinks.HDFS.type = hdfs FtpAgent.sinks.HDFS.hdfs.path = hdfs://xxx/topics/Specified_flume_ftp_data FtpAgent.sinks.HDFS.hdfs.fileType = DataStream FtpAgent.sinks.HDFS.hdfs.writeFormat = Text FtpAgent.sinks.HDFS.hdfs.batchSize = 1000 FtpAgent.sinks.HDFS.hdfs.rollSize = 100 FtpAgent.sinks.HDFS.hdfs.rollCount = 100000 FtpAgent.sinks.hdfs.serializer=Text # Describing/Configuring the channel FtpAgent.channels.MemChannel.type = memory FtpAgent.channels.MemChannel.capacity = 100000 FtpAgent.channels.MemChannel.transactionCapacity = 1000 FtpAgent.channels.MemChannel.byteCapacity = 6912212 # Binding the source and sink to the channel FtpAgent.sources.ftp1.channels = MemChannel FtpAgent.sinks.HDFS.channel = MemChannel I am running the agent by hitting this below command, bin/flume-ng agent --conf ./conf/ -f conf/flume_ftp_source.conf Dflume.root.logger=DEBUG,console -n FtpAgent
Created 06-07-2018 11:30 PM
Hi, May be you can try to match file with regex pattern to read specific files:
filter.pattern
For the memory issue, have you tried increasing the byte capacity of the channel? Currently I can see its:
FtpAgent.channels.MemChannel.byteCapacity =6912212
Thanks!
Created 06-08-2018 04:56 AM
@dbains that was my mistake because the below two lines contains fpt instead of ftp.
How much i can increase the channel capacity ?
Created 06-08-2018 07:55 AM
@sangeetha sivakumar As far as I know the byte capacity should be 80% of the total amount of heap space available to the process. The following link might help you if you need to get more details on this: http://flume.apache.org/FlumeUserGuide.html#memory-channel
Thanks!
Created 06-12-2018 08:27 AM
@sangeetha sivakumar Did it help?