Community Articles
Find and share helpful community-sourced technical articles
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.
Contributor

Use Case

In the Internet of Things era, we live in world of connected things like never before with 50 billion assets connected between fleets of vehicles (aircraft, trucks, trains, smart devices) and generating an unprecedented amount of data. Even with the availability of machine-generated data, organizations are still struggling to take advantage of that data to efficiently failure-proof their industrial IOT environment.

The business impact across industries of production equipment failure can result in staggering financial costs depending on the industry, but also catastrophic events which can result on human life loss or environmental disaster.

When talking to customers about capabilities provided by Hortonworks Data Flow, a recurrent use case being discussed by customers in industries such as oil & gas, utilities, manufacturing and transportation is the ability to take advantage of data in motion to implement predictive maintenance.

In this article, I’m going to use the following HDP & HDF Apache components MiNiFi, NiFi, Kafka, Spark structured streaming, and Druid to demonstrate an end-to-end predictive maintenance use case for a hydraulic system.

Data Set

The data set being used is a real-life data set with sensor data generated by a hydraulic test rig. The data set is available on the UCI Machine Learning repository The original data set contains raw process sensor data (i.e. without feature extraction) which are structured as matrices (tab-delimited) with the rows representing the cycles and the columns the data points within a cycle.

For this current use case, the data set has been transformed using Apache Hive to be in a json consumable format, that will be streamed through a python script in a loop. The data transformation involved the creation, pivoting and joins of tables having 6000 columns (for 100 Hz sampling rate). Shell scripts had to be used to generate SQL statements for data transformation. The final data set is sensordatafull.json.zip.

The Data Dictionary is self-descriptive for the different sensors involved:

SensorPhysical quantityUnitSampling Rate
cycleidCycle IDN/AN/A
tstampSequential int representing unique record IDN/AN/A
psa_barAvg Pressure over sampling intervalbar100 Hz

psb_bar

Avg Pressure over sampling intervalbar100 Hz
psc_barAvg Pressure over sampling intervalbar100 Hz
psd_barAvg Pressure over sampling intervalbar100 Hz
pse_barAvg Pressure over sampling intervalbar100 Hz
psf_barAvg Pressure over sampling intervalbar100 Hz
motor_power_wattAvg Motor power over sampling intervalW100 Hz
fsa_vol_flowAvg Volume flow over sampling intervall/min10 Hz
fsb_vol_flowAvg Volume flow over sampling intervall/min10 Hz
tsa_tempAvg Temperature over sampling intervalCelsius1 Hz
tsb_tempAvg Temperature over sampling intervalCelsius1 Hz
tsc_tempAvg Temperature over sampling intervalCelsius1 Hz
tsd_tempAvg Temperature over sampling intervalCelsius1 Hz
vs_vibAvg Vibration over sampling intervalmm/s1 Hz
cool_eff_pctAvg Cooling efficiency (virtual) over sampling interval%1 Hz
cool_pwr_kwAvg Cooling power (virtual) over sampling intervalkW1 Hz
eff_fact_pctEfficiency factor over sampling interval%1 Hz

A data record looks like the following:

{
   "cycleid":1,
   "tstamp":1,
   "psa_bar":176.9,
   "psb_bar":12.14,
   "psc_bar":0.18,
   "psd_bar":0.0,
   "pse_bar":9.75,
   "psf_bar":9.84,
   "motor_power_watt":2613.47,
   "fsa_vol_flow":1.04,
   "fsb_vol_flow":10.17,
   "tsa_temp":35.57,
   "tsb_temp":40.961,
   "tsc_temp":38.32,
   "tsd_temp":30.363,
   "vs_vib":0.604,
   "cool_eff_pct":47.202,
   "cool_pwr_kw":2.184,
   "eff_fact_pct":2.184
}

In this use case, we will build a predictive model over the cooling system. We will be monitoring and predict failure in analyzing the variables tsa_temp, tsb_temp, tsc_temp, tsd_temp, cool_eff_pct and cool_pwr_kw.

In the labelled data set, we use the cooling condition profile information:

