
Sensor Reading with Apache NiFi 1.0.0

There are many types of sensors, devices and meters that can be great sources of data. Some push data, some must be polled, some provide APIs, and some let you install software directly on the device.

How To Access Sensors

One option is to install MiNiFi on the device if you have root access. This gives you fast local access to script and manage your data. Another option, for larger devices, is to install a full Java-based NiFi node.

It becomes harder once you have tens of thousands of devices. You can install an HDF edge node and communicate from it to your HDF cluster via the Site-to-Site protocol. This edge node acts as an accumulator for many devices, which is a good idea: you avoid sending 10,000 network requests a second from each set of devices, and keeping as much traffic as possible local saves time, time-outs, networking and cloud costs. You can also aggregate readings into larger batches and compute summaries locally in NiFi. This lets you populate local databases, dashboards and statistics that may only be of interest to the local source of the sensors (perhaps a plant manager or an automated monitoring system).
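The accumulator idea can be sketched in a few lines of Python: buffer individual readings and forward them upstream in batches. Here `forward_batch` is a hypothetical stand-in for whatever ships the batch to the cluster (Site-to-Site, MQTT, etc.):

```python
import json

class EdgeAggregator:
    """Buffers individual sensor readings and forwards them in batches,
    so the edge does not generate one upstream request per reading."""

    def __init__(self, forward_batch, batch_size=100):
        self.forward_batch = forward_batch  # callable that ships a batch upstream
        self.batch_size = batch_size
        self.buffer = []

    def add_reading(self, reading):
        self.buffer.append(reading)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.buffer:
            # One upstream call for many readings instead of one per reading
            self.forward_batch(json.dumps({'readings': self.buffer}))
            self.buffer = []

# Collect the forwarded batches in a list to show the behavior
sent = []
agg = EdgeAggregator(sent.append, batch_size=3)
for t in (20.1, 20.3, 20.2, 20.5):
    agg.add_reading({'temp': t})
agg.flush()  # ship whatever is left in the buffer
```

With a batch size of 3 and four readings, the first three go out as one batch and the final `flush()` ships the remainder as a second batch.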

Another option is to have devices push or pull to a local or remote NiFi install via various protocols including TCP/IP, UDP/IP, REST HTTP, JMS, MQTT, SFTP and Email.

Device Push to NiFi

Your device can send messages to NiFi via any of the protocols listed. For my example, I push via MQTT, and my local NiFi node consumes these messages via ConsumeMQTT.
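On the NiFi side, ConsumeMQTT needs only a few properties set. The values below are illustrative for this setup (the broker host and client ID are assumptions; the topic matches the one published to above):

```
Broker URI:          tcp://mqttmessageserver:1883
Client ID:           nifi-sensor-consumer
Topic Filter:        sensor
Quality of Service:  0
Max Queue Size:      1000
```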


Reference: Paho-MQTT

Your device will need to run Linux (or something similar) and have Python 2.7 or later with pip installed. With pip, you can install the Eclipse Paho library that you need to send MQTT messages: pip install paho-mqtt

import paho.mqtt.client as paho

# Connect to the broker and publish one test message to the "sensor" topic
client = paho.Client()
client.connect("servername", 1883, 60)  # host, port, keepalive in seconds
client.publish("sensor", payload="Test", qos=0, retain=True)
client.disconnect()

Here "servername" is the name of the server you are sending the message to (it could be the NiFi node, another server, a bigger device, a central aggregator or a messaging server); I recommend keeping it as close in the network as possible. "sensor" is the name of the topic that we publish the message to, and NiFi consumes from that topic. I have a cron job set up to run every minute and publish messages (* * * * * /opt/demo/ )
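In practice the payload is JSON rather than a test string. A minimal sketch of building it, assuming the field names used by the sensor API later in this article:

```python
import json

def build_payload(temp, humidity, pressure):
    """Build the JSON message body published to the 'sensor' topic.
    Field names mirror the Flask sensor API below."""
    reading = {
        'temp': round(temp, 1),
        'tempf': round(temp * 1.8 + 32, 1),  # Celsius to Fahrenheit
        'humidity': round(humidity, 1),
        'pressure': round(pressure, 1),
    }
    return json.dumps({'readings': [reading]})

# Example: 25.0 C, 44.1% humidity, 1013.2 millibars
payload = build_payload(25.0, 44.1, 1013.2)
```

This string is what you would hand to `client.publish()` in place of `payload="Test"` above.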

NiFi Poll Device

NiFi can poll your device and consume from various protocols like JMS, MQTT, SFTP, TCP and UDP. For my example, I chose a REST API over HTTP to get past hurdles like firewalls.
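To poll the Pi's REST endpoint from NiFi, a GetHTTP processor needs little more than the endpoint URL and an output filename. The host and values below are assumptions for this setup:

```
URL:           http://rpi3:8888/pi/api/v1.0/sensors
Filename:      sensor.json
Run Schedule:  60 sec
```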

I set up a Flask server on the RPi to run my REST API. I run this in a shell script:

flask run --host= --port=8888 --no-debugger

To install Flask, run: pip install flask

Sensor Reading Code

from flask import Flask, jsonify
import subprocess
import json
import paho.mqtt.client as paho
from sense_hat import SenseHat

sense = SenseHat()

app = Flask(__name__)

@app.route('/pi/api/v1.0/sensors', methods=['GET'])
def get_sensors():
    # CPU temperature comes from the Raspberry Pi firmware tool
    p = subprocess.Popen(['/opt/vc/bin/vcgencmd', 'measure_temp'], stdout=subprocess.PIPE)
    out, err = p.communicate()
    temp = round(sense.get_temperature(), 1)
    temph = round(sense.get_temperature_from_humidity(), 1)
    tempp = round(sense.get_temperature_from_pressure(), 1)
    humidity = round(sense.get_humidity(), 1)
    pressure = round(sense.get_pressure(), 1)
    tasks = [{'tempp': tempp, 'temph': temph, 'cputemp': out, 'temp': temp,
              'tempf': round((temp * 1.8) + 32, 1),  # Celsius to Fahrenheit
              'humidity': humidity, 'pressure': pressure}]
    # As an option, we can also push this reading over MQTT when we are called
    client = paho.Client()
    client.connect("mqttmessageserver", 1883, 60)
    client.publish("sensor", payload=json.dumps({'readings': tasks}), qos=0, retain=True)
    client.disconnect()
    return jsonify({'readings': tasks})

@app.route('/pi/api/v1.0/location', methods=['GET'])
def get_loc():
    orientation = sense.get_orientation()
    pitch = orientation['pitch']
    roll = orientation['roll']
    yaw = orientation['yaw']
    acceleration = sense.get_accelerometer_raw()
    x = round(acceleration['x'], 0)
    y = round(acceleration['y'], 0)
    z = round(acceleration['z'], 0)
    tasks = [{'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z}]
    return jsonify({'readings': tasks})

@app.route('/pi/api/v1.0/show', methods=['GET'])
def get_pi():
    temp = round(sense.get_temperature(), 1)
    humidity = round(sense.get_humidity(), 1)
    pressure = round(sense.get_pressure(), 1)
    # Scroll the readings across the 8x8 RGB LED matrix
    info = 'T(C): ' + str(temp) + ' H: ' + str(humidity) + ' P: ' + str(pressure)
    sense.show_message(info, text_colour=[255, 0, 0])
    tasks = [{'temp': temp, 'tempf': round((temp * 1.8) + 32, 1),
              'humidity': humidity, 'pressure': pressure}]
    return jsonify({'readings': tasks})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8888)

The device I am testing is a Raspberry Pi 3 Model B with a Sense HAT sensor attachment. Besides sensors for temperature, humidity and barometric pressure, it has an 8x8 LED grid for displaying text and simple graphics. We can use this to show messages (sense.show_message) or warnings that we send from NiFi. This allows very visceral two-way communication with remote devices and could be used to notify local personnel of conditions.


NiFi 1.0.0 Flows
JSON File Landed in HDFS in our HDP 2.5 Cluster

[root@myserverhdp sensors]# hdfs dfs -ls /sensor
Found 2 items
-rw-r--r--   3 root hdfs        202 2016-09-09 17:26 /sensor/181528179026826
drwxr-xr-x   - hdfs hdfs          0 2016-09-09 15:43 /sensor/failure
[root@tspanndev13 sensors]# hdfs dfs -cat /sensor/181528179026826
{
  "readings": [
    {
      "cputemp": "temp=55.8'C\n",
      "humidity": 40.8,
      "pressure": 1014.1,
      "temp": 40.0,
      "tempf": 84.0,
      "temph": 40.0,
      "tempp": 39.1
    }
  ]
}

The final result of our flow is a JSON file on HDFS. We could easily send a copy of the data to Phoenix via PutSQL, to Hive via PutHiveQL, or to Spark Streaming for additional processing via Site-to-Site or Kafka.
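To route the same readings to Phoenix via PutSQL, each reading has to become an SQL statement first. A sketch of that transformation in Python (the table and column names here are illustrative, not from the article's flow):

```python
import json

def reading_to_upsert(reading, table='sensor_readings'):
    """Turn one JSON reading into a Phoenix-style UPSERT statement.
    The table and column names are hypothetical examples."""
    cols = ['temp', 'humidity', 'pressure']
    values = ', '.join(str(reading[c]) for c in cols)
    return "UPSERT INTO %s (temp, humidity, pressure) VALUES (%s)" % (table, values)

# Parse a landed JSON document and build SQL for its first reading
doc = json.loads('{"readings": [{"temp": 40.0, "humidity": 40.8, "pressure": 1014.1}]}')
sql = reading_to_upsert(doc['readings'][0])
```

In a real flow you would prefer parameterized statements (PutSQL supports `sql.args.N.value` flowfile attributes) over string interpolation, which is used here only to keep the sketch short.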



To install a Mosquitto MQTT server on CentOS 7:

yum -y install unzip

Step 1: Add the CentOS 7 mosquitto repository

cd /etc/yum.repos.d


sudo yum update

Step 2: Install mosquitto & mosquitto-clients

sudo yum install -y mosquitto mosquitto-clients

Step 3: Run mosquitto

sudo /usr/sbin/mosquitto -d -c /etc/mosquitto/mosquitto.conf > /var/log/mosquitto.log 2>&1
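A minimal mosquitto.conf for this kind of unauthenticated LAN demo might look like the fragment below (paths are the CentOS defaults; fine for a demo, not for production):

```
# /etc/mosquitto/mosquitto.conf (minimal demo config)
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/
log_dest file /var/log/mosquitto/mosquitto.log
```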


Flow File: sensor.xml

Last update: 08-17-2019 10:12 AM