How to read data from Oracle into a Kafka broker using Flume

Contributor

Hi

I am trying to pull data from Oracle into a Kafka broker using Flume, but it gives me an error.

2016-10-26 13:52:27,447 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.SQLSource, class: org.apache.flume.source.SQLSource

My flume.conf is:

# Flume agent config

agent.channels = ch1

agent.sinks = kafkaSink

agent.sources = sql-source

agent.channels.ch1.type = memory

agent.channels.ch1.capacity = 1000000

agent.sources.sql-source.channels = ch1

# Source class (alternative tried: org.apache.flume.source.SQLSource)

agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

# URL to connect to database

agent.sources.sql-source.connection.url = jdbc:oracle:thin:@xx.xx.xx.xx:xxxx:xxxx

# Database connection properties

agent.sources.sql-source.user = user_name

agent.sources.sql-source.password = passwd

agent.sources.sql-source.table = tbl1

agent.sources.sql-source.columns.to.select = *

# Increment column properties

agent.sources.sql-source.incremental.column.name = c1

# Increment value is the value from which you want to start taking data from the table (0 will import the entire table)

agent.sources.sql-source.incremental.value = 1

# Query delay: the query is sent once every configured number of milliseconds

agent.sources.sql-source.run.query.delay=10000

# Status file is used to save the last read row

agent.sources.sql-source.status.file.path = /var/lib/flume

agent.sources.sql-source.status.file.name = sql-source.status

agent.sinks.kafkaSink.type=org.apache.flume.sink.kafka.KafkaSink

agent.sinks.kafkaSink.brokerList=xx.xx.xx.xx:yyyy

agent.sinks.kafkaSink.topic=test

agent.sinks.kafkaSink.channel=ch1

agent.sinks.kafkaSink.batchSize=10
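
A note on the file format: Flume loads this file as a Java properties file, where `#` starts a comment only at the beginning of a line. Text after a `#` later on a line stays part of the value, so a trailing "comment" after a class name or number ends up inside the setting. The sketch below is one way to spot such lines. It is a simplified check, not a full properties parser: it assumes `=` as the separator and ignores line continuations. The function name `find_inline_hash` and the sample lines are made up for illustration.

```python
from typing import List, Tuple


def find_inline_hash(conf_text: str) -> List[Tuple[int, str, str]]:
    """Return (line_number, key, value) for property values that contain '#'."""
    findings = []
    for number, raw in enumerate(conf_text.splitlines(), start=1):
        line = raw.strip()
        # Blank lines and whole-line comments ('#' or '!') carry no setting.
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        # A '#' after the separator is part of the value, not a comment.
        if sep and "#" in value:
            findings.append((number, key.strip(), value.strip()))
    return findings


# Hypothetical sample lines, written in the style of a Flume agent config.
sample = """\
# Flume agent config
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource #alternative
agent.sources.sql-source.incremental.value =1 #0
agent.sources.sql-source.run.query.delay=10000
"""

for number, key, value in find_inline_hash(sample):
    print(f"line {number}: {key} -> {value!r}")
```

Any line it reports is worth rewriting so that the note sits on its own `#` line above the setting.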

1 ACCEPTED SOLUTION

Explorer

I have never used it with Oracle as a source, but from the error I think Flume cannot find the sql-source jar on its classpath.

Copy flume-ng-sql-source-xxx.jar to /usr/hdp/current/flume-server/lib so that it is on the Flume classpath (I checked, and it is not included by default in HDP 2.4.2).

You will also need the Oracle JDBC connector (driver) jar on the same classpath.
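
To check whether the copy worked, and which fully qualified class name the jar actually provides, something like the following sketch may help. It treats each jar as a zip archive and lists classes with a given simple name. The function name `find_classes` is made up for illustration, and the lib path is the one mentioned above; adjust it for your installation.

```python
import zipfile
from pathlib import Path
from typing import Dict, List


def find_classes(lib_dir: str, simple_name: str) -> Dict[str, List[str]]:
    """Map each jar in lib_dir to the fully qualified classes named simple_name."""
    suffix = "/" + simple_name + ".class"
    matches: Dict[str, List[str]] = {}
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as archive:
            # Turn entries like org/x/SQLSource.class into org.x.SQLSource.
            names = [
                entry[: -len(".class")].replace("/", ".")
                for entry in archive.namelist()
                if entry.endswith(suffix)
            ]
        if names:
            matches[jar.name] = names
    return matches


if __name__ == "__main__":
    # Path taken from the answer above; adjust for your installation.
    lib = "/usr/hdp/current/flume-server/lib"
    for jar_name, classes in find_classes(lib, "SQLSource").items():
        print(jar_name, classes)
```

If nothing is printed, no jar in that directory contains the class, which matches a ClassNotFoundException at startup. If a name is printed, that exact name is what the source `type` setting in flume.conf has to use. Running the same check with `OracleDriver` can confirm that the Oracle JDBC driver jar is in place.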

Contributor

Hi

I have done this, but it still gives me an error.

2016-10-27 10:35:50,512 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel ch1 type memory
2016-10-27 10:35:50,517 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel ch1
2016-10-27 10:35:50,518 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source sql-source, type org.apache.flume.source.SQLSource
2016-10-27 10:35:50,521 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.SQLSource, class: org.apache.flume.source.SQLSource
    at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
    at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:40)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:327)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.SQLSource
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:191)
    at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:65)

Contributor

Finally resolved.

Yes, you were right.

My target is achieved: ODB -> FLUME -> KAFKA

Thanks

Hi Mangesh,

I want to achieve the same. Can you help me with the jar files and connector please?

Regards,

Mitesh