- Cooler condition / %:

  • 3: close to total failure
  • 20: reduced efficiency
  • 100: full efficiency

For simplicity sake, we will just use a binary logistic regression model, so we will flag in the label any data row where Cooler condition < 100.

Environment

For the environment used, I used an AWS ec2 Linux VM instance m4.4x.large on which I used a handy script to deploy a single node HDP+HDF VM to deploy HDP 3.0.1 and HDF 3.3 with a single command. This script would work on any CentOS 7 VM with enough capacity to have all required HDP+HDF components running.

Solution Architecture

The diagram below illustrates the solution architecture used in this use case:

We are going to use a python script to stream data from the data set in a loop and send it to an MQTT broker. Apache MiNiFi will read data from the MQTT broker and send to Apache NiFi using NiFi site-to-site protocol.

After ingestion and transforming the data using Apache NiFi, data will be written back to Apache Kafka. The only transformation we do with Apache NiFi is to add a processing time which is necessary to both Apache Druid and Apache Spark Streaming to implement windowing aggregation.

Once the data is written onto Kafka, we will use built-in Kafka connectors to ingest data in Apache Druid, and Apache Spark to ingest and process the data in real-time.

Using Apache ML pipelines, a binary logistic model was built to predict the cooling system failure based on related fields available on the data set using data at rest using a notebook on Apache Zeppelin. This model is then used to score cooling system related fields in real-time using Apache Spark Structured Streaming.

The predictions made by the model are written on a notification queue on Kafka. Messages are consumed by Apache NiFi. If a failure prediction is made, an email notification is generated by Apache NiFi.

Data streamed into Druid is also visualized using Superset as a dashboard.

MQTT Broker

In this example, we will use the mosquitto MQTT broker and deploy it onto the Centos 7 machine, use the following commands:

sudo yum -y install epel-release
sudo yum -y install mosquitto
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

In order to make sure it works, you can do the following quick test with 2 sessions opened side by side. One the first session, subscribe to the MQTT topic test-topic on localhost:

mosquitto_sub -h localhost -t test-topic

On the second session, publish a message on test-topic on localhost:

mosquitto_pub -h localhost -t test-topic -m "test message"

You should see a message appearing on the first session.

Streaming Data

To stream in data, I used the mqttgen.py python script to read data from the sensordatafull.json file and push it to MQTT. In order for the script to work, copy the file sensordatafull.json to /tmp and make sure the permissions are set properly.

In order to run this script, you will need the config.json where you need to customize the host name.

You can launch the script as following to run it as a background:

nohup python mqttgen.py ./config.json &

To make sure messages are written, you can use the mosquitto_sub command:

mosquitto_sub -h localhost -t sensors

Consume MQTT Messages with MiNiFi

In order to deploy Apache MiNiFi, you have to use the MiNiFi version matching the Apache NiFi version to avoid libraries versions mismatches between the two components. You will need to deploy the MiNiFi toolkit and the MiNiFi Java agent. For HDF 3.3, please check the HDF repository links.

In order to deploy MiNiFi, you can follow these instructions on this lab. Note that the versions of MiNiFi are different, but the installation instructions do not differ.

We are going to use Apache MiNiFi to consume data from the MQTT broker. The following flow can be used on Apache MiNiFi:

In order to do that, you will need to first copy the ConsumeMQTT related nar file onto the minifi processor library path, as ConsumeMQTT processor is not shipped with MiNiFi. For example, you can copy the file as following (HDF, nar versions, and install path may differ):

cp /usr/hdf/3.3.0.0-165/nifi/lib/nifi-mqtt-nar-1.8.0.3.3.0.0-165.nar /usr/hdf/minifi-0.6.0.3.3.0.0-165/lib

You can now deploy the flow on MiNiFi. You can follow these instructions in the lab above to deploy this MiNiFi flow. Do not start minifi yet, as we need to deploy the main NiFi ingestion flow.

Schema Registry

The following schemas needs to be created for use by Apache NiFi:

- The rig_sensor_data schema.

- The rig_sensor_pred schema.

Data Ingestion using NiFi

