NiFi 1.4 ConsumeMQTT Processor throws an error - MqttException (128)

New Contributor

Hi everyone,

we are using NiFi 1.4 and the ConsumeMQTT processor to subscribe to topics published by a webMethods message broker called Universal Messaging.

For some channels we experience unexpected behavior.

When I start the ConsumeMQTT Processor I get the following error message:

2018-08-29 09:05:16,615 ERROR [Timer-Driven Process Thread-1] o.a.nifi.processors.mqtt.ConsumeMQTT ConsumeMQTT[id=01651015-56c3-1f18-9c66-35570bec2f7a] Connection to tcp://10.170.232.13:1883 lost (or was never connected) and ontrigger connect failed. Yielding processor:  
org.eclipse.paho.client.mqttv3.MqttException: MqttException
 at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:327)
 at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:313)
 at org.apache.nifi.processors.mqtt.ConsumeMQTT.reconnect(ConsumeMQTT.java:347)
 at org.apache.nifi.processors.mqtt.ConsumeMQTT.onTrigger(ConsumeMQTT.java:255)
 at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onTrigger(AbstractMQTTProcessor.java:355)
 at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
 at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
 at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
 at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Attached a screenshot with the processor settings.

85773-nifi-consumemqtt.jpg

The error is thrown only once, and the people who administer the broker told us they cannot see the connection on their side.

We are not getting the messages.

If I had to guess, I would say the problem is related to the QoS value, but I have nothing to back that up.
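For reference, the 128 in the title looks like the MQTT 3.1.1 SUBACK failure return code (0x80), which would mean the broker answered the SUBSCRIBE but refused it, rather than granting a lower QoS. A minimal sketch of that mapping follows; `SubackReturnCodes` and `describe` are hypothetical names used only for illustration and are not part of NiFi or the Paho client.

```java
// Hypothetical helper: maps MQTT 3.1.1 SUBACK return codes to their meaning.
final class SubackReturnCodes {

    // 0x00-0x02 mean the subscription was accepted with that maximum QoS;
    // 0x80 means the broker rejected the subscription.
    static String describe(int returnCode) {
        switch (returnCode) {
            case 0x00: return "Success, maximum QoS 0";
            case 0x01: return "Success, maximum QoS 1";
            case 0x02: return "Success, maximum QoS 2";
            case 0x80: return "Failure";
            default:   return "Reserved";
        }
    }

    public static void main(String[] args) {
        for (int code : new int[] {0, 1, 2, 128}) {
            System.out.println(code + " -> " + describe(code));
        }
    }
}
```

If that reading is right, a QoS the broker could not honor would normally come back as a lower granted QoS (0, 1 or 2), while 128 points to the subscription itself being refused.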

Any idea or comment would be appreciated.

Kind regards,

Paul

Re: NiFi 1.4 ConsumeMQTT Processor throws an error - MqttException (128)

New Contributor

@Paul Hernandez Are you using NiFi in cluster mode? If so, try changing the ConsumeMQTT processor to run on the primary node only instead of on all nodes. Hope it helps!
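One possible reason this helps, which is an assumption and not confirmed in this thread: when the processor runs on all nodes, every cluster node opens its own connection with the same Client ID from the processor configuration, and MQTT 3.1.1 requires a broker to disconnect the existing client when a new one connects with the same Client ID. The toy model below only illustrates that takeover rule; `ClientIdRegistry`, the node names, and the client ID are hypothetical and do not represent Universal Messaging or NiFi internals.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a broker's session table, keyed by MQTT Client ID.
final class ClientIdRegistry {
    private final Map<String, String> ownerByClientId = new HashMap<>();

    // Registers a connection and returns the node that was displaced,
    // or null if the Client ID was not in use.
    String connect(String clientId, String node) {
        return ownerByClientId.put(clientId, node);
    }

    // Returns the node that currently holds the given Client ID.
    String owner(String clientId) {
        return ownerByClientId.get(clientId);
    }

    public static void main(String[] args) {
        ClientIdRegistry broker = new ClientIdRegistry();
        broker.connect("nifi-consumer", "node-1");
        // A second node reusing the same Client ID takes over the session.
        String displaced = broker.connect("nifi-consumer", "node-2");
        System.out.println("displaced: " + displaced);
        System.out.println("current owner: " + broker.owner("nifi-consumer"));
    }
}
```

With execution limited to the primary node, only one connection per Client ID exists, so this kind of conflict cannot occur.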

Re: NiFi 1.4 ConsumeMQTT Processor throws an error - MqttException (128)

Cloudera Employee

It worked. Thanks!