
In this article we will see how to publish messages to an MQTT broker (Mosquitto) with a simple Python script, consume them with NiFi's ConsumeMQTT processor, and write them to HDFS with the PutHDFS processor.

Note: I'm using CentOS 7 and HDP 2.6.3 for this article.


1) Install MQTT (the Mosquitto broker)

sudo yum -y install epel-release
sudo yum -y install mosquitto


2) Start Mosquitto and enable it at boot

sudo systemctl start mosquitto
sudo systemctl enable mosquitto


3) Install the paho-mqtt Python library

sudo yum -y install python-pip
sudo pip install paho-mqtt


4) Configure an MQTT password for the user

I created a sample user 'aditya' and set the password to 'test'.

[root@test-instance-4 ~]# useradd aditya
[root@test-instance-4 ~]# sudo mosquitto_passwd -c /etc/mosquitto/passwd aditya
Reenter password:


5) Disable anonymous login to Mosquitto

Open /etc/mosquitto/mosquitto.conf, add the entries below, and then restart Mosquitto:

allow_anonymous false
password_file /etc/mosquitto/passwd

sudo systemctl restart mosquitto
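If you prefer to script this edit, the Python sketch below appends the two entries only when their keys are not already active in the file, so running it twice does not duplicate them. It is an illustration under assumptions, not part of the original setup: `ensure_settings` and `REQUIRED_SETTINGS` are hypothetical names, and the function deliberately leaves alone any key that is already set, even to a different value (for example an uncommented `allow_anonymous true`), so check such lines by hand.

```python
import os

# The two entries from step 5. The password file is the one created
# with mosquitto_passwd in step 4.
REQUIRED_SETTINGS = {
    "allow_anonymous": "false",
    "password_file": "/etc/mosquitto/passwd",
}


def ensure_settings(conf_path, settings=REQUIRED_SETTINGS):
    """Append each setting whose key is not already active in conf_path.

    Returns the list of keys that were appended. Commented-out lines are
    ignored, and keys that are already present are left untouched, even
    if their value differs from the one in `settings`.
    """
    text = ""
    if os.path.exists(conf_path):
        with open(conf_path) as f:
            text = f.read()

    # Collect the keys of active (non-blank, non-comment) lines;
    # in mosquitto.conf the key is the first word of a line.
    active_keys = set()
    for line in text.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#"):
            active_keys.add(stripped.split()[0])

    missing = [key for key in settings if key not in active_keys]
    if missing:
        with open(conf_path, "a") as f:
            # Make sure the first appended entry starts on its own line.
            if text and not text.endswith("\n"):
                f.write("\n")
            for key in missing:
                f.write("{} {}\n".format(key, settings[key]))
    return missing
```

Run it as root, for example `ensure_settings("/etc/mosquitto/mosquitto.conf")`, and then restart Mosquitto with the systemctl command above.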


6) Design the NiFi flow to consume the messages and write them to HDFS


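Configure the ConsumeMQTT processor: right-click ConsumeMQTT -> Configure -> Properties.

Set Broker URI (for example tcp://localhost:1883, Mosquitto's default port), Client ID, Username and Password (the user from step 4), Topic Filter (the topic the script publishes to) and Max Queue Size.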


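Configure the PutHDFS processor: set Hadoop Configuration Resources (on HDP, typically /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml) and Directory (the HDFS directory that will store the messages; /user/aditya in this article).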



7) Create a sample Python script to publish messages. Use the attached mqttpublish.txt, renaming it to a Python (.py) file.
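For reference, here is a minimal sketch of such a publisher built on paho-mqtt's `publish.multiple()` helper; it is not the attached script. The broker host, port, topic name `test/topic` and the JSON payload format are assumptions, and `build_messages` and `publish_messages` are hypothetical names. The topic must match the Topic Filter set in ConsumeMQTT, and the credentials are the ones created in step 4.

```python
import json

# Assumed values: adjust to match your broker and the ConsumeMQTT "Topic Filter".
BROKER_HOST = "localhost"
BROKER_PORT = 1883            # Mosquitto's default listener port
TOPIC = "test/topic"          # hypothetical topic name
AUTH = {"username": "aditya", "password": "test"}  # user from step 4


def build_messages(count, topic=TOPIC):
    """Return `count` messages in the list-of-dicts form accepted by
    paho.mqtt.publish.multiple()."""
    messages = []
    for i in range(count):
        payload = json.dumps({"id": i, "text": "message {}".format(i)})
        messages.append({"topic": topic, "payload": payload, "qos": 1})
    return messages


def publish_messages(count):
    """Publish `count` messages to the broker in a single connection."""
    # Imported here so build_messages() works even without paho installed.
    import paho.mqtt.publish as publish

    publish.multiple(
        build_messages(count),
        hostname=BROKER_HOST,
        port=BROKER_PORT,
        auth=AUTH,
    )
```

With the NiFi flow running, calling `publish_messages(10)` should send ten JSON messages for ConsumeMQTT to pick up. `publish.multiple()` connects, publishes the whole batch and disconnects, which is simpler than managing a `Client` network loop for a one-off test.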


8) Start the NiFi flow.


9) Run the Python script to publish the messages.



10) Check the HDFS directory to verify that the messages were written

hdfs dfs -ls /user/aditya/
hdfs dfs -cat /user/aditya/*


Hope this helps


Version history: revision 2 of 2, last updated 08-17-2019 08:16 AM