Flume ingestion error (need solution)

Explorer

I am getting this error when starting the Flume agent:

 

17/03/11 23:35:34 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
17/03/11 23:35:34 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:flume-agent.properties
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Added sinks: agent-sink Agent: agent
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Processing:agent-sink
17/03/11 23:35:34 WARN conf.FlumeConfiguration: Could not configure sink agent-sink due to: No channel configured for sink: agent-sink
org.apache.flume.conf.ConfigurationException: No channel configured for sink: agent-sink
at org.apache.flume.conf.sink.SinkConfiguration.configure(SinkConfiguration.java:51)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:681)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:347)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:213)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:127)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:109)
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:189)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:89)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/03/11 23:35:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
17/03/11 23:35:34 INFO node.AbstractConfigurationProvider: Creating channels
17/03/11 23:35:34 INFO channel.DefaultChannelFactory: Creating instance of channel agent-chan type memory
17/03/11 23:35:34 INFO node.AbstractConfigurationProvider: Created channel agent-chan
17/03/11 23:35:34 INFO source.DefaultSourceFactory: Creating instance of source agent-src, type spooldir
17/03/11 23:35:34 ERROR node.AbstractConfigurationProvider: Source agent-src has been removed due to an error during configuration
java.lang.IllegalStateException: Configuration must specify a spooling directory
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.source.SpoolDirectorySource.configure(SpoolDirectorySource.java:140)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:326)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/03/11 23:35:34 WARN node.AbstractConfigurationProvider: Channel agent-chan has no components connected and has been removed.
17/03/11 23:35:34 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

 

 

I wrote the flume-agent.properties file like this:

 


agent.sources = agent-src
agent.channels = agent-chan
agent.sinks = agent-sink

 

agent.sources.agent-src.type = spooldir

agent.sources.agent-src.spooldir = /home/cloudera/desktop/ingestionin

agent.sources.agent-src.fileheader = true

agent.sources.agent-src.channels = agent-chan

 

agent.channels.agent-chan.type = memory

agent.channels.agent-chan.capacity = 10000000

agent.channels.agent-chan.transactioncapacity = 2000

 


agent.sinks.agent-sink.channels = agent-chan

agent.sinks.agent-sink.type = hdfs

agent.sinks.agent-sink.writeformat = text

agent.sinks.agent-sink.hdfs.filetype = datastream

agent.sinks.agent-sink.hdfs.path = new/ingestionout

agent.sinks.agent-sink.hdfs.rollcount = 0

agent.sinks.agent-sink.hdfs.rollinterval = 0

agent.sinks.agent-sink.hdfs.rollsize = 0

agent.sinks.agent-sink.hdfs.idletimeout = 0

agent.sinks.agent-sink.hdfs.batchsize = 10000

agent.sinks.agent-sink.hdfs.fileprefix = events

 

 

But it produces the errors above. Please let me know what is wrong and why the Flume agent is not running. Thanks.

 

14 REPLIES

Based on this error:
17/03/11 23:35:34 WARN conf.FlumeConfiguration: Could not configure sink agent-sink due to: No channel configured for sink: agent-sink
org.apache.flume.conf.ConfigurationException: No channel configured for sink: agent-sink

A sink can be attached to only one channel, so the property name is singular (channel, not channels). Change the following line:

agent.sinks.agent-sink.channels = agent-chan

To:


agent.sinks.agent-sink.channel = agent-chan
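
The log in the question also reports a second problem, "Configuration must specify a spooling directory", which the channel fix alone would not resolve. Flume property names are case-sensitive, and the spooling directory source reads its directory from spoolDir, so an all-lowercase spooldir key is ignored. The same applies to the other lowercased keys in the posted file. What follows is only a sketch of that file with the property names spelled as in the Flume User Guide. The values are the asker's own, except that hdfs.batchSize is lowered here on the assumption that a sink batch should not exceed the channel's transactionCapacity.

```properties
agent.sources = agent-src
agent.channels = agent-chan
agent.sinks = agent-sink

# Spooling directory source: the key is spoolDir (capital D), not spooldir
agent.sources.agent-src.type = spooldir
agent.sources.agent-src.spoolDir = /home/cloudera/desktop/ingestionin
agent.sources.agent-src.fileHeader = true
# A source may feed several channels, hence the plural "channels"
agent.sources.agent-src.channels = agent-chan

agent.channels.agent-chan.type = memory
agent.channels.agent-chan.capacity = 10000000
agent.channels.agent-chan.transactionCapacity = 2000

# A sink drains exactly one channel, hence the singular "channel"
agent.sinks.agent-sink.channel = agent-chan
agent.sinks.agent-sink.type = hdfs
agent.sinks.agent-sink.hdfs.path = new/ingestionout
agent.sinks.agent-sink.hdfs.filePrefix = events
agent.sinks.agent-sink.hdfs.fileType = DataStream
agent.sinks.agent-sink.hdfs.writeFormat = Text
# 0 disables the corresponding roll trigger
agent.sinks.agent-sink.hdfs.rollCount = 0
agent.sinks.agent-sink.hdfs.rollInterval = 0
agent.sinks.agent-sink.hdfs.rollSize = 0
agent.sinks.agent-sink.hdfs.idleTimeout = 0
# Assumption: lowered from 10000 so it does not exceed transactionCapacity
agent.sinks.agent-sink.hdfs.batchSize = 2000
```

With every roll trigger and the idle timeout set to 0, the sink will most likely keep a single file open until the agent stops, so a non-zero value for at least one of them may be worth considering.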

Explorer

Thanks, it's working now.

Champion

Sounds good, mate.

Explorer

I am missing something in my configuration for streaming from Kafka to HDFS via Flume. Any pointers?

My ZooKeeper and Kafka run fine. I tested both the consumer and the producer, and data flows through. But when I configure Flume as below, nothing is written to HDFS.
Can anyone see what I am missing in the configuration?

 

agent1.sources = kafka-source

agent1.channels = memory-channel
agent1.sinks = hdfs-sink

agent1.sources.kafka-source.bootstrap.servers = localhost:9092
agent1.sources.kafka-source.batchSize = 1
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = localhost:2181
agent1.sources.kafka-source.topic = Namaskar-vanakam-Hello
agent1.sources.kafka-source.groupId = flume
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100

agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10
agent1.channels.memory-channel.transactionCapacity = 10

agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/
agent1.sinks.hdfs-sink.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 1
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = memory-channel
agent1.sinks.hdfs-sink.writeFormat = Text
agent1.sinks.agent-sink.hdfs.idletimeout = 0

agent1.sinks.agent-sink.hdfs.batchSize = 1

 

 

Champion

There are a few things to take care of when writing a Flume configuration.

When you define sources:

agent.sources = sr1

When you define sinks:

agent.sinks = sink1 sink2 ...

When you define channels:

agent.channels = ch1 ch2

In your configuration there is a typo:

agent.sinks.agent-sink.channels = agent-chan

Change it to:

agent.sinks.agent-sink.channel = agent-chan

You can configure an agent with zero or more sinks, but each sink reads events from exactly one channel.

You also have to configure a channel for every sink. A sink without one is removed from the configuration.
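
To make the plural and singular keys concrete, here is a minimal sketch of one source fanning out to two channels, each drained by its own sink. The component names follow the reply above, with ch2 as the second channel. The netcat source, its bind address and port, and the logger sinks are placeholders chosen only for illustration and are not taken from the thread.

```properties
agent.sources = sr1
agent.channels = ch1 ch2
agent.sinks = sink1 sink2

# Placeholder source type and address, for illustration only
agent.sources.sr1.type = netcat
agent.sources.sr1.bind = localhost
agent.sources.sr1.port = 44444
# Plural key: with the default replicating selector, events go to both channels
agent.sources.sr1.channels = ch1 ch2

agent.channels.ch1.type = memory
agent.channels.ch2.type = memory

# Singular key: each sink names exactly one channel
agent.sinks.sink1.type = logger
agent.sinks.sink1.channel = ch1
agent.sinks.sink2.type = logger
agent.sinks.sink2.channel = ch2
```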

New Contributor

Hi Rajak,

 

What error does it give? I am also facing a similar issue. Below is the error.

 

"org.apache.flume.conf.ConfigurationException: Bootstrap Servers must be specified"

 

I get this even though I have specified the bootstrap servers in the Flume conf file as:

 

agent1.sources.kafka-source.bootstrap.servers = localhost:9092

 

Regards,

Jahar Tyagi

Explorer

It has been working ever since. It was just a matter of the values in the Kafka configuration settings. I remember that a few years back, when I consumed from a continuously written file system as input and streamed to an HDFS sink, the size configuration had to be set properly, or else it gave some strange output.

avatar
Double-check which version of CDH you are running. Since 5.8, CDH Flume uses the new Flume configuration for Kafka sources, meaning you have to specify the bootstrap servers as:

agent1.sources.kafka-source.kafka.bootstrap.servers = localhost:9092

http://flume.apache.org/FlumeUserGuide.html#kafka-source

-pd
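
For reference, here is a sketch of the Kafka-to-HDFS agent posted earlier in the thread, rewritten with the newer Kafka source property names from the linked User Guide (kafka.bootstrap.servers, kafka.topics, kafka.consumer.group.id). It assumes a Flume release that uses this naming. The older zookeeperConnect, topic and groupId keys are left out, as is kafka.consumer.timeout.ms. The sketch also assumes that the two trailing agent-sink lines in the posted file were meant for hdfs-sink, and that writeFormat and idletimeout were meant as hdfs.writeFormat and hdfs.idleTimeout.

```properties
agent1.sources = kafka-source
agent1.channels = memory-channel
agent1.sinks = hdfs-sink

agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
# Newer naming: Kafka client settings carry the kafka. prefix
agent1.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent1.sources.kafka-source.kafka.topics = Namaskar-vanakam-Hello
agent1.sources.kafka-source.kafka.consumer.group.id = flume
agent1.sources.kafka-source.batchSize = 1
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp

agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10
agent1.channels.memory-channel.transactionCapacity = 10

agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.channel = memory-channel
agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
# Assumption: the posted writeFormat key was meant to carry the hdfs. prefix
agent1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 1
# Assumption: these two were posted under agent-sink, a sink this agent does not declare
agent1.sinks.hdfs-sink.hdfs.idleTimeout = 0
agent1.sinks.hdfs-sink.hdfs.batchSize = 1
```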