far
New Contributor
Posts: 5
Registered: ‎03-11-2017

Flume ingestion error (need solution)

I am getting this error:

17/03/11 23:35:34 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
17/03/11 23:35:34 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:flume-agent.properties
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Added sinks: agent-sink Agent: agent
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 WARN conf.FlumeConfiguration: Could not configure sink agent-sink due to: No channel configured for sink: agent-sink
org.apache.flume.conf.ConfigurationException: No channel configured for sink: agent-sink
at org.apache.flume.conf.sink.SinkConfiguration.configure(SinkConfiguration.java:51)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:681)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:347)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:213)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:127)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:109)
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:189)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:89)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
17/03/11 23:35:34 INFO node.AbstractConfigurationProvider: Creating channels
17/03/11 23:35:34 INFO channel.DefaultChannelFactory: Creating instance of channel agent-chan type memory
17/03/11 23:35:34 INFO node.AbstractConfigurationProvider: Created channel agent-chan
17/03/11 23:35:34 INFO source.DefaultSourceFactory: Creating instance of source agent-src, type spooldir
17/03/11 23:35:34 ERROR node.AbstractConfigurationProvider: Source agent-src has been removed due to an error during configuration
java.lang.IllegalStateException: Configuration must specify a spooling directory
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.source.SpoolDirectorySource.configure(SpoolDirectorySource.java:140)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:326)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/03/11 23:35:34 WARN node.AbstractConfigurationProvider: Channel agent-chan has no components connected and has been removed.
17/03/11 23:35:34 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

 

 

I wrote the flume-agent.properties file like this:

agent.sources = agent-src
agent.channels = agent-chan
agent.sinks = agent-sink

agent.sources.agent-src.type = spooldir
agent.sources.agent-src.spooldir = /home/cloudera/desktop/ingestionin
agent.sources.agent-src.fileheader = true
agent.sources.agent-src.channels = agent-chan

agent.channels.agent-chan.type = memory
agent.channels.agent-chan.capacity = 10000000
agent.channels.agent-chan.transactioncapacity = 2000

agent.sinks.agent-sink.channels = agent-chan
agent.sinks.agent-sink.type = hdfs
agent.sinks.agent-sink.writeformat = text
agent.sinks.agent-sink.hdfs.filetype = datastream
agent.sinks.agent-sink.hdfs.path = new/ingestionout
agent.sinks.agent-sink.hdfs.rollcount = 0
agent.sinks.agent-sink.hdfs.rollinterval = 0
agent.sinks.agent-sink.hdfs.rollsize = 0
agent.sinks.agent-sink.hdfs.idletimeout = 0
agent.sinks.agent-sink.hdfs.batchsize = 10000
agent.sinks.agent-sink.hdfs.fileprefix = events

But it produces errors. Please let me know what the error is and why the Flume agent is not running. Thanks.

Cloudera Employee
Posts: 204
Registered: ‎01-09-2014

Re: Flume ingestion error (need solution)

Based on this error:
17/03/11 23:35:34 WARN conf.FlumeConfiguration: Could not configure sink agent-sink due to: No channel configured for sink: agent-sink
org.apache.flume.conf.ConfigurationException: No channel configured for sink: agent-sink

A sink can be attached to only one channel, so the property name is the singular channel, not channels. Change the following line:

agent.sinks.agent-sink.channels = agent-chan

To:


agent.sinks.agent-sink.channel = agent-chan
Champion
Posts: 632
Registered: ‎05-16-2016

Re: Flume ingestion error (need solution)

There are a few things to take care of when dealing with Flume configuration.

When you define sources:

agent.sources = sr1

When you define sinks:

agent.sinks = sink1 sink2 ...

When you define channels:

agent.channels = ch1 ch2

There is a typo in your configuration:

agent.sinks.agent-sink.channels = agent-chan

Change it to:

agent.sinks.agent-sink.channel = agent-chan

You can configure an agent with zero or more sinks, but each sink reads events from exactly one channel.

You also have to configure a channel for every sink; if you do not, the sink is removed.
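
Putting the points above together, a corrected version of the file from the question might look like the sketch below. Besides the channel fix, it assumes the camel-case key names listed in the Flume user guide (spoolDir, fileHeader, transactionCapacity, hdfs.fileType and so on). Flume property keys are case-sensitive, and the log in the question also reports "Configuration must specify a spooling directory", which is what the all-lowercase spooldir key would produce. The paths and sizes are copied from the question, except for hdfs.batchSize, which is lowered here as an assumption that is not discussed in this thread.

```properties
# Sketch only: key names follow the Flume user guide; values come from the
# question unless a comment says otherwise.
agent.sources = agent-src
agent.channels = agent-chan
agent.sinks = agent-sink

# Spooling directory source. The directory key is spoolDir, not spooldir.
agent.sources.agent-src.type = spooldir
agent.sources.agent-src.spoolDir = /home/cloudera/desktop/ingestionin
agent.sources.agent-src.fileHeader = true
# A source can feed several channels, hence the plural "channels".
agent.sources.agent-src.channels = agent-chan

# Memory channel.
agent.channels.agent-chan.type = memory
agent.channels.agent-chan.capacity = 10000000
agent.channels.agent-chan.transactionCapacity = 2000

# HDFS sink. A sink drains exactly one channel, hence the singular "channel".
agent.sinks.agent-sink.channel = agent-chan
agent.sinks.agent-sink.type = hdfs
agent.sinks.agent-sink.hdfs.path = new/ingestionout
agent.sinks.agent-sink.hdfs.filePrefix = events
agent.sinks.agent-sink.hdfs.fileType = DataStream
agent.sinks.agent-sink.hdfs.writeFormat = Text
agent.sinks.agent-sink.hdfs.rollCount = 0
agent.sinks.agent-sink.hdfs.rollInterval = 0
agent.sinks.agent-sink.hdfs.rollSize = 0
agent.sinks.agent-sink.hdfs.idleTimeout = 0
# Assumption: lowered from 10000 so that one sink batch fits within the
# channel's transactionCapacity of 2000.
agent.sinks.agent-sink.hdfs.batchSize = 1000
```

With every roll setting and the idle timeout at 0, as in the question, the sink keeps writing to a single open file; whether that is wanted depends on the use case.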

far
New Contributor
Posts: 5
Registered: ‎03-11-2017

Re: Flume ingestion error (need solution)

Thanks, it's working now.

Champion
Posts: 632
Registered: ‎05-16-2016

Re: Flume ingestion error (need solution)

Sounds good, mate.

New Contributor
Posts: 3
Registered: ‎10-22-2017

Re: Flume ingestion error (need solution)

I am missing something in my configuration for streaming from Kafka to HDFS via Flume. Any pointers?

My ZooKeeper and Kafka run fine. I tested both the consumer and the producer, and data flows through, but when I configure Flume as below, nothing is written to HDFS.
Can anyone see what I am missing in the configuration?

agent1.sources = kafka-source

agent1.channels = memory-channel
agent1.sinks = hdfs-sink

agent1.sources.kafka-source.bootstrap.servers = localhost:9092
agent1.sources.kafka-source.batchSize = 1
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = localhost:2181
agent1.sources.kafka-source.topic = Namaskar-vanakam-Hello
agent1.sources.kafka-source.groupId = flume
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100

agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10
agent1.channels.memory-channel.transactionCapacity = 10

agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/
agent1.sinks.hdfs-sink.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 1
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = memory-channel
agent1.sinks.hdfs-sink.writeFormat = Text
agent1.sinks.agent-sink.hdfs.idletimeout = 0

agent1.sinks.agent-sink.hdfs.batchSize = 1
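
Two details in the configuration above can be checked independently of the Flume version. The last two lines configure a sink named agent-sink, which agent1.sinks does not declare, so they do not apply to hdfs-sink. The writeFormat key also lacks the hdfs. prefix that the other HDFS sink settings carry. A sketch of the sink block with consistent names follows. It leaves the source untouched, because the Kafka source key names differ between Flume releases, and it is not confirmed to be the reason nothing reaches HDFS.

```properties
# Sketch only: every setting now refers to the declared sink, hdfs-sink,
# and uses the hdfs.-prefixed, camel-case key names from the Flume user guide.
agent1.sinks = hdfs-sink

agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.channel = memory-channel
agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 1
agent1.sinks.hdfs-sink.hdfs.idleTimeout = 0
agent1.sinks.hdfs-sink.hdfs.batchSize = 1
```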

 

 
