Created on 12-26-2017 10:42 AM - edited 08-18-2019 02:51 AM
There is an option to set the Client ID to an empty string in the processor, as shown below.
But after applying these settings I cannot start the processor; it gives me a validation error, as shown below.
I need my Client ID to be empty because of some connection issues.
Created 12-26-2017 07:28 PM
You might want to set it equal to %
I am not sure how having a client ID could cause an error.
I have a few dozen MQTT clients and servers running with no issues. You are grabbing all the topics. I have used cloud, local, and server brokers. Can you use Mosquitto? That one is really solid.
The client ID should not affect connection issues.
I have run the MQTT broker on the same machine and on different machines. I have run them on Raspberry Pi, OS X, CentOS, and Ubuntu. It has worked fine every time. The client ID should not be important.
What errors do you get?
You could fork the ConsumeMQTT code and make it a custom processor without a client name. I don't think that's a good idea. I am not sure if the underlying Java library would support that.
Can you consume MQTT messages in Python or Java (or something else) on the same machine with no client ID without issue?
Apache NiFi uses the Eclipse Paho client, like most places.
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
Created on 12-27-2017 07:27 AM - edited 08-18-2019 02:51 AM
Thanks for the immediate response. I will describe the issue I am facing in detail.
Currently I have set up a NiFi cluster of 4 nodes, which means a NiFi processor may run on any of the nodes in a non-deterministic manner.
Initially, the ConsumeMQTT processor subscribes to the EMQTT servers with a particular client ID, so the EMQTT broker node registers this client ID (consumer1) as connecting from node1 (let's assume).
After some time, the same processor switches to node2, so the EMQTT broker again registers client ID consumer1, this time connecting from node2. EMQTT then kicks out both clients because they carry conflicting information (node1 and node2 have the same client ID).
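The conflict described above can be illustrated with a toy model. This is only a sketch, not EMQTT code: it assumes the MQTT 3.1.1 rule that a broker disconnects the existing client when a new connection arrives with the same client ID. The names `ToyBroker` and `TakeoverDemo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: mimics the "one live connection per client ID" rule.
class ToyBroker {
    // client ID -> name of the node currently holding that connection
    private final Map<String, String> sessions = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Registers a connection and returns the node that was kicked, or null. */
    String connect(String clientId, String node) {
        String previous = sessions.put(clientId, node);
        if (previous != null && !previous.equals(node)) {
            log.add(previous + " kicked by " + node);
            return previous;
        }
        return null;
    }

    List<String> log() {
        return log;
    }
}

class TakeoverDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.connect("consumer1", "node1");
        broker.connect("consumer1", "node2"); // node1 is disconnected
        broker.connect("consumer1", "node1"); // node1 reconnects, node2 is disconnected
        broker.log().forEach(System.out::println);
    }
}
```

If both nodes keep reconnecting automatically, each new connection displaces the other, which would look like both clients being kicked repeatedly.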
We tried to solve this by setting the processor to run only on the Primary Node. This solves the problem for some time, until the Primary Node role switches to a different node, after which the conflict starts occurring again.
We are looking for a solution where the ConsumeMQTT processor dynamically generates the client ID on every new connection, which would resolve the conflicting client IDs. (For this we tried to set the client ID to 'empty string', which we assumed would make ConsumeMQTT generate one dynamically.)
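A minimal sketch of what "generate the client ID on every new connection" could look like in a custom client or a forked processor. This is not what ConsumeMQTT does; `ClientIds` and `unique` are hypothetical names. The 23-character alphanumeric limit is the range MQTT 3.1.1 requires every broker to accept; many brokers allow longer IDs.

```java
import java.util.UUID;

// Hypothetical helper: builds a fresh client ID for each new connection.
class ClientIds {
    static final int MAX_LENGTH = 23;    // length every MQTT 3.1.1 broker must accept
    static final int SUFFIX_LENGTH = 12; // random part appended to the base name

    static String unique(String base) {
        // Keep only characters every broker must accept in a client ID.
        String clean = base.replaceAll("[^0-9A-Za-z]", "");
        // A random UUID without dashes is 32 hex characters.
        String suffix = UUID.randomUUID().toString().replace("-", "");
        int baseLength = Math.min(clean.length(), MAX_LENGTH - SUFFIX_LENGTH);
        return clean.substring(0, baseLength) + suffix.substring(0, SUFFIX_LENGTH);
    }
}
```

Calling `ClientIds.unique("consumer1")` on each connect yields a different ID every time, so two nodes never present the same ID to the broker. The trade-off is that the broker cannot resume a persistent session for an ID it has never seen before.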
Please suggest an alternative approach to this problem. Forking the ConsumeMQTT processor and making a custom version of it would solve the problem, but we were looking for a solution within the Apache NiFi product itself.
I have attached the connection error we are receiving from EMQTT.
Technically speaking, although it reports a connection error, it does manage to connect intermittently, but it heavily duplicates messages when NiFi runs in cluster mode (e.g., we published 10 messages and got around 60 at the consumer). When we switched to NiFi standalone mode, the same 10 messages produced 10 messages at the consumer.
Created 02-27-2018 12:59 AM
Hi everyone.
I have the same issue.
My cluster has 2 NiFi nodes and 1 MQTT server (Mosquitto or EMQTT).
I think each NiFi node must have its own unique Client ID.
I want to set the Client ID in ConsumeMQTT using NiFi Expression Language.
But it is not supported.
I don't know how to set a unique Client ID in ConsumeMQTT.
Could you tell me if there is a way to set a unique Client ID?
For now, I have installed NiFi and MQTT on the same servers.
The URI is [localhost:18883].
But this is not a good solution.
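One way a unique-per-node Client ID could be derived, sketched outside NiFi since the post above reports that Expression Language is not supported on this property. The idea is to combine a base name with a short hash of the host name so that each node gets a stable ID of its own. `NodeClientId`, `forHost`, and `forThisNode` are hypothetical names, and the hash only makes collisions unlikely, not impossible.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: stable client ID per host, e.g. for a custom processor.
class NodeClientId {
    static String forHost(String base, String hostName) {
        // Keep only characters every MQTT 3.1.1 broker must accept.
        String clean = base.replaceAll("[^0-9A-Za-z]", "");
        // 8 hex characters derived from the host name; the same host always gives the same value.
        String hostHash = String.format("%08x", hostName.hashCode());
        // Cap the base at 15 characters so the whole ID stays within 23.
        String prefix = clean.length() <= 15 ? clean : clean.substring(0, 15);
        return prefix + hostHash;
    }

    static String forThisNode(String base) throws UnknownHostException {
        return forHost(base, InetAddress.getLocalHost().getHostName());
    }
}
```

Distinct IDs avoid the broker kicking clients out, but with ordinary subscriptions every connected node then receives its own copy of each message, so duplicates across the cluster would still need to be handled.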
Created 06-23-2018 01:14 PM
You cannot connect to an MQTT broker from several nodes with the same client ID. Doing so would make little sense anyway, since every NiFi node would get the same messages to handle (which is normally not a use case). To solve this problem you should set "run on primary node only" for the ConsumeMQTT processor. But unfortunately this does not seem to work 😞
All nodes in the NiFi cluster still make a connection. I think this is an issue in the implementation of the processor. It seems that ConsumeMQTT connects to the MQTT broker on all nodes although it should run only on the primary node.