Flume Twitter: Channel MemChannel has no components connected and has been removed.

I am trying to fetch a Twitter feed with Flume, but it is not working. The agent gets stuck after the following output:

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-01-30 23:46:26,038 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2022-01-30 23:46:26,053 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/aviparna/apache-flume-1.9.0-bin/conf/flume.conf
2022-01-30 23:46:26,059 INFO conf.FlumeConfiguration: Processing:HDFS
2022-01-30 23:46:26,072 INFO conf.FlumeConfiguration: Processing:MemChannel
2022-01-30 23:46:26,072 INFO conf.FlumeConfiguration: Processing:Twitter
2022-01-30 23:46:26,072 INFO conf.FlumeConfiguration: Processing:Twitter
2022-01-30 23:46:26,073 INFO conf.FlumeConfiguration: Processing:HDFS
2022-01-30 23:46:26,073 INFO conf.FlumeConfiguration: Processing:HDFS
2022-01-30 23:46:26,073 INFO conf.FlumeConfiguration: Processing:MemChannel
2022-01-30 23:46:26,073 INFO conf.FlumeConfiguration: Processing:HDFS
2022-01-30 23:46:26,073 INFO conf.FlumeConfiguration: Processing:MemChannel
2022-01-30 23:46:26,073 INFO conf.FlumeConfiguration: Processing:HDFS
2022-01-30 23:46:26,073 INFO conf.FlumeConfiguration: Processing:HDFS
2022-01-30 23:46:26,073 INFO conf.FlumeConfiguration: Added sinks: HDFS Agent: TwitterAgent
2022-01-30 23:46:26,074 INFO conf.FlumeConfiguration: Processing:Twitter
2022-01-30 23:46:26,074 INFO conf.FlumeConfiguration: Processing:Twitter
2022-01-30 23:46:26,074 INFO conf.FlumeConfiguration: Processing:HDFS
2022-01-30 23:46:26,074 INFO conf.FlumeConfiguration: Processing:HDFS
2022-01-30 23:46:26,074 INFO conf.FlumeConfiguration: Processing:Twitter
2022-01-30 23:46:26,074 INFO conf.FlumeConfiguration: Processing:Twitter
2022-01-30 23:46:26,074 INFO conf.FlumeConfiguration: Processing:Twitter
2022-01-30 23:46:26,074 INFO conf.FlumeConfiguration: Processing:Twitter
2022-01-30 23:46:26,075 INFO conf.FlumeConfiguration: Processing:HDFS
2022-01-30 23:46:26,075 INFO conf.FlumeConfiguration: Processing:Twitter
2022-01-30 23:46:26,075 WARN conf.FlumeConfiguration: Agent configuration for 'TwitterAgent' has no configfilters.
2022-01-30 23:46:26,129 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [TwitterAgent]
2022-01-30 23:46:26,132 INFO node.AbstractConfigurationProvider: Creating channels
2022-01-30 23:46:26,156 INFO channel.DefaultChannelFactory: Creating instance of channel MemChannel type memory
2022-01-30 23:46:26,161 INFO node.AbstractConfigurationProvider: Created channel MemChannel
2022-01-30 23:46:26,161 INFO source.DefaultSourceFactory: Creating instance of source Twitter, type com.cloudera.flume.source.TwitterSource
2022-01-30 23:46:26,274 INFO sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
2022-01-30 23:46:26,300 ERROR node.AbstractConfigurationProvider: Sink HDFS has been removed due to an error during configuration
java.lang.InstantiationException: Incompatible sink and channel settings defined. sink's batch size is greater than the channels transaction capacity. Sink: HDFS, batch size = 200000, channel MemChannel, transaction capacity = 1000
    at org.apache.flume.node.AbstractConfigurationProvider.checkSinkChannelCompatibility(AbstractConfigurationProvider.java:403)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:462)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2022-01-30 23:46:26,318 INFO node.AbstractConfigurationProvider: Channel MemChannel connected to [Twitter]
2022-01-30 23:46:26,323 INFO node.Application: Starting new configuration:{ sourceRunners:{Twitter=EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:IDLE} }} sinkRunners:{} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
2022-01-30 23:46:26,339 INFO node.Application: Starting Channel MemChannel
2022-01-30 23:46:26,547 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
2022-01-30 23:46:26,547 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: MemChannel started
2022-01-30 23:46:26,547 INFO node.Application: Starting Source Twitter
2022-01-30 23:46:26,550 INFO twitter4j.TwitterStreamImpl: Establishing connection.
2022-01-30 23:46:27,910 INFO twitter4j.TwitterStreamImpl: Connection established.
2022-01-30 23:46:27,911 INFO twitter4j.TwitterStreamImpl: Receiving status stream.

Attaching my flume.conf file for reference:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = xxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessToken = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.keywords = bitcoin
TwitterAgent.sources.Twitter.maxBatchSize = 50000
TwitterAgent.sources.Twitter.maxBatchDurationMillis = 100000

# Describing/Configuring the sink
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/*****/tweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 200000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 2000000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000


#bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -n agent1 -Dflume.root.logger=DEBUG,console
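
For context on the ERROR in the log: the exception says the HDFS sink's batch size (200000) is greater than MemChannel's transaction capacity (1000), which is why the sink is removed and sinkRunners is empty in the "Starting new configuration" line. A minimal sketch of settings that would satisfy that check is below. The specific values are assumptions chosen only to illustrate the constraint (sink batch size no larger than the channel's transactionCapacity, and transactionCapacity no larger than capacity); they are not tuned recommendations.

```properties
# Sketch only; values are illustrative assumptions, not tuned settings.
# The sink's batch size must not exceed the channel's transactionCapacity.
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000

# transactionCapacity must not exceed the channel's total capacity.
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000
```

With no sink attached, nothing drains the channel, which would be consistent with the agent appearing stuck after "Receiving status stream."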
