Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
Labels (1)
avatar
Super Guru

In this article we will see how to produce messages using a simple python script and consume messages using ConsumeMQTT processor and put them in HDFS using PutHDFS

Note: I'm using CentOS 7 and HDP 2.6.3 for this article

.

1) Install MQTT

sudo yum -y install epel-release
sudo yum -y install mosquitto

.

2) Start MQTT

sudo systemctl start mosquitto
sudo systemctl enable mosquitto

.

3) Install paho-mqtt python library

yum install python-pip
pip install paho-mqtt

.

4) Configure MQTT password for the user.

I have created a sample user 'aditya' and set the password to 'test'

[root@test-instance-4 ~]# useradd aditya
[root@test-instance-4 ~]# sudo mosquitto_passwd -c /etc/mosquitto/passwd aditya
Password:
Reenter password:

.

5) Disable anonymous login to MQTT

Open the file (/etc/mosquitto/mosquitto.conf ) and add the below entries and restart mosquitto

allow_anonymous false
password_file /etc/mosquitto/passwd
 sudo systemctl restart mosquitto

.

6) Design the NiFi flow to consume messages and put into hdfs

64868-nifi-flow.png

Configure MQTT processor: Right Click on ConsumeMQTT -> Configure -> Properties.

Set Broker URI, Client Id, username, password, Topic filter and Max Queue Size

64869-consumemqtt.png

Configure PutHDFS processor: Set Hadoop Configuration resources and Directory( to store messages)

64870-puthdfs.png

.

7) Create a sample python script to publish messages. Use mqttpublish.txt attached and rename it to MQTTPublish.py to publish messages

.

😎 Run the Nifi flow.

.

9) Run the python script attached.

python MQTTPublish.py

.

10) Check the directory to check if the messages are put in HDFS

hdfs dfs -ls /user/aditya/
hdfs dfs -cat /user/aditya/*

.

Hope this helps 🙂


mqttpublish.txt

6,603 Views