Publish JMS processor failing

Contributor

MQ provider: IBM WebSphere MQ 7.5

HDF version: 1.2

I am trying to consume from the A.INBOUND queue using the ConsumeJMS processor and post the message to another queue, A.OUTBOUND, using the PublishJMS processor. Both processors use the same JMS connection service.

ConsumeJMS works fine, but PublishJMS fails with the following error:

16:36:52 EDT ERROR 8bd6ca6e-6cc6-4553-9435-08481dffbe2b PublishJMS - JMSPublisher[destination:A.OUTBOUND; pub-sub:false;] Failed while sending message to JMS via JMSPublisher[destination:A.OUTBOUND; pub-sub:false;]

MQ server is running locally.

I can confirm the connection info is correct and the queues exist.

Any help is appreciated.

1 ACCEPTED SOLUTION

Contributor

Fixed the issue. There is nothing wrong with the PublishJMS processor; it probably just needs more documentation.

This is my original flow that did not work:

GetKafka -> EvaluateJsonPath -> PublishJMS

Exception stack trace:

Caused by: com.ibm.msg.client.jms.DetailedMessageFormatException: JMSCC0049: The property name 'kafka.partition' is not a valid Java(tm) identifier.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_74]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_74]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_74]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_74]
    at com.ibm.msg.client.commonservices.j2se.NLSServices.createException(NLSServices.java:319) ~[na:na]
    at com.ibm.msg.client.commonservices.nls.NLSServices.createException(NLSServices.java:233) ~[na:na]
    at com.ibm.msg.client.jms.internal.JmsErrorUtils.createException(JmsErrorUtils.java:109) ~[na:na]
    at com.ibm.msg.client.jms.internal.JmsMessageImpl.checkPropName(JmsMessageImpl.java:2168) ~[na:na]
    at com.ibm.msg.client.jms.internal.JmsMessageImpl.setStringProperty(JmsMessageImpl.java:1694) ~[na:na]
    at com.ibm.jms.JMSMessage.setStringProperty(JMSMessage.java:1496) ~[na:na]
    at org.apache.nifi.jms.processors.JMSPublisher$1.createMessage(JMSPublisher.java:87) ~[nifi-jms-processors-0.6.0.1.2.0.0-91.jar:0.6.0.1.2.0.0-91]
    at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:603) ~[spring-jms-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:584) ~[spring-jms-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:494) ~[spring-jms-4.2.4.RELEASE.jar:4.2.4.RELEASE]

I saw that all the flow file attributes were converted to JMS user headers (message properties), including 'kafka.partition' and the other Kafka attributes. Since I don't need them, I removed all the Kafka attributes with an UpdateAttribute processor before posting the message to the JMS queue. Now my flow looks like this:

GetKafka -> EvaluateJsonPath -> UpdateAttribute -> PublishJMS
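
To picture what the extra UpdateAttribute step achieves, here is a minimal standalone Java sketch (not NiFi code). It treats the flow file attributes as a plain map and drops every key that starts with "kafka.". The class name, the method name, and the sample attributes other than 'kafka.partition' are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DropKafkaAttributes {

    /** Returns a copy of the attributes without the keys that start with the given prefix. */
    static Map<String, String> withoutPrefix(Map<String, String> attributes, String prefix) {
        Map<String, String> kept = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Hypothetical attribute set; only 'kafka.partition' comes from the stack trace above.
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("kafka.partition", "0");
        attributes.put("kafka.topic", "orders");
        attributes.put("orderId", "42");

        System.out.println(withoutPrefix(attributes, "kafka.").keySet());
    }
}
```

Only the attributes that survive this kind of filtering would end up as JMS user headers.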

If you do need all the flow file attributes as JMS headers, ensure that each JMS user header name conforms to the Character.isJavaIdentifierPart() rules. Here is the relevant text from the Java 7 javadoc:

public static boolean isJavaIdentifierPart(int codePoint)

Determines if the character (Unicode code point) may be part of a Java identifier as other than the first character.

A character may be part of a Java identifier if any of the following are true:

  it is a letter
  it is a currency symbol (such as '$')
  it is a connecting punctuation character (such as '_')
  it is a digit
  it is a numeric letter (such as a Roman numeral character)
  it is a combining mark
  it is a non-spacing mark
  isIdentifierIgnorable(codePoint) returns true for the character
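
The rule quoted above can be checked directly with the JDK. The following is a minimal sketch, not the IBM client's actual implementation: it walks a header name and reports the first character that fails the check. The class and method names are hypothetical, and applying Character.isJavaIdentifierStart() to the first character is an assumption on my part (the javadoc above only covers the remaining characters).

```java
final class JmsPropertyNameCheck {

    /**
     * Returns the index of the first character that may not appear in a
     * Java identifier, or -1 if the whole name passes.
     * A null or empty name is reported as invalid at index 0.
     */
    static int firstInvalidIndex(String name) {
        if (name == null || name.isEmpty()) {
            return 0;
        }
        int i = 0;
        while (i < name.length()) {
            int codePoint = name.codePointAt(i);
            // Assumption: the first character must satisfy the stricter "start" rule.
            boolean valid = (i == 0)
                    ? Character.isJavaIdentifierStart(codePoint)
                    : Character.isJavaIdentifierPart(codePoint);
            if (!valid) {
                return i;
            }
            i += Character.charCount(codePoint);
        }
        return -1;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"kafka.partition", "kafka_partition", "kafkaPartition"}) {
            System.out.println(name + " -> " + firstInvalidIndex(name));
        }
    }
}
```

For 'kafka.partition' this reports index 5, the position of the '.', while the other two names return -1.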
 

Based on the above rules, the 'kafka.partition' attribute name is invalid because '.' is not a valid identifier character, so it should be renamed to either 'kafkaPartition' or 'kafka_partition' before publishing to a JMS queue.
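
If you want to derive the new names rather than type them by hand, the two renaming styles can be sketched in plain Java as follows. This is an illustration under my own assumptions, not something the PublishJMS processor does for you; the class and method names are hypothetical.

```java
final class JmsPropertyNameSanitizer {

    private static boolean allowed(int codePoint, boolean first) {
        return first
                ? Character.isJavaIdentifierStart(codePoint)
                : Character.isJavaIdentifierPart(codePoint);
    }

    /** Replaces every disallowed character with '_', e.g. kafka.partition becomes kafka_partition. */
    static String toUnderscore(String name) {
        StringBuilder out = new StringBuilder(name.length());
        for (int i = 0; i < name.length(); ) {
            int codePoint = name.codePointAt(i);
            i += Character.charCount(codePoint);
            if (allowed(codePoint, out.length() == 0)) {
                out.appendCodePoint(codePoint);
            } else {
                out.append('_');
            }
        }
        return out.toString();
    }

    /** Drops every disallowed character and upper-cases the character that follows it. */
    static String toCamelCase(String name) {
        StringBuilder out = new StringBuilder(name.length());
        boolean upperNext = false;
        for (int i = 0; i < name.length(); ) {
            int codePoint = name.codePointAt(i);
            i += Character.charCount(codePoint);
            if (!allowed(codePoint, out.length() == 0)) {
                // Only capitalize after a separator once something has been written.
                upperNext = out.length() > 0;
                continue;
            }
            out.appendCodePoint(upperNext ? Character.toUpperCase(codePoint) : codePoint);
            upperNext = false;
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(toUnderscore("kafka.partition"));
        System.out.println(toCamelCase("kafka.partition"));
    }
}
```

Note that either mapping can make two different attribute names collide (for example 'a.b' and 'a_b' both become 'a_b' with the underscore style), so check for duplicates if you rename attributes in bulk.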

Hope this helps. Please correct me if I'm wrong or add on more information.


4 REPLIES 4


Are you using an SSL connection? Do you have the full stack trace from the NiFi log files?

New Contributor

Hi,

Do you have a screenshot of your settings in NiFi?

Thanks a mill.

Contributor

Here is my GitHub page with screenshots of the IBM MQ and Oracle connections:

https://github.com/anair-it/nifi-docker

The MQ client libraries are in the "extra_lib" directory, but you can place them anywhere you want.