Trying to integrate ibm mq as a flume source

Explorer

Hi guys!

I am looking for a guide or how-to on using Flume with a JMS source backed by IBM MQ. So far I have used the official documentation, but with no luck.

Does anybody have any experience with this?

 

1 ACCEPTED SOLUTION

Explorer

Okay, I finally got this working by setting up a File System Initial Context for the IBM MQ queue.

 

tier1.sources.source2.type = jms
tier1.sources.source2.providerURL = file:///home/cloudera/JNDI-Directory
tier1.sources.source2.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
tier1.sources.source2.destinationType = QUEUE
tier1.sources.source2.destinationName = theQueueName


11 REPLIES

Mentor
Flume provides and supports a built-in JMS source that can work with external systems. It is documented at http://archive.cloudera.com/cdh5/cdh/5/flume-ng/FlumeUserGuide.html#jms-source.

Explorer

Hi TheGuy12,

 

Have you had any luck with this yet? I've seen the documentation for using Flume with a JMS source, but I cannot get it working for IBM MQ. Has anyone managed to do this successfully?

Expert Contributor

You can develop your own Flume API to receive data from MQ.

Explorer

Not sure what you mean by "develop your own flume api". I've built a custom Flume source that can listen to IBM MQ just fine, but we'd really like to use what Flume already provides: its built-in JMS source. I just can't get it to work, and was wondering whether anyone else has and could show me how they configured it.


What errors are you getting on the Flume side, or how are you observing the failure?

Explorer

I'm observing the error in the Flume logs. 

 

I think the real issue is that I don't know which initial context factory is appropriate for IBM MQ. Currently I'm using com.ibm.mq.jms.context.WMQInitialContextFactory, but according to what I've read online, using that class is generally not recommended, and I don't know what a suitable alternative is. I have no experience with IBM MQ beyond writing simple Java clients that listen on queues.

 

Here is my configuration for the source:

tier1.sources.source1.type = jms
tier1.sources.source1.providerURL = theHostName:1415/theChannelName
tier1.sources.source1.channels = channel1
tier1.sources.source1.initialContextFactory = com.ibm.mq.jms.context.WMQInitialContextFactory
tier1.sources.source1.destinationType = QUEUE
tier1.sources.source1.destinationName = theQueueName

 

 

And the error:

Failed to publish event: SimpleEvent{attributes={STACKTRACE=[org.apache.flume.FlumeException: Could not create initial context com.ibm.mq.jms.context.WMQInitialContextFactory provider theHostName:1415/theChannelName
	at org.apache.flume.source.jms.JMSSource.doConfigure(JMSSource.java:224)
	at org.apache.flume.source.BasicSourceSemantics.configure(BasicSourceSemantics.java:65)
	at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:331)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: javax.naming.ServiceUnavailableException: Unable to connect to the target queue manager theHostName:1415/theChannelName [Root exception is com.ibm.mq.MQException: MQJE001: Completion Code '2', Reason '2035'.]
	at com.ibm.mq.jms.context.MQContext.<init>(MQContext.java:196)
	at com.ibm.mq.jms.context.WMQInitialContextFactory.getInitialContext(WMQInitialContextFactory.java:29)
	at javax.naming.spi.NamingManager.getInitialContext(NamingManager.java:684)
	at javax.naming.InitialContext.getDefaultInitCtx(InitialContext.java:307)
	at javax.naming.InitialContext.init(InitialContext.java:242)
	at javax.naming.InitialContext.<init>(InitialContext.java:216)
	at org.apache.flume.source.jms.InitialContextFactory.create(InitialContextFactory.java:29)
	at org.apache.flume.source.jms.JMSSource.doConfigure(JMSSource.java:222)
	... 12 more
Caused by: com.ibm.mq.MQException: MQJE001: Completion Code '2', Reason '2035'.
	at com.ibm.mq.MQDestination.open(MQDestination.java:331)
	at com.ibm.mq.MQQueue.<init>(MQQueue.java:250)
	at com.ibm.mq.MQQueueManager.accessQueue(MQQueueManager.java:2799)
	at com.ibm.mq.pcf.PCFAgent.open(PCFAgent.java:334)
	at com.ibm.mq.pcf.PCFAgent.open(PCFAgent.java:309)
	at com.ibm.mq.pcf.PCFAgent.connect(PCFAgent.java:230)
	at com.ibm.mq.pcf.PCFAgent.<init>(PCFAgent.java:163)
	at com.ibm.mq.pcf.PCFMessageAgent.<init>(PCFMessageAgent.java:140)
	at com.ibm.mq.jms.context.MQContext.<init>(MQContext.java:183)
	... 19 more

 

 

Looking up MQJE001 with Reason 2035, it's an MQRC_NOT_AUTHORIZED error. However, I have no issues listening on this same queue from Java clients (without providing a username or password), or from the custom Flume source I've created, again without providing a username or password.

 

This leads me to believe I've got something configured incorrectly, but I cannot figure it out.

 

Does anyone know a) which initial context factory to use for IBM MQ? or b) what else I'm doing wrong?

 

New Contributor

Background: new to Flume, new to IBM MQ

 

I too am trying to connect to IBM MQ, and it throws a class-not-found exception for WMQInitialContextFactory. I have included all the jars that came with the IBM MQ Client, which I installed just to get the jars. Not surprisingly, the WMQInitialContextFactory class mentioned above was not in any of them.

 

But judging by the responses above, the WMQInitialContextFactory class is not the way to go.

 

Could whoever solved this thread please share the anonymized contents of the JNDI properties file you used with the RefFSContextFactory class, and/or describe the solution in detail?

 

Please let me know if I am totally wrong in my approach.

Explorer

Sure. Sorry for the late response. You need to create the JNDI bindings file yourself with the admin tool that is included in the IBM MQ client install. I installed it on Windows, so for me it was located at C:\Program Files (x86)\IBM\WebSphere MQ\java\bin\JMSAdmin.bat. In that directory there's also a JMSAdmin.config file.

I configured mine with these three lines:

INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
PROVIDER_URL=file:/C:/tmp
SECURITY_AUTHENTICATION=none

Then I ran JMSAdmin.bat and defined the connection factory at its prompt:

InitCtx> DEF CF(myConnectionFactory) QMGR(theQueueManager) HOSTNAME(theHostName) PORT(1415) CHANNEL(MY.CHANNEL.01) TRANSPORT(CLIENT)

Then I moved the file it generated in C:/tmp over to my Flume agent. The provider URL you specify in your Flume config should point to the directory where this file is located.

 

For completeness, here is the Flume config I used, repeated from above with the connection factory added:

tier1.sources.source2.type = jms
tier1.sources.source2.providerURL = file:///home/cloudera/JNDI-Directory
tier1.sources.source2.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
tier1.sources.source2.destinationType = QUEUE
tier1.sources.source2.destinationName = theQueueName
tier1.sources.source2.connectionFactory = myConnectionFactory

 Hope that helps.
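
For anyone wiring this into a full agent, here is a minimal sketch of how the source above might sit alongside a channel and a sink. The names channel1 and sink1, the memory channel, and the logger sink are assumptions for illustration only and are not part of the poster's setup. It also assumes the bindings file written by JMSAdmin (normally named .bindings) has been copied into the providerURL directory, and that the IBM MQ client jars plus the file system context jars are on the Flume classpath.

```properties
# Illustrative agent wiring; channel1 and sink1 are hypothetical names
tier1.sources = source2
tier1.channels = channel1
tier1.sinks = sink1

# JMS source using the file system initial context from this thread
tier1.sources.source2.type = jms
tier1.sources.source2.channels = channel1
tier1.sources.source2.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
# Directory on the Flume host that holds the bindings file generated by JMSAdmin
tier1.sources.source2.providerURL = file:///home/cloudera/JNDI-Directory
# Must match the name given to DEF CF(...) in JMSAdmin
tier1.sources.source2.connectionFactory = myConnectionFactory
tier1.sources.source2.destinationType = QUEUE
tier1.sources.source2.destinationName = theQueueName

# Assumed channel and sink, only so events can be seen in the agent log while testing
tier1.channels.channel1.type = memory
tier1.sinks.sink1.type = logger
tier1.sinks.sink1.channel = channel1
```

The logger sink is only a convenient way to confirm that messages arrive; it would normally be swapped for whatever sink the pipeline actually needs.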