Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

NiFi 1.4 ConsumeMQTT Processor throws an error - MqttException (128)

avatar
Rising Star

Hi everyone,

we are using NiFi 1.4 and the ConsumeMQTT processor to subscribe to topics publish by a message broker from Web Methods calls Universal Messaging.

For some channels we experience unexpected behavior.

When I start the ConsumeMQTT Processor I get the following error message:

2018-08-29 09:05:16,615 ERROR [Timer-Driven Process Thread-1] o.a.nifi.processors.mqtt.ConsumeMQTT ConsumeMQTT[id=01651015-56c3-1f18-9c66-35570bec2f7a] Connection to tcp://10.170.232.13:1883 lost (or was never connected) and ontrigger connect failed. Yielding processor:  
org.eclipse.paho.client.mqttv3.MqttException: MqttException
 at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:327)
 at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:313)
 at org.apache.nifi.processors.mqtt.ConsumeMQTT.reconnect(ConsumeMQTT.java:347)
 at org.apache.nifi.processors.mqtt.ConsumeMQTT.onTrigger(ConsumeMQTT.java:255)
 at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onTrigger(AbstractMQTTProcessor.java:355)
 at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
 at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
 at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
 at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Attached a screenshot with the processor settings.

85773-nifi-consumemqtt.jpg

Error is only thrown one time and the people who administer the broker told us they are not able to see the connection.

We are not getting the messages.

If I have to guess I would say the problem is related to the QoS value, but I have no clues.

Any idea or comment would be appreciated.

Kind regards,

Paul

2 REPLIES 2

avatar
Contributor

@Paul Hernandez Are you using nifi in cluster mode? If yes, you can try change consume MQTT to primary node instead of all nodes. Hope it helps!

avatar
Expert Contributor

It worked. Thanks!