Created 04-22-2016 08:38 PM
MQ provider: IBM Websphere 7.5
HDF version: 1.2
I'm trying to consume from the A.INBOUND queue using the ConsumeJMS processor and post a message to another queue, A.OUTBOUND, using the PublishJMS processor. Both processors use the same JMS connection service.
ConsumeJMS works fine, but PublishJMS fails with the following error:
16:36:52 EDT ERROR 8bd6ca6e-6cc6-4553-9435-08481dffbe2b PublishJMS - JMSPublisher[destination:A.OUTBOUND; pub-sub:false;] Failed while sending message to JMS via JMSPublisher[destination:A.OUTBOUND; pub-sub:false;]
MQ server is running locally.
I can confirm the connection info is correct and the queues exist.
Any help is appreciated.
Created 04-25-2016 02:53 PM
Fixed the issue. There is nothing wrong with the PublishJMS processor, though it probably needs more documentation.
This is my original flow that did not work:
GetKafka -> EvaluateJsonPath -> PublishJMS
Exception stack trace:
Caused by: com.ibm.msg.client.jms.DetailedMessageFormatException: JMSCC0049: The property name 'kafka.partition' is not a valid Java(tm) identifier.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_74]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_74]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_74]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_74]
    at com.ibm.msg.client.commonservices.j2se.NLSServices.createException(NLSServices.java:319) ~[na:na]
    at com.ibm.msg.client.commonservices.nls.NLSServices.createException(NLSServices.java:233) ~[na:na]
    at com.ibm.msg.client.jms.internal.JmsErrorUtils.createException(JmsErrorUtils.java:109) ~[na:na]
    at com.ibm.msg.client.jms.internal.JmsMessageImpl.checkPropName(JmsMessageImpl.java:2168) ~[na:na]
    at com.ibm.msg.client.jms.internal.JmsMessageImpl.setStringProperty(JmsMessageImpl.java:1694) ~[na:na]
    at com.ibm.jms.JMSMessage.setStringProperty(JMSMessage.java:1496) ~[na:na]
    at org.apache.nifi.jms.processors.JMSPublisher$1.createMessage(JMSPublisher.java:87) ~[nifi-jms-processors-0.6.0.1.2.0.0-91.jar:0.6.0.1.2.0.0-91]
    at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:603) ~[spring-jms-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:584) ~[spring-jms-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:494) ~[spring-jms-4.2.4.RELEASE.jar:4.2.4.RELEASE]
I saw that all the flow file attributes were converted to JMS user headers, including 'kafka.partition' and the other Kafka attributes. Since I don't need the Kafka headers, I removed them with an UpdateAttribute processor before publishing the message to the JMS queue. Now my flow looks like this:
GetKafka -> EvaluateJsonPath -> UpdateAttribute -> PublishJMS
If you do need the flow file attributes as JMS headers, make sure every JMS user header name is a valid Java identifier: the first character must satisfy Character.isJavaIdentifierStart() and each following character must satisfy Character.isJavaIdentifierPart(). Here is the relevant description from the Java 7 javadoc:
public static boolean isJavaIdentifierPart(int codePoint)
Determines if the character (Unicode code point) may be part of a Java identifier as other than the first character. A character may be part of a Java identifier if any of the following are true:
- it is a letter
- it is a currency symbol (such as '$')
- it is a connecting punctuation character (such as '_')
- it is a digit
- it is a numeric letter (such as a Roman numeral character)
- it is a combining mark
- it is a non-spacing mark
- isIdentifierIgnorable(codePoint) returns true for the character
Based on the above rule, the 'kafka.partition' attribute name is invalid because '.' is not an allowed identifier character. It should be renamed to either 'kafkaPartition' or 'kafka_partition' before publishing to a JMS queue.
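To make the rule concrete, here is a minimal Java sketch that checks a name against the identifier rules and rewrites invalid characters to '_'. The class name JmsPropertyNames and the methods isValid and sanitize are hypothetical helpers written for this post; they are not part of NiFi or the IBM MQ client. The sketch also does not handle names reserved by JMS (for example selector keywords such as NULL or AND, or the JMSX prefix), and two different attribute names could collapse to the same sanitized name.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of NiFi or the IBM MQ client libraries.
public class JmsPropertyNames {

    /** True if the name starts with an identifier-start character and
     *  every following character is an identifier-part character. */
    static boolean isValid(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        int first = name.codePointAt(0);
        if (!Character.isJavaIdentifierStart(first)) {
            return false;
        }
        for (int i = Character.charCount(first); i < name.length(); ) {
            int cp = name.codePointAt(i);
            if (!Character.isJavaIdentifierPart(cp)) {
                return false;
            }
            i += Character.charCount(cp);
        }
        return true;
    }

    /** Replaces every character that is not an identifier part with '_',
     *  and prefixes '_' if the result cannot start an identifier. */
    static String sanitize(String name) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < name.length(); ) {
            int cp = name.codePointAt(i);
            sb.appendCodePoint(Character.isJavaIdentifierPart(cp) ? cp : '_');
            i += Character.charCount(cp);
        }
        if (sb.length() == 0 || !Character.isJavaIdentifierStart(sb.codePointAt(0))) {
            sb.insert(0, '_');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // 'kafka.partition' is from the stack trace above; the other names are illustrative.
        List<String> names = Arrays.asList("kafka.partition", "kafka_partition", "9lives");
        for (String n : names) {
            System.out.println(n + " valid=" + isValid(n) + " sanitized=" + sanitize(n));
        }
    }
}
```

In a NiFi flow the same renaming can be done with UpdateAttribute by adding the new attribute name and dropping the old one; the code above only illustrates which names the JMS client will accept.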
Hope this helps. Please correct me if I'm wrong or add on more information.
Created 04-23-2016 10:43 AM
Are you using an SSL connection? Do you have the full stack trace from the NiFi log files?
Created 05-24-2016 08:27 AM
Hi,
Do you have a screenshot of your settings in NiFi?
Thanks a mill.
Created 05-24-2016 07:18 PM
Here is my GitHub page with screenshots of the IBM MQ and Oracle connections:
https://github.com/anair-it/nifi-docker
The MQ client libraries are in the "extra_lib" directory, but you can place them anywhere you want.