We are going to use the following flow to read data from MiNiFi and write it onto Kafka:

You will need to customize in the flow for your environment:

  • The Schema Registry URI in Controller Services
  • The Kafka broker URI in PublishKafkaRecord processor.

You can now start the MiNiFi agent using the instructions given on the MiNiFi lab, and start the main NiFi flow.

When starting the flow, do not start the UpdateAttribute processor following the EvaluateJsonPath. This processor is to give you the ability to switch predictions to positive or negative during demos. For negative, you will need to alter the processor to divide the values instead of multiplying them.

You can check if the data is written on Kafka:

su - kafka
cd /usr/hdp/current/kafka-broker
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic rig_sensor_data

Once you have verified that the data is flowing, you can stop the NiFi flow for now.

Data Science with Apache Spark

Copy the sensordata.del file on HDFS and make sure the file is readable by user zeppelin. In the current script, it is reading the file from /ds_final/sensor_data_profiled. It is the same data as sensordata.json, but enriched with profile information needed to label the data for predictive modeling. The file is in tab delimited format.

Create a zeppelin notebook and copy the following Spark code as a first paragraph. This code will read the data from HDFS and create a dataframe:

%spark2
import org.apache.spark.sql.types._

val pathSensors = "/ds_final/sensor_data_profiled"
val dfSensors = spark.read.option("header", "false").option("delimiter", "\t").csv(pathSensors)

val sensorsDF = dfSensors.select(
     dfSensors("_c0").cast(IntegerType).as("cycleid")
    ,dfSensors("_c1").cast(IntegerType).as("tstamp")
    ,dfSensors("_c2").cast(DoubleType).as("ps1")
    ,dfSensors("_c3").cast(DoubleType).as("ps2")
    ,dfSensors("_c4").cast(DoubleType).as("ps3")
    ,dfSensors("_c5").cast(DoubleType).as("ps4")
    ,dfSensors("_c6").cast(DoubleType).as("ps5")
    ,dfSensors("_c7").cast(DoubleType).as("ps6")
    ,dfSensors("_c8").cast(DoubleType).as("eps1")    
    ,dfSensors("_c9").cast(DoubleType).as("fs1")
    ,dfSensors("_c10").cast(DoubleType).as("fs2")
    ,dfSensors("_c11").cast(DoubleType).as("ts1")
    ,dfSensors("_c12").cast(DoubleType).as("ts2")
    ,dfSensors("_c13").cast(DoubleType).as("ts3")
    ,dfSensors("_c14").cast(DoubleType).as("ts4")
    ,dfSensors("_c15").cast(DoubleType).as("vs1")    
    ,dfSensors("_c16").cast(DoubleType).as("ce")
    ,dfSensors("_c17").cast(DoubleType).as("cp")
    ,dfSensors("_c18").cast(DoubleType).as("se")
    ,dfSensors("_c19").cast(DoubleType).as("cooler_cond")
    ,dfSensors("_c20").cast(DoubleType).as("valve_cond")
    ,dfSensors("_c21").cast(DoubleType).as("pump_leak")
    ,dfSensors("_c22").cast(DoubleType).as("hydrau_bar")
    ,dfSensors("_c23").cast(DoubleType).as("stable")
    )
sensorsDF.createOrReplaceTempView("sensorDataProfiled")

You can issue Spark SQL queries on this dataframe to explore the data set. For example:

%sql
select cycleid, avg(ts1) as ts1_avg, avg(ts2) as ts2_avg, avg(ts3) as ts3_avg, 
avg(ts4) as ts4_avg, avg(ce) as ce_avg
from sensorDataprofiled
where cooler_cond=3
group by cycleid

You can submit various spark-sql queries to explore the dataset.

The following spark script builds the logistic model using Spark ML model pipeline. The model is 100% accurate as you will see with the area under ROC curve metric due to the data set simplicity. Eg, when temperature rises and cooling power and efficiency decrease, a cooling system malfunction is flagged by the model. After succesful execution, a timestamped model is saved onto HDFS in /models path. You will need to customize the HDFS Namenode URI:

%spark2
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression

