Created on 04-07-2015 06:00 AM - edited 09-16-2022 02:26 AM
I have created a custom source for flume and copied the jar files in the following locations :
mkdir -p /usr/lib/flume-ng/plugins.d/MyFlumeSource/lib/MyFlumeSource.jar
mkdir -p /var/lib/flume-ng/plugins.d/MyFlumeSource/lib/MyFlumeSource.jar
chown -R flume:flume /usr/lib/flume-ng/
chown -R flume:flume /var/lib/flume-ng/
Also in /etc/flume-ng/conf/flume-env.sh
FLUME_CLASSPATH="/usr/lib/flume-ng/plugins.d/MyFlumeSource/lib/MyFlumeSource.jar"
Updated the Flume configuration file as
# Name the components on this agent
tail1.sources = seq-source
tail1.channels = mem-channel
tail1.sinks = hdfs-sink
# Describe/configure Source
tail1.sources.seq-source.type = org.custom.flume.source.MySource
# Describe the sink
tail1.sinks.hdfs-sink.type = hdfs
tail1.sinks.hdfs-sink.hdfs.path = /user/flume
tail1.sinks.hdfs-sink.hdfs.filePrefix = log
tail1.sinks.hdfs-sink.hdfs.rollInterval = 0
tail1.sinks.hdfs-sink.hdfs.rollCount = 10000
tail1.sinks.hdfs-sink.hdfs.fileType = DataStream
# Use a channel which buffers events in file
tail1.channels.mem-channel.type = memory
tail1.channels.mem-channel.capacity = 1000
tail1.channels.mem-channel.transactionCapacity = 100
# Bind the source and sink to the channel
tail1.sources.seq-source.channels = mem-channel
tail1.sinks.hdfs-sink.channel = mem-channel
Trying to run the flume agent as
flume-ng agent --conf /var/lib/flume-ng/plugins.d/MyFlumeSource/lib/MyFlumeSource.jar --conf-file /etc/flume-ng/conf/flume-conf.properties --name tail1
flume-ng agent --conf-file /etc/flume-ng/conf/flume-conf.properties --name tail1
In both cases I am getting the following error :
ERROR node.PollingPropertiesFileConfigurationProvider: Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to create source: seq-source, type: org.custom.flume.source.MySource, class: org.custom.flume.source.MySource
at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:48)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:44)
... 10 more
If any one aware of it please help me.
Created 04-23-2015 09:28 AM
Created 04-23-2015 09:28 AM