In this article we will see how to publish messages to an MQTT broker with a simple Python script, consume them in NiFi with the ConsumeMQTT processor, and write them to HDFS with PutHDFS.

Note: I'm using CentOS 7 and HDP 2.6.3 for this article


1) Install the Mosquitto MQTT broker

sudo yum -y install epel-release
sudo yum -y install mosquitto


2) Start Mosquitto and enable it at boot

sudo systemctl start mosquitto
sudo systemctl enable mosquitto


3) Install the paho-mqtt Python library

sudo yum -y install python-pip
sudo pip install paho-mqtt


4) Create an MQTT user and set its password

I have created a sample user 'aditya' and set the password to 'test'. The -c option creates the password file, overwriting it if it already exists.

[root@test-instance-4 ~]# useradd aditya
[root@test-instance-4 ~]# sudo mosquitto_passwd -c /etc/mosquitto/passwd aditya
Password:
Reenter password:


5) Disable anonymous login to MQTT

Open /etc/mosquitto/mosquitto.conf, add the entries below, and then restart mosquitto:

allow_anonymous false
password_file /etc/mosquitto/passwd

sudo systemctl restart mosquitto


6) Design the NiFi flow to consume messages and put them into HDFS


Configure the ConsumeMQTT processor: right-click ConsumeMQTT -> Configure -> Properties.

Set Broker URI, Client ID, Username, Password, Topic Filter, and Max Queue Size.
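
The exact values depend on your setup. As a rough illustration, the properties might look like the Python dictionary below (NiFi itself is configured through the UI, so the dictionary is only a way of listing them). Only the user 'aditya' and password 'test' come from step 4; the host, client ID, topic filter and queue size are assumptions. The helper topic_matches is hypothetical and simplified: it shows how the MQTT wildcards + and # in Topic Filter select topics, and it ignores the special handling of topics that start with $.

```python
# Example ConsumeMQTT property values (illustrative assumptions, not the article's actual flow).
consume_mqtt_properties = {
    "Broker URI": "tcp://localhost:1883",  # assumes Mosquitto runs on the NiFi host, default port
    "Client ID": "nifi-consumer",          # hypothetical; must be unique per connected client
    "Username": "aditya",                  # user created in step 4
    "Password": "test",                    # password set in step 4
    "Topic Filter": "nifi/#",              # hypothetical topic filter
    "Max Queue Size": "1000",              # hypothetical value
}


def topic_matches(topic_filter, topic):
    """Simplified check of whether an MQTT topic matches a filter using + and #."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; + matches any single level.
            return False
    return len(filter_levels) == len(topic_levels)


filter_value = consume_mqtt_properties["Topic Filter"]
for candidate in ["nifi/test", "nifi/sensors/temp", "other/test"]:
    print("%s -> %s" % (candidate, topic_matches(filter_value, candidate)))
```

With a filter like nifi/#, the processor would receive anything published under nifi/ but nothing from other topic trees.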


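Configure the PutHDFS processor: set Hadoop Configuration Resources (a comma-separated list of the paths to core-site.xml and hdfs-site.xml) and Directory (the HDFS directory in which to store the messages).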



7) Create a sample Python script to publish messages. Use the attached mqttpublish.txt and rename it with a .py extension.
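
If the attachment is not available, a publisher along the following lines should work. This is a minimal sketch and not the original mqttpublish script: it assumes the broker runs on localhost on the default port 1883, uses the 'aditya'/'test' credentials from step 4, and publishes to a hypothetical topic nifi/test, which must match whatever Topic Filter you set in ConsumeMQTT. The function names, client ID and payload format are made up for illustration. It relies on publish.multiple from paho-mqtt.

```python
import json


def build_messages(topic, count):
    """Build message dicts in the format accepted by paho's publish.multiple()."""
    messages = []
    for i in range(count):
        # Hypothetical JSON payload; any string payload would do.
        payload = json.dumps({"id": i, "text": "hello from python"})
        messages.append({"topic": topic, "payload": payload, "qos": 0, "retain": False})
    return messages


def publish_messages(messages, host="localhost", port=1883,
                     username="aditya", password="test"):
    """Publish all messages over one connection, authenticating as the step 4 user."""
    # Imported here so that build_messages() can be used without paho installed.
    import paho.mqtt.publish as publish

    publish.multiple(
        messages,
        hostname=host,
        port=port,
        client_id="python-publisher",  # hypothetical client ID
        auth={"username": username, "password": password},
    )


messages = build_messages("nifi/test", 5)  # "nifi/test" is an assumed topic name
print("Prepared %d messages" % len(messages))
# Uncomment once Mosquitto is running to actually send them:
# publish_messages(messages)
```

Save it as a .py file, uncomment the last line, and run it with python while the NiFi flow is running.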


8) Run the NiFi flow.


9) Run the attached Python script.



10) Check the HDFS directory to confirm that the messages were written

hdfs dfs -ls /user/aditya/
hdfs dfs -cat /user/aditya/*


Hope this helps 🙂