import org.apache.spark.ml.{Pipeline, PipelineModel}

import java.io.StringWriter;
import java.util.Date;
import java.text.SimpleDateFormat;

//Dataframe for temperature and cooler condition metrics with label
val df=sqlContext.sql("""with x as (select cycleid,ts1,ts2,ts3,ts4,ce,cp, CASE WHEN cooler_cond < 100 THEN 1 ELSE 0 END AS cool_maint_req from sensorDataprofiled) select avg(ts1) as ts1, avg(ts2) as ts2, avg(ts3) as ts3, avg(ts4) as ts4,avg(ce) as ce, avg(cp) as cp, max(cool_maint_req) as cool_maint_req from x group by cycleid""")


val featureCols = Array("ts1","ts2","ts3","ts4","ce","cp")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")

val labelIndexer = new StringIndexer().setInputCol("cool_maint_req").setOutputCol("label")
val lr = new LogisticRegression()

//Splitting data set into training and test data
val splitSeed = 5352
val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3), splitSeed)

//Train model
val pipeline = new Pipeline().setStages(Array(assembler, labelIndexer, lr))
val model = pipeline.fit(trainingData)

//Predict on test data
val predictions =  model.transform(testData)
predictions.show()

//A common metric used for logistic regression is area under the ROC curve (AUC). We can use the BinaryClasssificationEvaluator to obtain the AUC
// create an Evaluator for binary classification, which expects two input columns: rawPrediction and label.**
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")

// Evaluates predictions and returns a scalar metric areaUnderROC(larger is better).**
val accuracy = evaluator.evaluate(predictions)
println("Accuracy of the model:" + accuracy)

//Save the model on HDFS
val fileName = new SimpleDateFormat("yyyy-MM-dd_HH_mm_ss'-LR-Cooling'").format(new Date())
val destLocation = fileName.toString()

val path = "hdfs://ip-172-31-24-159.us-west-1.compute.internal:8020/models/" + destLocation
model.save(path)

Real-time Data Processing and Scoring using Apache Spark Structured Streaming

First, create a kafka topic for Spark streaming to write messages into:

su - kafka
cd /usr/hdp/current/kafka-broker/
bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic predictive_maint_reading

Start the MiNiFi flow (if not started already), and start the NiFi flow to get data written for consumption by Apache Spark.

As user zeppelin, start an Apache Spark shell as following:

su - zeppelin
export SPARK_MAJOR_VERSION=2
spark-shell --num-executors 2 --executor-memory 1G --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --master yarn-client

Enter in paste mode by typing :paste then paste the following script. You will need to customize a few parameters such as the kafka broker URIs when reading and writing to kafka, as well as the Spark ML model path. After pasting, enter ctrl+D:

import org.apache.spark.sql.streaming.ProcessingTime
import spark.implicits._

//Reading from Kafka queue
val kafkaSensorDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "ip-172-31-24-159.us-west-1.compute.internal:6667")
.option("subscribe", "rig_sensor_data")
.option("failOnDataLoss", "false")
.option("startingOffsets", "latest")
.load()

// Load the LogisticRegressionModel
import org.apache.spark.ml.PipelineModel
val LGRModel = PipelineModel.read.load("hdfs://ip-172-31-24-159.us-west-1.compute.internal:8020/models/2018-12-13_15_50_39-LR-Cooling");

//Parsing the data set in JSON and applying transformations on the timestamp
import org.apache.spark.sql.types._
val SensorSchema = (new StructType)
.add("processingtime",LongType)
.add("cycleid",IntegerType)
.add("tstamp",LongType)
.add("psa_bar",FloatType)
.add("psb_bar",FloatType)
.add("psc_bar",FloatType)
.add("psd_bar",FloatType)
.add("pse_bar",FloatType)
.add("psf_bar",FloatType)
.add("motor_power_watt",FloatType)
.add("fsa_vol_flow",FloatType)
.add("fsb_vol_flow",FloatType)
.add("tsa_temp",FloatType)
.add("tsb_temp",FloatType)
.add("tsc_temp",FloatType)
.add("tsd_temp",FloatType)
.add("cool_eff_pct",FloatType)
.add("cool_pwr_kw",FloatType)
.add("eff_fact_pct",FloatType)


