Created on 01-25-2014 02:06 PM - edited 09-16-2022 01:52 AM
Hi guys!
I am looking for a guide or how-to on using Flume with a JMS source for IBM MQ. So far I have used the official documentation, but with no luck.
Does anybody have any experience with this?
Created 11-13-2014 01:23 PM
Okay, I finally got this working by setting up a File System Initial Context for the IBM MQ queue.
tier1.sources.source2.type = jms
tier1.sources.source2.providerURL = file:///home/cloudera/JNDI-Directory
tier1.sources.source2.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
tier1.sources.source2.destinationType = QUEUE
tier1.sources.source2.destinationName = theQueueName
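Before pointing Flume at the directory, it can help to confirm that the file system context actually resolves a binding. The following is only a minimal sketch under some assumptions: the class name JndiBindingCheck and its helper methods are made up for illustration, the JNDI name to look up (for example the connection factory name you bound) is passed in as an argument because it is not shown in the config above, and the fscontext provider jars plus the IBM MQ client jars are assumed to be on the classpath.

```java
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper, not part of Flume or IBM MQ.
class JndiBindingCheck {

    // Builds the same JNDI settings the Flume source is given:
    // initialContextFactory and providerURL.
    static Hashtable<String, String> jndiEnvironment(String providerUrl) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "com.sun.jndi.fscontext.RefFSContextFactory");
        env.put(Context.PROVIDER_URL, providerUrl);
        return env;
    }

    // Looks up one name in the file system context.
    // Throws NamingException if the factory class or the binding is missing.
    static Object lookup(String providerUrl, String jndiName) throws NamingException {
        Context ctx = new InitialContext(jndiEnvironment(providerUrl));
        try {
            return ctx.lookup(jndiName);
        } finally {
            ctx.close();
        }
    }

    public static void main(String[] args) throws NamingException {
        if (args.length < 2) {
            System.err.println("usage: JndiBindingCheck <providerURL> <jndiName>");
            return;
        }
        // Prints the class of the bound object if the lookup succeeds.
        System.out.println(lookup(args[0], args[1]).getClass().getName());
    }
}
```

Run it with the same providerURL as in the Flume config and the name of an object you bound; a NamingException here suggests the problem is in the JNDI setup rather than in Flume.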
Created 01-16-2015 09:17 AM
We didn't have the rights to create the JNDI file, or the required tools.
Initially we wrote a thin JMS client for IBM MQ + SSL using the IBM libraries, and
then converted it into a Flume pollable source for IBM MQ + SSL.
Created 10-14-2015 08:05 PM
I am able to connect to IBM MQ using the steps mentioned here, but when Flume tries to consume messages from the queue, it throws the following exception:
com.ibm.msg.client.jms.DetailedMessageFormatException: JMSCC0053: An exception occurred deserializing a message, exception: 'java.lang.ClassNotFoundException: null class'.
It was not possible to deserialize the message because of the exception shown.
1) I am using all the IBM MQ client jars. Flume starts without any exception; the exception occurs only when it tries to consume messages.
2) I am putting a custom message (a Serializable object) on the queue, which Flume needs to consume.
3) Flume 1.5.0-cdh5.4.1
4) MQ Version 8.x
a1.sources=s1
a1.channels=c1
a1.sinks=k1
a1.sources.s1.type=jms
a1.sources.s1.channels=c1
a1.sources.s1.initialContextFactory=com.sun.jndi.fscontext.RefFSContextFactory
a1.sources.s1.connectionFactory=FLUME_CF
a1.sources.s1.destinationName=MY.Q
a1.sources.s1.providerURL=file:///home/JNDI-Directory
a1.sources.s1.destinationType=QUEUE
a1.sources.s1.transportType=1
a1.sources.s1.userName=mqm
a1.sources.s1.batchSize=1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
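When an agent config like this is edited by hand, one thing worth double-checking is that every name listed in a1.sources matches the prefix used on the property lines for that source. The following is a rough sketch of such a check, not something Flume ships: the class FlumeConfigCheck and the method sourcesWithoutType are hypothetical names, and the sample config in main is deliberately mismatched for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Flume.
class FlumeConfigCheck {

    // Returns the source names declared in "<agent>.sources" that have
    // no matching "<agent>.sources.<name>.type" property.
    static List<String> sourcesWithoutType(String configText, String agent)
            throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(configText));

        List<String> missing = new ArrayList<>();
        String declared = props.getProperty(agent + ".sources", "");
        // Flume lists several components as space-separated names.
        for (String name : declared.trim().split("\\s+")) {
            if (name.isEmpty()) {
                continue;
            }
            if (props.getProperty(agent + ".sources." + name + ".type") == null) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Made-up sample: the declared name does not match the property prefix.
        String config = String.join("\n",
                "a1.sources=fe_s1",
                "a1.sources.s1.type=jms",
                "a1.sources.s1.channels=c1");
        System.out.println(sourcesWithoutType(config, "a1")); // prints [fe_s1]
    }
}
```

An empty list means every declared source has a type set; the same idea could be extended to channels and sinks.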