Member since: 05-02-2016
Posts: 33
Kudos Received: 29
Solutions: 5
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2546 | 08-16-2017 03:49 PM
 | 2179 | 08-16-2017 03:13 PM
 | 3625 | 07-25-2017 09:58 PM
 | 10718 | 05-30-2017 07:17 PM
01-08-2019
09:00 PM
@bkv Check the YARN logs. It could be starving for YARN containers, so you may need to adjust your YARN container settings. Also, please post yours as a separate new question rather than as an answer to this one.
12-21-2018
05:27 PM
5 Kudos
Use Case
In the Internet of Things era, we live in a world of connected things like never before, with some 50 billion connected assets, from fleets of vehicles (aircraft, trucks, trains) to smart devices, generating an unprecedented amount of data. Even with this machine-generated data available, organizations still struggle to take advantage of it to efficiently failure-proof their industrial IoT environments. Across industries, the business impact of production equipment failure ranges from staggering financial costs to catastrophic events that can result in loss of human life or environmental disaster. When talking to customers about the capabilities provided by Hortonworks DataFlow, a recurring use case in industries such as oil & gas, utilities, manufacturing and transportation is the ability to take advantage of data in motion to implement predictive maintenance. In this article, I am going to use the following HDP and HDF Apache components to demonstrate an end-to-end predictive maintenance use case for a hydraulic system: MiNiFi, NiFi, Kafka, Spark Structured Streaming and Druid.

Data Set
The data set being used is a real-life set of sensor data generated by a hydraulic test rig, available in the UCI Machine Learning Repository. The original data set contains raw process sensor data (i.e. without feature extraction), structured as tab-delimited matrices with rows representing cycles and columns representing the data points within a cycle. For this use case, the data set has been transformed with Apache Hive into a JSON-consumable format that is streamed in a loop by a Python script. The transformation involved creating, pivoting and joining tables with 6000 columns (for the 100 Hz sampling rate); shell scripts were used to generate the SQL statements. The final data set is sensordatafull.json.zip. The data dictionary is self-descriptive for the different sensors involved:

Sensor | Physical quantity | Unit | Sampling rate
---|---|---|---
cycleid | Cycle ID | N/A | N/A
tstamp | Sequential int representing a unique record ID | N/A | N/A
psa_bar | Avg pressure over sampling interval | bar | 100 Hz
psb_bar | Avg pressure over sampling interval | bar | 100 Hz
psc_bar | Avg pressure over sampling interval | bar | 100 Hz
psd_bar | Avg pressure over sampling interval | bar | 100 Hz
pse_bar | Avg pressure over sampling interval | bar | 100 Hz
psf_bar | Avg pressure over sampling interval | bar | 100 Hz
motor_power_watt | Avg motor power over sampling interval | W | 100 Hz
fsa_vol_flow | Avg volume flow over sampling interval | l/min | 10 Hz
fsb_vol_flow | Avg volume flow over sampling interval | l/min | 10 Hz
tsa_temp | Avg temperature over sampling interval | Celsius | 1 Hz
tsb_temp | Avg temperature over sampling interval | Celsius | 1 Hz
tsc_temp | Avg temperature over sampling interval | Celsius | 1 Hz
tsd_temp | Avg temperature over sampling interval | Celsius | 1 Hz
vs_vib | Avg vibration over sampling interval | mm/s | 1 Hz
cool_eff_pct | Avg cooling efficiency (virtual) over sampling interval | % | 1 Hz
cool_pwr_kw | Avg cooling power (virtual) over sampling interval | kW | 1 Hz
eff_fact_pct | Efficiency factor over sampling interval | % | 1 Hz

A data record looks like the following:

{
"cycleid":1,
"tstamp":1,
"psa_bar":176.9,
"psb_bar":12.14,
"psc_bar":0.18,
"psd_bar":0.0,
"pse_bar":9.75,
"psf_bar":9.84,
"motor_power_watt":2613.47,
"fsa_vol_flow":1.04,
"fsb_vol_flow":10.17,
"tsa_temp":35.57,
"tsb_temp":40.961,
"tsc_temp":38.32,
"tsd_temp":30.363,
"vs_vib":0.604,
"cool_eff_pct":47.202,
"cool_pwr_kw":2.184,
"eff_fact_pct":2.184
}
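Records like this one are replayed in a loop by the attached mqttgen.py script, which is covered in the Streaming Data section below. Purely as an illustration of that idea, here is a minimal sketch of such a publisher; it assumes the paho-mqtt client library, a broker on localhost, the sensors topic and the /tmp file location used later in this article, whereas the actual script reads its settings from config.json and may differ in the details.

import json
import time

import paho.mqtt.client as mqtt

# Illustrative stand-in for the attached mqttgen.py; the real script reads
# its broker host and other settings from config.json.
BROKER_HOST = "localhost"
TOPIC = "sensors"
DATA_FILE = "/tmp/sensordatafull.json"

client = mqtt.Client()  # paho-mqtt 1.x style constructor
client.connect(BROKER_HOST, 1883)
client.loop_start()

# Replay the records in an endless loop; one JSON object per line is assumed.
while True:
    with open(DATA_FILE) as f:
        for line in f:
            record = json.loads(line)
            client.publish(TOPIC, json.dumps(record))
            time.sleep(0.01)  # throttle the replay rate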
In this use case, we will build a predictive model for the cooling system. We will monitor and predict failure by analyzing the variables tsa_temp, tsb_temp, tsc_temp, tsd_temp, cool_eff_pct and cool_pwr_kw. To label the data set, we use the cooler condition profile information (Cooler condition / %):
3: close to total failure
20: reduced efficiency
100: full efficiency

For simplicity's sake, we will just use a binary logistic regression model, so we flag in the label any data row where the cooler condition is < 100.

Environment
I used an AWS EC2 Linux VM (an m4.4xlarge instance) on which I used a handy script to deploy a single-node HDP 3.0.1 + HDF 3.3 environment with a single command. The script should work on any CentOS 7 VM with enough capacity to run all the required HDP and HDF components.

Solution Architecture
The diagram below illustrates the solution architecture used in this use case. A Python script streams data from the data set in a loop and sends it to an MQTT broker. Apache MiNiFi reads the data from the MQTT broker and sends it to Apache NiFi using the NiFi site-to-site protocol. After ingesting and transforming the data, Apache NiFi writes it to Apache Kafka. The only transformation done in NiFi is to add a processing time, which both Apache Druid and Apache Spark Structured Streaming need in order to implement windowed aggregation. Once the data is written to Kafka, we use the built-in Kafka connectors to ingest the data into Apache Druid, and Apache Spark to ingest and process the data in real time. Using Spark ML pipelines in an Apache Zeppelin notebook, a binary logistic regression model is built on data at rest to predict cooling system failure from the related fields in the data set. This model is then used to score the cooling system fields in real time with Apache Spark Structured Streaming. The predictions made by the model are written to a notification queue on Kafka and consumed by Apache NiFi; if a failure is predicted, Apache NiFi generates an email notification. The data streamed into Druid is also visualized in a Superset dashboard.

MQTT Broker
In this example, we will use the mosquitto MQTT broker and deploy it onto the CentOS 7 machine with the following commands:

sudo yum -y install epel-release
sudo yum -y install mosquitto
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

To make sure the broker works, you can run the following quick test with two sessions opened side by side. In the first session, subscribe to the MQTT topic test-topic on localhost:

mosquitto_sub -h localhost -t test-topic
In the second session, publish a message on test-topic on localhost:

mosquitto_pub -h localhost -t test-topic -m "test message"

You should see the message appear in the first session.

Streaming Data
To stream the data in, I used the mqttgen.py Python script to read records from the sensordatafull.json file and push them to MQTT. For the script to work, copy sensordatafull.json to /tmp and make sure the permissions are set properly. To run the script, you will need the config.json file, in which you need to customize the host name. You can launch the script in the background as follows:

nohup python mqttgen.py ./config.json &

To make sure messages are being written, you can use the mosquitto_sub command:

mosquitto_sub -h localhost -t sensors

Consume MQTT Messages with MiNiFi
To deploy Apache MiNiFi, you have to use the MiNiFi version matching your Apache NiFi version to avoid library version mismatches between the two components. You will need to deploy the MiNiFi toolkit and the MiNiFi Java agent; for HDF 3.3, please check the HDF repository links. To deploy MiNiFi, you can follow the instructions in this lab. Note that the MiNiFi versions differ, but the installation instructions do not. We are going to use Apache MiNiFi to consume data from the MQTT broker, using the following flow. To do that, you will first need to copy the ConsumeMQTT nar file onto the MiNiFi processor library path, as the ConsumeMQTT processor is not shipped with MiNiFi. For example, you can copy the file as follows (the HDF version, nar version and install path may differ):

cp /usr/hdf/3.3.0.0-165/nifi/lib/nifi-mqtt-nar-1.8.0.3.3.0.0-165.nar /usr/hdf/minifi-0.6.0.3.3.0.0-165/lib

You can now deploy the flow on MiNiFi, following the instructions in the lab referenced above. Do not start MiNiFi yet, as we still need to deploy the main NiFi ingestion flow.

Schema Registry
The following schemas need to be created for use by Apache NiFi:
- The rig_sensor_data schema.
- The rig_sensor_pred schema.

Data Ingestion using NiFi
We are going to use the following flow to read data from MiNiFi and write it to Kafka. You will need to customize the flow for your environment:
- The Schema Registry URI in the Controller Services.
- The Kafka broker URI in the PublishKafkaRecord processor.

You can now start the MiNiFi agent using the instructions given in the MiNiFi lab, and start the main NiFi flow. When starting the flow, do not start the UpdateAttribute processor that follows EvaluateJsonPath. This processor gives you the ability to switch predictions to positive or negative during demos; for negative, you will need to alter the processor to divide the values instead of multiplying them. You can check whether the data is written to Kafka:

su - kafka
cd /usr/hdp/current/kafka-broker
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic rig_sensor_data

Once you have verified that the data is flowing, you can stop the NiFi flow for now.

Data Science with Apache Spark
Copy the sensordata.del file to HDFS and make sure the file is readable by the zeppelin user. The current script reads the file from /ds_final/sensor_data_profiled. It contains the same data as sensordata.json, but enriched with the profile information needed to label the data for predictive modeling, and it is in tab-delimited format. Create a Zeppelin notebook and copy the following Spark code as a first paragraph. This code reads the data from HDFS and creates a dataframe:

%spark2
import org.apache.spark.sql.types._
val pathSensors = "/ds_final/sensor_data_profiled"
val dfSensors = spark.read.option("header", "false").option("delimiter", "\t").csv(pathSensors)
val sensorsDF = dfSensors.select(
dfSensors("_c0").cast(IntegerType).as("cycleid")
,dfSensors("_c1").cast(IntegerType).as("tstamp")
,dfSensors("_c2").cast(DoubleType).as("ps1")
,dfSensors("_c3").cast(DoubleType).as("ps2")
,dfSensors("_c4").cast(DoubleType).as("ps3")
,dfSensors("_c5").cast(DoubleType).as("ps4")
,dfSensors("_c6").cast(DoubleType).as("ps5")
,dfSensors("_c7").cast(DoubleType).as("ps6")
,dfSensors("_c8").cast(DoubleType).as("eps1")
,dfSensors("_c9").cast(DoubleType).as("fs1")
,dfSensors("_c10").cast(DoubleType).as("fs2")
,dfSensors("_c11").cast(DoubleType).as("ts1")
,dfSensors("_c12").cast(DoubleType).as("ts2")
,dfSensors("_c13").cast(DoubleType).as("ts3")
,dfSensors("_c14").cast(DoubleType).as("ts4")
,dfSensors("_c15").cast(DoubleType).as("vs1")
,dfSensors("_c16").cast(DoubleType).as("ce")
,dfSensors("_c17").cast(DoubleType).as("cp")
,dfSensors("_c18").cast(DoubleType).as("se")
,dfSensors("_c19").cast(DoubleType).as("cooler_cond")
,dfSensors("_c20").cast(DoubleType).as("valve_cond")
,dfSensors("_c21").cast(DoubleType).as("pump_leak")
,dfSensors("_c22").cast(DoubleType).as("hydrau_bar")
,dfSensors("_c23").cast(DoubleType).as("stable")
)
sensorsDF.createOrReplaceTempView("sensorDataProfiled")
You can issue Spark SQL queries on this dataframe to explore the data set. For example:

%sql
select cycleid, avg(ts1) as ts1_avg, avg(ts2) as ts2_avg, avg(ts3) as ts3_avg,
avg(ts4) as ts4_avg, avg(ce) as ce_avg
from sensorDataprofiled
where cooler_cond=3
group by cycleid

You can submit various Spark SQL queries to explore the data set. The following Spark script builds the logistic regression model using a Spark ML pipeline. Because of the simplicity of the data set, the model is 100% accurate, as you will see from the area under the ROC curve metric: when temperatures rise and cooling power and efficiency decrease, the model flags a cooling system malfunction. After successful execution, a timestamped model is saved to HDFS under the /models path. You will need to customize the HDFS NameNode URI:

%spark2
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import java.io.StringWriter;
import java.util.Date;
import java.text.SimpleDateFormat;
//Dataframe for temperature and cooler condition metrics with label
val df=sqlContext.sql("""with x as (select cycleid,ts1,ts2,ts3,ts4,ce,cp, CASE WHEN cooler_cond < 100 THEN 1 ELSE 0 END AS cool_maint_req from sensorDataprofiled) select avg(ts1) as ts1, avg(ts2) as ts2, avg(ts3) as ts3, avg(ts4) as ts4,avg(ce) as ce, avg(cp) as cp, max(cool_maint_req) as cool_maint_req from x group by cycleid""")
val featureCols = Array("ts1","ts2","ts3","ts4","ce","cp")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val labelIndexer = new StringIndexer().setInputCol("cool_maint_req").setOutputCol("label")
val lr = new LogisticRegression()
//Splitting data set into training and test data
val splitSeed = 5352
val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3), splitSeed)
//Train model
val pipeline = new Pipeline().setStages(Array(assembler, labelIndexer, lr))
val model = pipeline.fit(trainingData)
//Predict on test data
val predictions = model.transform(testData)
predictions.show()
//A common metric used for logistic regression is the area under the ROC curve (AUC).
//We can use the BinaryClassificationEvaluator to obtain the AUC.
//Create an evaluator for binary classification, which expects two input columns: rawPrediction and label.
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
//Evaluate the predictions and return the scalar areaUnderROC metric (larger is better).
val auc = evaluator.evaluate(predictions)
println("Area under ROC of the model: " + auc)
//Save the model on HDFS
val fileName = new SimpleDateFormat("yyyy-MM-dd_HH_mm_ss'-LR-Cooling'").format(new Date())
val destLocation = fileName.toString()
val path = "hdfs://ip-172-31-24-159.us-west-1.compute.internal:8020/models/" + destLocation
model.save(path)

Real-time Data Processing and Scoring using Apache Spark Structured Streaming
First, create a Kafka topic for Spark Structured Streaming to write messages into:

su - kafka
cd /usr/hdp/current/kafka-broker/
bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic predictive_maint_reading
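If you prefer to confirm from Python that the topic was created, here is a quick sketch; the kafka-python package and an open PLAINTEXT listener on port 6667 (the port used throughout this article) are assumptions.

from kafka import KafkaConsumer

# List the broker's topics and check that the new one is present.
consumer = KafkaConsumer(bootstrap_servers="ip-172-31-24-159.us-west-1.compute.internal:6667")
print("predictive_maint_reading" in consumer.topics())
consumer.close()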
Start the MiNiFi flow (if it is not started already) and start the NiFi flow so that data is written for consumption by Apache Spark. As the zeppelin user, start an Apache Spark shell as follows:

su - zeppelin
export SPARK_MAJOR_VERSION=2
spark-shell --num-executors 2 --executor-memory 1G --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --master yarn-client

Enter paste mode by typing :paste, then paste the following script. You will need to customize a few parameters, such as the Kafka broker URIs used when reading from and writing to Kafka, as well as the Spark ML model path. After pasting, press Ctrl+D:

import org.apache.spark.sql.streaming.ProcessingTime
import spark.implicits._
//Reading from Kafka queue
val kafkaSensorDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "ip-172-31-24-159.us-west-1.compute.internal:6667")
.option("subscribe", "rig_sensor_data")
.option("failOnDataLoss", "false")
.option("startingOffsets", "latest")
.load()
// Load the LogisticRegressionModel
import org.apache.spark.ml.PipelineModel
val LGRModel = PipelineModel.read.load("hdfs://ip-172-31-24-159.us-west-1.compute.internal:8020/models/2018-12-13_15_50_39-LR-Cooling");
//Parsing the data set in JSON and applying transformations on the timestamp
import org.apache.spark.sql.types._
val SensorSchema = (new StructType)
.add("processingtime",LongType)
.add("cycleid",IntegerType)
.add("tstamp",LongType)
.add("psa_bar",FloatType)
.add("psb_bar",FloatType)
.add("psc_bar",FloatType)
.add("psd_bar",FloatType)
.add("pse_bar",FloatType)
.add("psf_bar",FloatType)
.add("motor_power_watt",FloatType)
.add("fsa_vol_flow",FloatType)
.add("fsb_vol_flow",FloatType)
.add("tsa_temp",FloatType)
.add("tsb_temp",FloatType)
.add("tsc_temp",FloatType)
.add("tsd_temp",FloatType)
.add("cool_eff_pct",FloatType)
.add("cool_pwr_kw",FloatType)
.add("eff_fact_pct",FloatType)
val SensorDF = kafkaSensorDF.select(from_json($"value".cast("string"), SensorSchema).as("SensorJSON"))
val SensorDF2 = SensorDF.withColumn("timestamp", from_unixtime(SensorDF.col("SensorJSON.processingtime").divide(1000)).cast(TimestampType))
val SensorDF3 = SensorDF2.select("timestamp","SensorJSON.tsa_temp","SensorJSON.tsb_temp","SensorJSON.tsc_temp","SensorJSON.tsd_temp","SensorJSON.cool_eff_pct","SensorJSON.cool_pwr_kw")
//Calculating average aggregates over 1 minute window
//as the Spark ML model uses 1 minute averages of sensor readings as input
val FormatDF = SensorDF3.withWatermark("timestamp","1 minutes").groupBy(window($"timestamp", "1 minute"))
.agg(
avg("tsa_temp").alias("ts1"),
avg("tsb_temp").alias("ts2"),
avg("tsc_temp").alias("ts3"),
avg("tsd_temp").alias("ts4"),
avg("cool_eff_pct").alias("ce"),
avg("cool_pwr_kw").alias("cp")
).toDF("window","ts1","ts2","ts3","ts4","ce","cp")
//Using the Spark ML pipeline model to score in real-time
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val assembler = new VectorAssembler().setInputCols(Array("ts1", "ts2", "ts3", "ts4", "ce", "cp" ))
val VectorDF = assembler.transform(FormatDF.na.drop)
val predictions = LGRModel.transform(VectorDF).select("ts1", "ts2", "ts3","ts4","ce","cp","prediction")
//Write prediction back to a kafka queue
val SinkQuery = predictions.select(to_json(struct($"ts1",$"ts2",$"ts3",$"ts4",$"ce",$"cp",$"prediction")).alias("value")).writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "ip-172-31-24-159.us-west-1.compute.internal:6667")
.option("checkpointLocation", "checkpoint")
.option("topic","predictive_maint_reading")
.start()
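Once the streaming query is running, you can also tail the prediction topic directly from Python as an alternative to the Kafka console consumer. This is a minimal sketch assuming the kafka-python package; the broker address is the one used elsewhere in this article.

from kafka import KafkaConsumer

# Tail the prediction topic; adjust the bootstrap server to your broker.
consumer = KafkaConsumer(
    "predictive_maint_reading",
    bootstrap_servers="ip-172-31-24-159.us-west-1.compute.internal:6667",
    auto_offset_reset="latest",
    value_deserializer=lambda v: v.decode("utf-8"),
)
for message in consumer:
    print(message.value)  # JSON with ts1..ts4, ce, cp and the prediction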
You will have to wait a few minutes before the first predictions are computed, since the query aggregates over one-minute windows. You can check the predictive_maint_reading queue in Kafka to make sure the messages are being written, as described in the earlier steps.

Processing Notifications in real-time with Apache NiFi
In the flow below, we read from the Spark Streaming notification queue and, if a reading is flagged, automatically generate an email. You will need to customize a few parameters such as the Kafka broker URIs, the Hortonworks Schema Registry URI and the email parameters; the current example uses a Gmail address. I would recommend stopping the PutEmail processor to prevent an email from being sent every minute while the hydraulic test system data is showing an imminent cooling system failure. The email notification will look as follows:

Ingesting data with Apache Druid
For instructions on how to enable the Druid Kafka ingestion service, please check section 1 of the HCC article Druid Kafka Integration Service + Hive. You will need to use the following ingestion spec file. You may need to customize the Kafka bootstrap URI:

{
"type":"kafka",
"dataSchema":{
"dataSource":"temp_sensor_dsn",
"parser":{
"type":"string",
"parseSpec":{
"format": "json",
"timestampSpec":{
"column":"processingtime",
"format":"auto"
},
"dimensionsSpec":{
"dimensions":[
"cycleid"
],
"dimensionExclusions":[
]
},
"columns":[
"processingtime",
"cycleid",
"tstamp",
"psa_bar",
"psb_bar",
"psc_bar",
"psd_bar",
"pse_bar",
"psf_bar",
"motor_power_watt",
"fsa_vol_flow",
"fsb_vol_flow",
"tsa_temp",
"tsb_temp",
"tsc_temp",
"tsd_temp",
"vs_vib",
"cool_eff_pct",
"cool_pwr_kw",
"eff_fact_pct"
]
}
},
"granularitySpec":{
"type":"uniform",
"segmentGranularity":"fifteen_minute",
"queryGranularity":"minute"
},
"metricsSpec":[
{
"type":"count",
"name":"count"
},
{
"type":"doubleMax",
"fieldName":"tsa_temp",
"name":"tsa_temp_max"
},
{
"type":"doubleMax",
"fieldName":"tsb_temp",
"name":"tsb_temp_max"
},
{
"type":"doubleMax",
"fieldName":"tsc_temp",
"name":"tsc_temp_max"
},
{
"type":"doubleMax",
"fieldName":"tsd_temp",
"name":"tsd_temp_max"
},
{
"type":"doubleMin",
"fieldName":"cool_eff_pct",
"name":"cool_eff_min"
},
{
"type":"doubleMax",
"fieldName":"cool_eff_pct",
"name":"cool_eff_max"
},
{
"type":"doubleMin",
"fieldName":"cool_pwr_kw",
"name":"cool_pwr_min"
},
{
"type":"doubleMax",
"fieldName":"cool_pwr_kw",
"name":"cool_pwr_max"
},
{
"type":"doubleMin",
"fieldName":"eff_fact_pct",
"name":"eff_fact__min"
},
{
"type":"doubleMax",
"fieldName":"eff_fact_pct",
"name":"eff_fact__max"
}
]
},
"tuningConfig":{
"type":"kafka",
"maxRowsPerSegment":50000
},
"ioConfig":{
"topic":"rig_sensor_data",
"consumerProperties":{
"bootstrap.servers":"ip-172-31-24-159.us-west-1.compute.internal:6667"
},
"taskCount":1,
"replicas":1,
"taskDuration":"PT30M"
}
}
To submit the ingestion spec file above, you can use the following command (customize the hostname in the URL):

curl -X POST -H 'Content-Type: application/json' -d @kafka.json http://ip-172-31-24-159.us-west-1.compute.internal:8095/druid/indexer/v1/supervisor
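Equivalently, you can submit the spec programmatically. Below is a minimal sketch using the Python requests library against the same endpoint as the curl command above; the kafka.json file name and the hostname are the ones from that command and should be adjusted for your environment.

import json
import requests

url = "http://ip-172-31-24-159.us-west-1.compute.internal:8095/druid/indexer/v1/supervisor"
with open("kafka.json") as f:
    spec = json.load(f)

# POST the ingestion spec; the response should contain the supervisor ID.
response = requests.post(url, json=spec)
print(response.status_code, response.json())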
As a response, you will see something like the following, containing the ID of your supervisor:

{"id":"rig_sensor_data2"}

As discussed in the HCC article referenced above, you can check the Druid Overlord console; you should see at least one running task. Once the first task completes, after 15-20 minutes, you can proceed to build a real-time visualization with Superset.

Data Visualization with Superset
Once at least one indexing task has completed, you can log in to Superset. When you click on Scan New Datasources, you should see the temp_sensor_dsn data source appear. You can now build a dashboard; for step-by-step instructions, you can follow the example in this lab. Here is an example of a near real-time visualization showing the maximum temperature readings from the 4 temperature sensors on the hydraulic rig, with a granularity of one data point per minute:

Conclusion
In this article, we went through an end-to-end predictive maintenance use case, ingesting IoT data in real time using Apache MiNiFi and Apache NiFi. Once the data has been ingested and enriched, it is written to Kafka. From Kafka, the data is consumed for real-time analytics using Spark Structured Streaming, which computes one-minute average aggregates and scores the data in real time. We also showcased near real-time visualization of the data using Apache Druid and Superset. An additional area that could be explored would be to create a Hive table backed by the Druid cube and query the data using standard SQL in near real time.

Credits
The MQTT generator was inspired by this GitHub gist: https://gist.github.com/marianoguerra/be216a581ef7bc23673f501fdea0e15a
HCC article on the Druid Kafka Integration Service: https://community.hortonworks.com/articles/227104/druid-kafka-integration-service-hive.html
08-18-2017
12:58 PM
@Krishna Shah Thanks for the update. Glad it helped. Can you please accept the answer?
08-16-2017
03:56 PM
1 Kudo
@Greg Keys What is the schema access strategy on the controller service for your record writer (AvroRecordSetWriter)?
08-16-2017
03:49 PM
1 Kudo
@Krishna Shah Can you paste the whole log? It is hard to tell from this excerpt; there might be an earlier error causing this. Also, why are you running this in local mode? Did you try restarting the grunt shell? I have also hit some weird errors that cleared after restarting a shell that was in an inconsistent state.
08-16-2017
03:13 PM
@uri ben-ari You can probably use a tool like jq to achieve this from a shell script: https://stedolan.github.io/jq/ As for how to use jq, there is plenty of material on forums such as Stack Overflow: https://stackoverflow.com/questions/1955505/parsing-json-with-unix-tools
07-26-2017
03:29 PM
@Bryan Quinn
You are correct. I missed the fact that the processor hashes attributes together rather than applying an SHA hash to each one. My bad. It looks like no processor is available to do this today, but NIFI-2961 has been opened to address this requirement. The suggested alternative would be to do it through the ExecuteScript processor, as sketched below. Please check the following thread for further details: http://apache-nifi-users-list.2361937.n4.nabble.com/Encryption-and-Hashing-in-Nifi-td2307.html Thanks, Zohar
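For reference, a minimal ExecuteScript (Jython) sketch of that alternative; the attribute name myattribute and the output attribute name are hypothetical and should be adapted to your flow.

import hashlib

# session and REL_SUCCESS are variables bound by the ExecuteScript processor.
flowFile = session.get()
if flowFile is not None:
    value = flowFile.getAttribute('myattribute')  # hypothetical attribute name
    if value is not None:
        digest = hashlib.sha256(value.encode('utf-8')).hexdigest()
        flowFile = session.putAttribute(flowFile, 'myattribute.sha256', digest)
    session.transfer(flowFile, REL_SUCCESS)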
07-25-2017
10:05 PM
@Bryan Quinn Did you check the HashAttribute processor? http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.HashAttribute/index.html It looks like it will do what you are looking for. Thanks, Zohar
07-25-2017
09:58 PM
1 Kudo
@Max Evers Set nifi.flowcontroller.autoResumeState=false in nifi.properties and restart NiFi. If NiFi is managed by Ambari, you will need to modify the config through Ambari: in the Configs tab, search for nifi.flowcontroller.autoResumeState under Advanced nifi-properties and uncheck the box.
05-30-2017
07:23 PM
@yvora Thanks for your answer. Made me look back at the entire flow.