val SensorDF = kafkaSensorDF.select(from_json($"value".cast("string"), SensorSchema).as("SensorJSON"))
val SensorDF2 = SensorDF.withColumn("timestamp", from_unixtime(SensorDF.col("SensorJSON.processingtime").divide(1000)).cast(TimestampType))
val SensorDF3 = SensorDF2.select("timestamp","SensorJSON.tsa_temp","SensorJSON.tsb_temp","SensorJSON.tsc_temp","SensorJSON.tsd_temp","SensorJSON.cool_eff_pct","SensorJSON.cool_pwr_kw")


//Calculating average aggregates over 1 minute window 
//as the Spark ML model uses 1 minute averages of sensor readings as input

val FormatDF = SensorDF3.withWatermark("timestamp","1 minutes").groupBy(window($"timestamp", "1 minute"))
.agg(
avg("tsa_temp").alias("ts1"),
avg("tsb_temp").alias("ts2"),
avg("tsc_temp").alias("ts3"),
avg("tsd_temp").alias("ts4"),
avg("cool_eff_pct").alias("ce"),
avg("cool_pwr_kw").alias("cp")
).toDF("window","ts1","ts2","ts3","ts4","ce","cp")

//Using the Spark ML pipeline model to score in real-time
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val assembler = new VectorAssembler().setInputCols(Array("ts1", "ts2", "ts3", "ts4", "ce", "cp" ))
val VectorDF = assembler.transform(FormatDF.na.drop)

val predictions =  LGRModel.transform(VectorDF).select("ts1", "ts2", "ts3","ts4","ce","cp","prediction")

//Write prediction back to a kafka queue
val SinkQuery = predictions.select(to_json(struct($"ts1",$"ts2",$"ts3",$"ts4",$"ce",$"cp",$"prediction")).alias("value")).writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "ip-172-31-24-159.us-west-1.compute.internal:6667")
.option("checkpointLocation", "checkpoint")
.option("topic","predictive_maint_reading")
.start()

You will have to wait a few minutes to see predictions being computed on the shell. You can check the predictive_maint_reading queue in Kafka to make sure the messages are being written, as described in earlier steps.

Processing Notifications in real-time with Apache NiFi

In the flow below, we are going to read from the Spark Streaming notification queue and if a reading is flagged, generate an email automatically:

You will need to customize a few parameters such as the Kafka broker URIs, the Hortonworks schema registry URI as well as the email parameters. The current example uses a gmail address. I would recommend the stop the PutEmail processor in order to prevent emails sent every minute during a phase where the hydraulic test system data is showing an imminent cooling system failure.

The email notification will look like as follows:

Ingesting data with Apache Druid

For instructions on how to enable the Druid Kafka ingestion service, please check the section 1 in the HCC Article: Druid Kafka Integration Service+Hive.

You will need to use the following ingestion spec file. You may need to customize the kafka bootstrap URI:

