
Strange issue with Flume NG: it's not throwing any errors, but at the same time it's not transferring content


Dear folks, I am having a very strange issue with Flume NG. I could not find a solution anywhere, so I am reaching out here.

I have configured Flume to bring my web server logs to HDFS.
The problem is that I am not seeing any errors in the Flume agent log or the Flume collector log,
so I am stuck on where to start diagnosing the issue.


The idea is this: I have a web server, and I need to send its logs to HDFS.
I installed the Flume NG agent and configured a Spooling Directory source, a file channel, and an Avro sink.
I installed the Flume NG collector using Cloudera Manager; it has an Avro source, a file channel, and an HDFS sink.


The Flume NG version is 1.4.0+23-1.cdh (both the web server and the collector run the same version).


NOTE: The web server and the collector (Flume NG collector) are on a public network, meaning the two servers can reach each other over the internet via one-to-one NAT.
I did not see any connectivity issue; I can see an ESTABLISHED session when I telnet from the web server to the collector on port 9999.


Flume NG agent config at WEB SERVER:
==================================
# list sources, sinks and channels in the agent
T2.sources = spool-source
T2.sinks = avro-avsink
T2.channels = file-channel
# Source Property
T2.sources.spool-source.type = spooldir
T2.sources.spool-source.spoolDir = /logs-archive/test
T2.sources.spool-source.channels = file-channel
T2.sources.spool-source.fileHeader = true
T2.sources.spool-source.trackerDir = /logs-archive/flume/DATA-03
# Sink Property
T2.sinks.avro_avsink.type = avro
T2.sinks.avro_avsink.channel = file-channel
T2.sinks.avro_avsink.hostname = 208.XX.XX.XX # PUBLIC IP ADDRESS OF COLLECTOR
T2.sinks.avro_avsink.port = 9999

# Channel Property
T2.channels.file-channel.type = memory
T2.channels.file-channel.checkpointDir = /logs-archive/flume/DATA-01
T2.channels.file-channel.dataDirs = /logs-archive/flume/DATA-02


Flume NG collector config:
=========================
# agent name, in this case 'tier1'.
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
# For each source, channel, and sink, set
# standard properties.
tier1.sources.source1.type = avro
tier1.sources.source1.bind = 10.2.4.10 # PRIVATE IP ADDRESS OF COLLECTOR, BUT IT HAS BEEN NAT WITH PUBLIC IP 208.XX.XX.XX
tier1.sources.source1.port = 9999
tier1.sources.source1.channels = channel1
tier1.sources.source1.threads = 12
tier1.channels.channel1.type = file
tier1.channels.channel1.checkpointDir = /mnt/sdd1
tier1.channels.channel1.dataDir = /mnt/sdd1

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.filePrefix = %{host}
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.path = hdfs://xyz.com/user/test
tier1.sinks.sink1.threadsPoolSize = 20


Any advice or suggestions are truly appreciated!

Best Regards,
Bommuraj

 

1 ACCEPTED SOLUTION


Dear folks, we fixed the issue with the configurations below. The key changes from the originals: the sink property names now match the declared sink name (avro-avsink), the agent channel is a real file channel (type = file, matching its checkpoint/data directory properties), interceptors stamp host and event-name headers that the HDFS sink's path and filePrefix reference, and the Avro sink points at the collector's resolvable hostname instead of the NATed public IP.

Thank you all for your assistance on this.

 

Flume agent configuration:

========================

# list sources, sinks and channels in the agent
bench.sources = spool-source
bench.sinks = avro-avsink
bench.channels = file-channel

# Source Property
bench.sources.spool-source.type = spooldir
bench.sources.spool-source.spoolDir = /logs-archive/test
bench.sources.spool-source.channels = file-channel
bench.sources.spool-source.batchSize = 2000
bench.sources.spool-source.fileHeader = true
bench.sources.spool-source.fileHeaderKey = file
bench.sources.spool-source.bufferMaxLineLength = 8000
bench.sources.spool-source.trackerDir = /logs-archive/flume/DATA-03

# Interceptors (both must appear in a single space-separated list;
# a second "interceptors =" assignment would override the first)
bench.sources.spool-source.interceptors = hostint eventint
bench.sources.spool-source.interceptors.hostint.type = org.apache.flume.interceptor.HostInterceptor$Builder
bench.sources.spool-source.interceptors.hostint.preserveExisting = true
bench.sources.spool-source.interceptors.hostint.useIP = false
bench.sources.spool-source.interceptors.eventint.type = AddEventHeader$Builder

# Sink Property
bench.sinks.avro-avsink.type = avro
bench.sinks.avro-avsink.channel = file-channel
bench.sinks.avro-avsink.hostname = flume-01.XYZ.com
bench.sinks.avro-avsink.port = 9999
bench.sinks.avro-avsink.batch-size = 2000

# Channel Property
bench.channels.file-channel.type = file
bench.channels.file-channel.checkpointDir = /logs-archive/flume/DATA-01
bench.channels.file-channel.dataDirs = /logs-archive/flume/DATA-02
bench.channels.file-channel.transactionCapacity = 4000
bench.channels.file-channel.maxFileSize = 2147483648
bench.channels.file-channel.minimumRequiredSpace = 400000000
bench.channels.file-channel.capacity = 1000000

 

Flume server configuration:
=======================

# agent name, in this case 'tier1'.
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

# Source Property
tier1.sources.source1.type = avro
tier1.sources.source1.bind = flume-01.XYZ.com
tier1.sources.source1.port = 9999
tier1.sources.source1.channels = channel1
tier1.sources.source1.threads = 12

# Channel Property
tier1.channels.channel1.type = file
tier1.channels.channel1.checkpointDir = /mnt/sdd1/checkpointDir
tier1.channels.channel1.dataDirs = /mnt/sdd1/dataDir
tier1.channels.channel1.minimumRequiredSpace = 83558400
tier1.channels.channel1.transactionCapacity = 1000000
tier1.channels.channel1.maxFileSize = 2147483648
tier1.channels.channel1.capacity = 9000000

# Interceptor Property
#tier1.sources.source1.interceptors = hostint
#tier1.sources.source1.interceptors.hostint.type = org.apache.flume.interceptor.HostInterceptor$Builder
#tier1.sources.source1.interceptors.hostint.preserveExisting = true
#tier1.sources.source1.interceptors.hostint.useIP = false

# Sink Property
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = hdfs://example.intranet.com:8020/flume-test/%{eventName}/%Y-%m-%d/%H
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.filePrefix = %{host}
tier1.sinks.sink1.hdfs.rollSize = 1048576
tier1.sinks.sink1.hdfs.rollInterval = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.rollTimerPoolSize = 10
tier1.sinks.sink1.hdfs.batchSize = 20000
tier1.sinks.sink1.hdfs.maxOpenFiles = 500000
tier1.sinks.sink1.hdfs.fileType = SequenceFile
tier1.sinks.sink1.hdfs.writeFormat = Writable
tier1.sinks.sink1.hdfs.threadsPoolSize = 20

 

Best Regards,
Bommuraj


5 REPLIES


Sorry, it was my mistake; I found the issue.

There is a typo in the Flume NG agent configuration on the web server end:
in the sink property names I used avro_avsink instead of avro-avsink, the name declared in T2.sinks.
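
For reference, here is the sink block from the agent config above with the naming made consistent:

# Sink Property -- name must match the declaration "T2.sinks = avro-avsink"
T2.sinks.avro-avsink.type = avro
T2.sinks.avro-avsink.channel = file-channel
T2.sinks.avro-avsink.hostname = 208.XX.XX.XX # PUBLIC IP ADDRESS OF COLLECTOR
T2.sinks.avro-avsink.port = 9999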


I have another question now.

To restate the setup: I have a web server, and I need to send its logs to HDFS.
I installed the Flume NG agent (on the web server) and configured a Spooling Directory source, a file channel, and an Avro sink.
I installed the Flume NG collector using Cloudera Manager; it has an Avro source, a file channel, and an HDFS sink.

Here,
the Avro sink breaks the source content (web server logs) into Avro event batches and sends them to the HDFS sink for storage,
and HDFS stores that content under names of the form "FlumeData.1245567878".
But I really don't want my content (web server logs) to be broken up;
all the files must reach HDFS exactly as they are on the original server (without the filenames changing).

Kindly share your expertise and let me know what kind of source/sink I should choose to accomplish that.

Any suggestions or advice are truly appreciated!

 

Best Regards,
Bommuraj Paramaraj

Cloudera Employee
Flume is designed to transfer event-formatted data; it does not move files as such. Flume breaks files down into "events" (you can customize how it does this by writing your own deserializer) and writes those out. If you need the data back in the exact original files, you can carry the file information in the event headers, have a serializer interpret it to annotate each record with the file name, and then rename the output using an MR job. Flume does not directly support ingesting files as-is.
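
As a rough sketch of the header-based approach (assuming the agent's Spooling Directory source sets fileHeader = true and fileHeaderKey = file, as in the configs in this thread), the header can then be referenced from the HDFS sink with a %{file} escape. Note that the header value is the absolute path on the agent, so a custom interceptor or serializer is still needed to reduce it to a bare file name:

# On the agent: stamp each event with the path of the file it came from
bench.sources.spool-source.fileHeader = true
bench.sources.spool-source.fileHeaderKey = file

# On the collector: reference that header when writing to HDFS
# (%{file} expands to the absolute path from the agent, so expect nested
# directories unless the header is first trimmed to a base name)
tier1.sinks.sink1.hdfs.path = hdfs://example.intranet.com:8020/flume-test/%{file}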


Thank you, shreedharan. I will try adding the file name to the event header and will check that.
Again, your help is appreciated!

Regards,
Bommuraj Paramaraj


Team / shreedharan, I have some questions that came up while I was trying to add the file name to the event header.

Please correct me if my understanding is wrong!

I am using the Spooling Directory source, and I am adding the file name to the header using the options "fileHeader = true" and "fileHeaderKey = key".

My question is:
Can I bifurcate the flow using the file name?
That is, can I direct the content into different channels based on the file name (using the multiplexing selector)?

If I can bifurcate the flow using the file name (which is there in the header), what would be the right values for the options below?

 

 

selector.type = multiplexing
selector.header = ????
selector.mapping.???? = File-channel-1
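
For what it's worth, here is a sketch of how the multiplexing selector might be wired up. This assumes an interceptor has already stamped a simple token into a header (a hypothetical header named logtype with values access/error, and hypothetical channel names); matching the mapping against the raw file header is brittle, since the spooldir source puts the absolute path there and the selector requires an exact match:

# Hypothetical: an interceptor sets header "logtype" to "access" or "error"
bench.sources.spool-source.channels = file-channel-1 file-channel-2
bench.sources.spool-source.selector.type = multiplexing
bench.sources.spool-source.selector.header = logtype
bench.sources.spool-source.selector.mapping.access = file-channel-1
bench.sources.spool-source.selector.mapping.error = file-channel-2
bench.sources.spool-source.selector.default = file-channel-1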

 

All suggestions are highly appreciated.

(I am testing all the possibilities on my Amazon EC2 instance to make this work; if I find something, I will post it here for you all.)

 

Best Regards,
Bommuraj

 
