NiFi 1.4 ConsumeMQTT Processor throws an error - MqttException (128)

Hi everyone,

We are using NiFi 1.4 and the ConsumeMQTT processor to subscribe to topics published by Universal Messaging, a message broker from webMethods.

For some channels we experience unexpected behavior.

When I start the ConsumeMQTT Processor I get the following error message:

2018-08-29 09:05:16,615 ERROR [Timer-Driven Process Thread-1] o.a.nifi.processors.mqtt.ConsumeMQTT ConsumeMQTT[id=01651015-56c3-1f18-9c66-35570bec2f7a] Connection to tcp://10.170.232.13:1883 lost (or was never connected) and ontrigger connect failed. Yielding processor:  
org.eclipse.paho.client.mqttv3.MqttException: MqttException
 at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:327)
 at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:313)
 at org.apache.nifi.processors.mqtt.ConsumeMQTT.reconnect(ConsumeMQTT.java:347)
 at org.apache.nifi.processors.mqtt.ConsumeMQTT.onTrigger(ConsumeMQTT.java:255)
 at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onTrigger(AbstractMQTTProcessor.java:355)
 at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
 at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
 at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
 at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

I have attached a screenshot of the processor settings.

85773-nifi-consumemqtt.jpg

The error is thrown only once, and the people who administer the broker told us they cannot see the connection on their side.

We are not getting the messages.

If I had to guess, I would say the problem is related to the QoS value, but I have no real clue.
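
As a side note on the QoS guess, and assuming the 128 in the title is the MQTT reason code: in MQTT 3.1.1 a SUBACK return code of 0x00, 0x01 or 0x02 is the QoS level the broker granted, while 0x80 (decimal 128) means the broker refused the subscription. The stack trace ending in MqttClient.subscribe is consistent with that reading. A minimal Python sketch of the mapping follows; describe_suback_code is a hypothetical helper for illustration, not part of NiFi or the Paho client.

```python
# Sketch: decode a single MQTT 3.1.1 SUBACK return code.
# describe_suback_code is a hypothetical helper, not a NiFi or Paho API.

SUBACK_FAILURE = 0x80  # decimal 128


def describe_suback_code(code: int) -> str:
    """Return a human-readable meaning for one SUBACK return code."""
    if code in (0, 1, 2):
        # The broker accepted the subscription and granted this QoS level.
        return f"subscription accepted, granted QoS {code}"
    if code == SUBACK_FAILURE:
        # The broker rejected the subscription; the code does not say why.
        return "subscription refused by the broker"
    return f"reserved or invalid return code {code}"


if __name__ == "__main__":
    for code in (0, 1, 2, 128):
        print(code, "->", describe_suback_code(code))
```

A refusal on its own does not reveal the cause, so the requested QoS is only one of several things worth checking on the broker side.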

Any idea or comment would be appreciated.

Kind regards,

Paul

2 REPLIES

Explorer

@Paul Hernandez Are you using NiFi in cluster mode? If so, you can try changing the Execution setting of the ConsumeMQTT processor from all nodes to primary node. Hope it helps!
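
One possible reason this suggestion can help, assuming every node in the cluster connects with the same Client ID (this thread does not confirm that): under MQTT 3.1.1 a broker must disconnect an existing client when another client connects with the same client identifier. The toy model below only illustrates that takeover rule. ToyBroker, simulate and the client ID "nifi-consumer" are hypothetical names, and nothing here talks to a real broker.

```python
# Sketch: toy in-memory model of MQTT client-ID takeover.
# ToyBroker and simulate are hypothetical; no real broker is contacted.


class ToyBroker:
    def __init__(self):
        # Maps client_id to the name of the node currently holding the session.
        self.sessions = {}

    def connect(self, client_id, node):
        """Register node under client_id; return the node it replaced, if any."""
        previous = self.sessions.get(client_id)
        self.sessions[client_id] = node
        return previous


def simulate(nodes, client_id="nifi-consumer"):
    """Connect each node in order with the same client ID and log takeovers."""
    broker = ToyBroker()
    events = []
    for node in nodes:
        kicked = broker.connect(client_id, node)
        if kicked is not None:
            events.append(f"{kicked} disconnected by {node}")
    return events


if __name__ == "__main__":
    print(simulate(["node1", "node2", "node3"]))  # all nodes run the processor
    print(simulate(["node1"]))                    # only the primary node runs it
```

With a single consumer there is nothing to take over, which matches what running the processor on the primary node only would give you.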

Cloudera Employee

It worked. Thanks!