{
   "type":"kafka",
   "dataSchema":{
      "dataSource":"temp_sensor_dsn",
      "parser":{
         "type":"string",
         "parseSpec":{
             "format": "json",
             "timestampSpec":{
               "column":"processingtime",
               "format":"auto"
            },
            "dimensionsSpec":{
               "dimensions":[
                  "cycleid"
               ],
               "dimensionExclusions":[


               ]
            },
            "columns":[
               "processingtime",
               "cycleid",
               "tstamp",
               "psa_bar",
               "psb_bar",
               "psc_bar",
               "psd_bar",
               "pse_bar",
               "psf_bar",
               "motor_power_watt",
               "fsa_vol_flow",
               "fsb_vol_flow",
               "tsa_temp",
               "tsb_temp",
               "tsc_temp",
               "tsd_temp",
               "vs_vib",
               "cool_eff_pct",
               "cool_pwr_kw",
               "eff_fact_pct"
            ]
         }
      },
      "granularitySpec":{
         "type":"uniform",
         "segmentGranularity":"fifteen_minute",
         "queryGranularity":"minute"
      },
      "metricsSpec":[
         {
            "type":"count",
            "name":"count"
         },
         {
            "type":"doubleMax",
            "fieldName":"tsa_temp",
            "name":"tsa_temp_max"
         },
         {
            "type":"doubleMax",
            "fieldName":"tsb_temp",
            "name":"tsb_temp_max"
         },
         {
            "type":"doubleMax",
            "fieldName":"tsc_temp",
            "name":"tsc_temp_max"
         },
         {
            "type":"doubleMax",
            "fieldName":"tsd_temp",
            "name":"tsd_temp_max"
         },
         {
            "type":"doubleMin",
            "fieldName":"cool_eff_pct",
            "name":"cool_eff_min"
         },
         {
            "type":"doubleMax",
            "fieldName":"cool_eff_pct",
            "name":"cool_eff_max"
         },
         {
            "type":"doubleMin",
            "fieldName":"cool_pwr_kw",
            "name":"cool_pwr_min"
         },
         {
            "type":"doubleMax",
            "fieldName":"cool_pwr_kw",
            "name":"cool_pwr_max"
         },
         {
            "type":"doubleMin",
            "fieldName":"eff_fact_pct",
            "name":"eff_fact__min"
         },
         {
            "type":"doubleMax",
            "fieldName":"eff_fact_pct",
            "name":"eff_fact__max"
         }
      ]
   },
   "tuningConfig":{
      "type":"kafka",
      "maxRowsPerSegment":50000
   },
   "ioConfig":{
      "topic":"rig_sensor_data",
      "consumerProperties":{
         "bootstrap.servers":"ip-172-31-24-159.us-west-1.compute.internal:6667"
      },
      "taskCount":1,
      "replicas":1,
      "taskDuration":"PT30M"
   }
}

In order to submit the ingestion spec file above, you can use the following command for example. You will need to customize the hostname in the URL:

curl -X POST -H 'Content-Type: application/json' -d @kafka.json http://ip-172-31-24-159.us-west-1.compute.internal:8095/druid/indexer/v1/supervisor

As a response, you will see something like the following with an ID of your application:

{"id":"rig_sensor_data2"}

As discussed in the HCC article referenced above, you can check the Druid Overlord console. You should see one running task at least:

Once the first task completes after 15-20 minutes, you can proceed to build a real-time visualization with Superset.

Data Visualization with Superset

Once at least one indexing task has completed, you can login to SuperSet. Once, you click on Scan New Datasources, you should see the temp_sensor_dsn data source appear:

You can now build a dashboard. For a step by step instruction on how to build a dashboard, you can follow the example in this lab.

This is an example of a near real-time visualization showing us the maximum temperature reading by 4 sensors on the hydraulic rig, with a granularity of 1 minute data point:

Conclusion

In this article, we went through an end-to-end predictive maintenance use case with the ingestion of IOT data in real-time using Apache MiNiFi, and Apache NiFi. Once the data has been ingested and enriched, it is written to Kafka. From Kafka, the data is consumed for real-time analytics purpose using Spark Structured Streaming to compute aggregates (avg over a minute), and score the data in real-time. We also showcase near real-time visualization of the data using Apache Druid and Superset.

Additional areas that could be explored would be to create a Hive table using the Druid cube as storage, and be able to query the data using standard SQL in near real-time.

Credits

The MQTT Generator was inspired from this Github:

https://gist.github.com/marianoguerra/be216a581ef7bc23673f501fdea0e15a

HCC Article on Druid Kafka Integration Service: https://community.hortonworks.com/articles/227104/druid-kafka-integration-service-hive.html

1,646 Views
Comments

Pretty good insight

@ zhoussen , the link for "Processing Notifications in real-time with Apache NiFi " - flow below is broken.

Can you please add this?

Don't have an account?
Coming from Hortonworks? Activate your account here
Version history
Revision #:
1 of 1
Last update:
‎12-21-2018 05:27 PM
Updated by:
 
Contributors
Top Kudoed Authors