Multiple sources in a Flume agent

Contributor

Hi Team,

I need to load system logs and Hadoop logs into HDFS on the same machine.

Can we specify multiple sources for a Flume agent on the same machine?

The sample conf file I created is:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 mem-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sources.avro-collection-source1.type = memory
agent_foo.sources.avro-collection-source1.command = tail -F /home/hadoop/hadoop-2.7.1/logs/yarn-hadoop-resourcemanager-sri.test.com.log

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = mem-channel-2
agent_foo.sources.exec-tail-source2.type = memory
agent_foo.sources.exec-tail-source2.command = tail -F /var/log/messages

# Define a sink that outputs to hdfs dir.
agent_foo.sources.hdfs-sink.channel = mem-channel-1 mem-channel-2
agent_foo.sources.hdfs-sink.type = hdfs
agent_foo.sources.hdfs-sink.hdfs.path = /yarn

 

It is not working for me. The Flume agent is stuck with these messages:

 

15/09/14 11:08:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-Cluster1-sink1 avro-forward-sink2 Agent: agent_foo
15/09/14 11:08:07 WARN conf.FlumeConfiguration: Agent configuration for 'agent_foo' does not contain any valid channels. Marking it as invalid.
15/09/14 11:08:07 WARN conf.FlumeConfiguration: Agent configuration invalid for agent 'agent_foo'. It will be removed.
15/09/14 11:08:07 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: []
15/09/14 11:08:07 WARN node.AbstractConfigurationProvider: No configuration found for this host:agent
15/09/14 11:08:07 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

 

Please suggest a solution. ...Sri

agent_foo.sources.hdfs-sink.hdfs.fileType = TEXT

1 ACCEPTED SOLUTION

Contributor

Hi Harish,

 

We cleared up all the issues, and the sink is working now.

 

Sridhar

10 REPLIES

Mentor
Yes, you can have multiple sources under a single agent name, but what you are
really missing is, as the log says, a channel configuration. Please
configure a channel per
http://archive.cloudera.com/cdh5/cdh/5/flume-ng/FlumeUserGuide.html#a-simple-example

> 15/09/14 11:08:07 WARN conf.FlumeConfiguration: Agent configuration for 'agent_foo' does not contain any valid channels. Marking it as invalid.
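
To illustrate the point about the missing channel configuration, here is a minimal sketch of the original agent_foo file with a type given to each listed channel. It rests on several assumptions that are not confirmed in the thread: that the sources were meant to be of type exec (they carry a command, and memory is a channel type), that the sinks belong under the sinks prefix rather than sources, and that each sink reads from one channel. The names exec-tail-source1, hdfs-sink1 and hdfs-sink2 and the path /yarn/messages are hypothetical.

```properties
# Sketch only: component names are illustrative, not the poster's final file
agent_foo.sources = exec-tail-source1 exec-tail-source2
agent_foo.sinks = hdfs-sink1 hdfs-sink2
agent_foo.channels = mem-channel-1 mem-channel-2

# Every channel listed above needs its own definition; the original file
# listed the channels but never gave them a type
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-2.type = memory

# flow #1: sources use "channels" (plural); exec is assumed here
agent_foo.sources.exec-tail-source1.type = exec
agent_foo.sources.exec-tail-source1.command = tail -F /home/hadoop/hadoop-2.7.1/logs/yarn-hadoop-resourcemanager-sri.test.com.log
agent_foo.sources.exec-tail-source1.channels = mem-channel-1

# flow #2
agent_foo.sources.exec-tail-source2.type = exec
agent_foo.sources.exec-tail-source2.command = tail -F /var/log/messages
agent_foo.sources.exec-tail-source2.channels = mem-channel-2

# Sinks are defined under "sinks" and take a single "channel" (singular)
agent_foo.sinks.hdfs-sink1.type = hdfs
agent_foo.sinks.hdfs-sink1.hdfs.path = /yarn
agent_foo.sinks.hdfs-sink1.channel = mem-channel-1

# /yarn/messages is a hypothetical target directory
agent_foo.sinks.hdfs-sink2.type = hdfs
agent_foo.sinks.hdfs-sink2.hdfs.path = /yarn/messages
agent_foo.sinks.hdfs-sink2.channel = mem-channel-2
```

The names in the sources, sinks and channels lists have to match the names used in the per-component lines; the original file listed avro-AppSrv-source1 but configured avro-collection-source1.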

Contributor

Harish,

 

I saw the URL you suggested (I am using the same one). Can you provide a sample .conf that uses multiple sources, multiple channels, and multiple sinks on the same host?

 

Thanks in advance

Sridhar

Contributor

Hi Harish,

 

Please see the modified .conf:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/hadoop-2.7.1/logs/yarn-hadoop-resourcemanager-sri.test.com.log
a1.sources.r1.channels = c1

# Describe/configure the source r2
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /var/log/messages
a1.sources.r2.channels = c2

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k2.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

# Define a sink that outputs to hdfs dir.
a1.sink.k1.type = hdfs
a1.sink.k1.hdfs.path = /yarn
a1.sink.k1.fileType = TEXT

# Define a sink that outputs to hdfs dir.
a1.sink.k2.type = hdfs
a1.sink.k2.hdfs.path = /yarn/test
a1.sink.k2.fileType = TEXT

 

 

Please see the log file:

15/09/14 12:08:34 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/09/14 12:08:34 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/hadoop/flume-1.6.0/conf/yarn.conf
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Invalid property specified: sink.k2.hdfs.path
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Configuration property ignored: a1.sink.k2.hdfs.path = /yarn/test
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Invalid property specified: sink.k2.fileType
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Configuration property ignored: a1.sink.k2.fileType = TEXT
15/09/14 12:08:34 INFO conf.FlumeConfiguration: Processing:k1
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Invalid property specified: sink.k1.type
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Configuration property ignored: a1.sink.k1.type = hdfs
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Invalid property specified: sink.k1.fileType
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Configuration property ignored: a1.sink.k1.fileType = TEXT
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Invalid property specified: sink.k1.hdfs.path
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Configuration property ignored: a1.sink.k1.hdfs.path = /yarn
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Invalid property specified: sink.k2.type
15/09/14 12:08:34 WARN conf.FlumeConfiguration: Configuration property ignored: a1.sink.k2.type = hdfs
15/09/14 12:08:34 INFO conf.FlumeConfiguration: Processing:k2
15/09/14 12:08:34 INFO conf.FlumeConfiguration: Added sinks: k1 k2 Agent: a1
15/09/14 12:08:34 INFO conf.FlumeConfiguration: Processing:k1
15/09/14 12:08:34 INFO conf.FlumeConfiguration: Processing:k2
15/09/14 12:08:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/09/14 12:08:34 INFO node.AbstractConfigurationProvider: Creating channels

 

Please suggest any modifications required for the .conf file.

 

Sri
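
The warnings in the log above (Invalid property specified: sink.k1.type) suggest that Flume does not recognise the a1.sink. prefix; the component list in the same file uses a1.sinks. Below is a minimal sketch of the HDFS sink section with that prefix. It assumes plain-text output was intended, so it uses hdfs.fileType = DataStream with hdfs.writeFormat = Text in place of fileType = TEXT, and it assumes the HDFS sinks are meant to replace the earlier logger type lines for k1 and k2.

```properties
# Define sinks that write to HDFS (note "sinks", plural)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /yarn
# Assumption: uncompressed text output was the intent
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.channel = c1

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /yarn/test
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.channel = c2
```

With the misspelled prefix, the hdfs settings are ignored and both sinks stay on the logger type set earlier in the file, so events would go to the agent log rather than to HDFS.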

 

Contributor

Hi Harish,

 

We cleared up all the issues, and the sink is working now.

 

Sridhar

New Contributor

Can you please explain what steps you followed to resolve this issue?

 

Thanks

Pulkit

New Contributor

Never mind, I found the issue in my config file.

New Contributor

Hi Pulkit,

Can you please elaborate on what the issue was in your config file? I am facing the same issue, and I have configured flume.conf as given in this post.

Explorer
There are also some character-case issues; you should follow the official Flume documentation when writing the configuration.

http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source

Explorer

17/03/11 23:35:34 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
17/03/11 23:35:34 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:flume-agent.properties
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Added sinks: agent-sink Agent: agent
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 WARN conf.FlumeConfiguration: Could not configure sink agent-sink due to: No channel configured for sink: agent-sink
org.apache.flume.conf.ConfigurationException: No channel configured for sink: agent-sink
at org.apache.flume.conf.sink.SinkConfiguration.configure(SinkConfiguration.java:51)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:681)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:347)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:213)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:127)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:109)
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:189)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:89)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
17/03/11 23:35:34 INFO node.AbstractConfigurationProvider: Creating channels
17/03/11 23:35:34 INFO channel.DefaultChannelFactory: Creating instance of channel agent-chan type memory
17/03/11 23:35:34 INFO node.AbstractConfigurationProvider: Created channel agent-chan
17/03/11 23:35:34 INFO source.DefaultSourceFactory: Creating instance of source agent-src, type spooldir
17/03/11 23:35:34 ERROR node.AbstractConfigurationProvider: Source agent-src has been removed due to an error during configuration
java.lang.IllegalStateException: Configuration must specify a spooling directory
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.source.SpoolDirectorySource.configure(SpoolDirectorySource.java:140)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:326)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/03/11 23:35:34 WARN node.AbstractConfigurationProvider: Channel agent-chan has no components connected and has been removed.
17/03/11 23:35:34 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

 

 

I created the flume-agent.properties file as follows:

 


agent.sources = agent-src
agent.channels = agent-chan
agent.sinks = agent-sink

agent.sources.agent-src.type = spooldir
agent.sources.agent-src.spooldir = /home/cloudera/desktop/ingestionin
agent.sources.agent-src.fileheader = true
agent.sources.agent-src.channels = agent-chan

agent.channels.agent-chan.type = memory
agent.channels.agent-chan.capacity = 10000000
agent.channels.agent-chan.transactioncapacity = 2000

agent.sinks.agent-sink.channels = agent-chan
agent.sinks.agent-sink.type = hdfs
agent.sinks.agent-sink.writeformat = text
agent.sinks.agent-sink.hdfs.filetype = datastream
agent.sinks.agent-sink.hdfs.path = new/ingestionout
agent.sinks.agent-sink.hdfs.rollcount = 0
agent.sinks.agent-sink.hdfs.rollinterval = 0
agent.sinks.agent-sink.hdfs.rollsize = 0
agent.sinks.agent-sink.hdfs.idletimeout = 0
agent.sinks.agent-sink.hdfs.batchsize = 10000
agent.sinks.agent-sink.hdfs.fileprefix = events
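
Both errors in the log above appear to trace back to key spelling, since Flume property names are case sensitive: "Configuration must specify a spooling directory" matches the lowercase spooldir key, and "No channel configured for sink" matches the use of channels (plural) on the sink. A sketch of the same file with the key names as they are spelled in the Flume user guide follows. The values are kept from the post, except hdfs.batchSize, which is lowered here on the assumption that it should not exceed the channel's transactionCapacity.

```properties
agent.sources = agent-src
agent.channels = agent-chan
agent.sinks = agent-sink

agent.sources.agent-src.type = spooldir
# "spoolDir" and "fileHeader" are camelCase
agent.sources.agent-src.spoolDir = /home/cloudera/desktop/ingestionin
agent.sources.agent-src.fileHeader = true
agent.sources.agent-src.channels = agent-chan

agent.channels.agent-chan.type = memory
agent.channels.agent-chan.capacity = 10000000
agent.channels.agent-chan.transactionCapacity = 2000

# A sink takes a single "channel" (singular)
agent.sinks.agent-sink.channel = agent-chan
agent.sinks.agent-sink.type = hdfs
# writeFormat belongs under the "hdfs." prefix like the other HDFS sink keys
agent.sinks.agent-sink.hdfs.writeFormat = Text
agent.sinks.agent-sink.hdfs.fileType = DataStream
agent.sinks.agent-sink.hdfs.path = new/ingestionout
agent.sinks.agent-sink.hdfs.rollCount = 0
agent.sinks.agent-sink.hdfs.rollInterval = 0
agent.sinks.agent-sink.hdfs.rollSize = 0
agent.sinks.agent-sink.hdfs.idleTimeout = 0
# Assumption: kept at or below the channel's transactionCapacity
agent.sinks.agent-sink.hdfs.batchSize = 2000
agent.sinks.agent-sink.hdfs.filePrefix = events
```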