Created on 03-27-2018 08:38 AM - edited 08-17-2019 08:16 AM
In this article we will see how to produce messages with a simple Python script, consume them with the NiFi ConsumeMQTT processor, and write them to HDFS with PutHDFS.
Note: I'm using CentOS 7 and HDP 2.6.3 for this article
.
1) Install MQTT
sudo yum -y install epel-release
sudo yum -y install mosquitto
.
2) Start MQTT
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
.
3) Install paho-mqtt python library
yum install python-pip
pip install paho-mqtt
.
4) Configure MQTT password for the user.
I have created a sample user 'aditya' and set the password to 'test'
[root@test-instance-4 ~]# useradd aditya
[root@test-instance-4 ~]# sudo mosquitto_passwd -c /etc/mosquitto/passwd aditya
Password:
Reenter password:
.
5) Disable anonymous login to MQTT
Open /etc/mosquitto/mosquitto.conf, add the entries below, and then restart Mosquitto:
allow_anonymous false
password_file /etc/mosquitto/passwd
sudo systemctl restart mosquitto
.
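6) Design the NiFi flow to consume messages and write them to HDFS
Configure the ConsumeMQTT processor: right-click ConsumeMQTT -> Configure -> Properties.
Set Broker URI (for example tcp://localhost:1883 when Mosquitto runs on the NiFi host with its default port), Client ID, Username, Password, Topic Filter and Max Queue Size. Username and Password are the credentials created in step 4.
Configure the PutHDFS processor: set Hadoop Configuration Resources (typically the paths to core-site.xml and hdfs-site.xml) and Directory (where the messages are stored). Then connect ConsumeMQTT to PutHDFS.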
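The Topic Filter property follows the standard MQTT filter syntax, where + matches exactly one topic level and # matches all remaining levels. The sketch below is a simplified illustration of that matching rule, not NiFi's or Mosquitto's implementation; the function name and the sample topics are hypothetical, and it ignores the special handling of topics that start with $.

```python
def topic_matches(topic_filter, topic):
    """Return True if `topic` is matched by the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level of the filter.
            # It matches the parent level and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # Neither a single-level wildcard nor a literal match.
            return False
    # Without a trailing '#', both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

With a filter such as nifi/# the processor would receive messages published to nifi/test and to nifi/test/a, while nifi/+ would receive only the former.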
.
7) Create a sample Python script to publish messages. Use the attached mqttpublish.txt and rename it to MQTTPublish.py.
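If the attachment is not available, a publisher along the following lines should work. This is a minimal sketch and not the attached script: it assumes the paho-mqtt library from step 3 and uses its publish.multiple helper, and the function names, the JSON payload shape and the client ID "python-publisher" are all hypothetical choices made for illustration.

```python
import json


def build_messages(topic, count):
    """Return `count` sample messages in the dict form paho's publish helper accepts."""
    return [
        {
            "topic": topic,
            # Hypothetical payload: a small JSON document per message.
            "payload": json.dumps({"id": i, "text": "sample message {}".format(i)}),
            "qos": 1,
            "retain": False,
        }
        for i in range(count)
    ]


def publish_messages(messages, host, username, password, port=1883):
    """Publish all messages over one connection, authenticating with the broker."""
    # Imported here so that build_messages can be used without paho-mqtt installed.
    import paho.mqtt.publish as publish

    publish.multiple(
        messages,
        hostname=host,
        port=port,
        # Assumed client ID; it must differ from the Client ID set in ConsumeMQTT.
        client_id="python-publisher",
        auth={"username": username, "password": password},
    )
```

To send ten messages with the user from step 4, add a call such as publish_messages(build_messages("nifi/test", 10), "localhost", "aditya", "test") at the end of the file. The topic "nifi/test" is a placeholder and has to be matched by the Topic Filter configured in ConsumeMQTT. The client ID matters because a broker disconnects an existing client when a second one connects with the same ID.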
.
8) Run the NiFi flow.
.
9) Run the attached Python script.
python MQTTPublish.py
.
10) Check the target directory to confirm that the messages were written to HDFS
hdfs dfs -ls /user/aditya/
hdfs dfs -cat /user/aditya/*
.
Hope this helps 🙂