12-29-2018
05:49 AM
4 Kudos
Implementing Streaming Machine Learning and Deep Learning In Production Part 1

After we have done our data exploration with Apache Zeppelin, Hortonworks Data Analytics Studio and other data science notebooks and tools, we will start building iterations of ever-improving models that need to be used in live environments. These will need to run at scale and score millions of records in real-time streams. They can be in various frameworks, versions and types, with many options for the data required. There are a number of things we need to think about when doing this.

Model Deployment Options
Apache Spark
Apache Storm (Hortonworks Streaming Analytics Manager - SAM)
Apache Kafka Streams
Apache NiFi
YARN 3.1
YARN Submarine
TensorFlow Serving on YARN
Cloudera Data Science Workbench

Requirements
Classification REST API
Security
Automation
Data Lineage
Schema Versioning, REST API and Management
Data Provenance
Scripting
Integration with Kafka
Containerized Services Support
Docker Containers running on YARN
Support for Dockerized Spark Jobs
Model Registry
Scalability
Data Variety
Data and Storage Format Flexibility
Handling Media Types such as images, sound and video

Required Elements
Apache NiFi 1.8.0
Apache Kafka 2.0
Apache Kafka Streams 2.0
Apache Atlas 1.0.0
Apache Ranger 1.2.0
Apache Knox 1.0
Hortonworks Streams Messaging Manager 1.2.0
Hortonworks Schema Registry 0.5.2
NiFi Registry 0.2.0
Apache Hadoop 3.1
Apache YARN 3.1+
Apache HDFS or Amazon S3
Apache Druid 0.12.1
Apache HBase 2.0

Apache Spark - Apache NiFi

There are a number of options for running machine learning models in production via Apache NiFi. I have used these methods:
Apache NiFi to Apache Spark Integration via Kafka and Spark Streaming
Apache NiFi to Apache Spark Integration via Kafka and Spark Structured Streaming
Apache NiFi to Apache Spark Integration via Apache Livy

https://community.hortonworks.com/articles/174105/hdp-264-hdf-31-apache-spark-structured-streaming-i.html
https://community.hortonworks.com/content/kbentry/171787/hdf-31-executing-apache-spark-via-executesparkinte.html

Hadoop - YARN 3.1 - No Docker - No Spark

We can deploy Deep Learning models and run classification (as well as training) on YARN natively.

https://community.hortonworks.com/content/kbentry/222242/running-apache-mxnet-deep-learning-on-yarn-31-hdp.html
https://community.hortonworks.com/articles/224268/running-tensorflow-on-yarn-31-with-or-without-gpu.html

Apache Kafka Streams

Kafka Streams has full integration with platform services including Schema Registry, Ranger and Ambari.

Apache NiFi Native Java Processors for Classification

We can use a custom processor in Java that runs as a native part of the dataflow.

https://community.hortonworks.com/content/kbentry/116803/building-a-custom-processor-in-apache-nifi-12-for.html
https://github.com/tspannhw/nifi-tensorflow-processor
https://community.hortonworks.com/articles/229215/apache-nifi-processor-for-apache-mxnet-ssd-single.html
https://github.com/tspannhw/nifi-mxnetinference-processor

Apache NiFi Integration with a Model Server Native to a Framework

Apache MXNet has an open source model server with a full REST API that can easily be integrated with Apache NiFi.

https://community.hortonworks.com/articles/155435/using-the-new-mxnet-model-server.html
https://community.hortonworks.com/articles/223916/posting-images-with-apache-nifi-17-and-a-custom-pr.html
https://community.hortonworks.com/articles/177232/apache-deep-learning-101-processing-apache-mxnet-m.html

Running the Apache MXNet model server is easy:

mxnet-model-server --models SSD=resnet50_ssd_model.model --service ssd_service.py --port 9998

TensorFlow also has a model server that supports gRPC and REST: https://www.tensorflow.org/serving/api_rest
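If you want to smoke-test a model server from Python before wiring it into NiFi, something like this sketch works. The predict route, model name and port are assumptions based on the mxnet-model-server command above (MMS 0.x conventions), and the response shape varies by model and service file.

import requests

# POST a local test image at the model server started above.
# "SSD" matches the model name from the command; the /predict route and
# the "data" form field follow MMS 0.x conventions - adjust as needed.
with open("test-image.jpg", "rb") as f:
    response = requests.post("http://localhost:9998/SSD/predict", files={"data": f})

print(response.status_code)
print(response.json())  # labels, probabilities and boxes; exact shape depends on the service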
Hortonworks Streaming Analytics Manager (SAM)

SAM supports running machine learning models exported as PMML as part of a flow.

https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/getting-started-with-streaming-analytics/content/export_the_model_into_sam%27s_model_registry.html
https://hortonworks.com/blog/part-4-sams-stream-builder-building-complex-stream-analytics-apps-without-code/

You can score the model in a fully graphical manner: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/getting-started-with-streaming-analytics/content/score_the_model_using_the_pmml_processor_and_alert.html

Deep Work on Model Governance and Integration with Apache Atlas:
Customizing Atlas (Part1): Model governance, traceability and registry
Generalized Framework to Deploy Models and Integrate Apache Atlas for Model Governance
Customizing Atlas (Part2): Deep source metadata & embedded entities
Customizing Atlas (Part3): Lineage beyond Hadoop, including reports & emails

References:
https://conferences.oreilly.com/strata/strata-ny-2018/public/schedule/detail/68140
https://apachecon.dukecon.org/acna/2018/#/scheduledEvent/7058e0d4f5ab28836
https://dataworkssummit.com/berlin-2018/session/iot-with-apache-mxnet-and-apache-nifi-and-minifi/
https://dataworkssummit.com/berlin-2018/session/apache-deep-learning-101/
https://dataworkssummit.com/san-jose-2018/session/open-source-computer-vision-with-tensorflow-apache-minifi-apache-nifi-opencv-apache-tika-and-python-2/
https://www.slideshare.net/bunkertor/apache-deep-learning-201-philly-open-source
https://www.slideshare.net/bunkertor/running-apache-nifi-with-apache-spark-integration-options
12-28-2018
05:15 PM
3 Kudos
IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 2 - Integrating MQTT, TensorFlow and Kafka Streams

See Part 1: https://community.hortonworks.com/content/kbentry/229522/iot-series-sensors-utilizing-breakout-garden-hat-p.html

In this second part, I have incremented the functionality in the Python capture, MiniFi, NiFi and post-NiFi processing, and I have added a Kafka Streams Java application. With this NiFi flow we consume the MQTT and Kafka messages sent by the Kafka Streams application. In one flow, we receive MQTT messages, pull out the entire flowfile as a message and send it to a Slack channel. In another flow we ingest two types of Kafka messages and store the JSON ones that have a schema in an HBase table via the record processor. In this flow we receive from the local NiFi router that was called by MiniFi over S2S/HTTP(s). We build two types of messages and send them to Kafka 2.0 brokers: one is the full JSON message with a schema, the other is just the temperature. We create a Kafka key from the UUID. We also process the images sent from MiniFi with my native Java TensorFlow Inception processor. I decided to try some TensorFlow processing for our infinite sensor loop; it may be too much memory usage, so I may have to pick a different TensorFlow model and switch to TF Lite (https://www.tensorflow.org/lite/devguide). You will note two extra attributes coming from the Python script running on the Raspberry Pi 3B+.

Another thing I wanted to do is try Kafka Streams, since in Kafka 2.0 in HDP and HDF we have a fully supported version available. So, based on example code, I wrote a simple Kafka Streams Java 8 application that reads the Kafka JSON messages sent from NiFi 1.8, checks for some conditions and pushes data out to MQTT and another Kafka topic.

If you don't have an MQTT broker, here is a quick way to install a Mosquitto MQTT broker on CentOS 7:

sudo yum -y install mosquitto
/etc/mosquitto/mosquitto.conf
mkdir -p /var/log/mosquitto
chmod -R 777 /var/log/mosquitto/
touch /var/log/mosquitto/mosquitto.log
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

Now that we have an MQTT broker, our Kafka Streams app can send messages to it and NiFi can read messages from it. In a future version I will use Hortonworks Schema Registry and Avro.

I have updated the Python script to include TensorFlow and to update to Python 3.5. Make sure you run with Python 3.5 and have all the libraries installed on your RPi/Linux device. Some of the updated code for 3.5 (note the message encoding):

Python: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/minifi35.py

def send_tcp(s, message):
    if not message:
        return
    try:
        s.sendall(message.encode('utf-8'))
    except:
        print("Failed to send message")

For testing IoT values, I have a GenerateFlowFile with this JSON: {
"systemtime" : "${now():format('MM/dd/yyyy HH:mm:ss')}",
"BH1745_green" : "${random():mod(100):plus(1)} ",
"ltr559_prox" : "0000",
"end" : "${now():format('yyyyMMddHHmmss')}",
"uuid" : "${now():format('yyyyMMddHHmmss')}_${UUID()}",
"lsm303d_accelerometer" : "+00.06g : -01.01g : +00.04g",
"imgnamep" : "images/bog_image_p_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
"cputemp" : ${random():mod(100):toNumber()},
"BH1745_blue" : "9.0",
"te" : "47.3427119255",
"bme680_tempc" : "28.19",
"imgname" : "images/bog_image_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
"bme680_tempf" : "80.${random():mod(100):toNumber()}",
"ltr559_lux" : "006.87",
"memory" : 34.9,
"VL53L1X_distance_in_mm" : 134,
"bme680_humidity" : "${random():mod(100):toNumber()}",
"host" : "vid5",
"diskusage" : "8732.7",
"ipaddress" : "192.168.1.167",
"bme680_pressure" : "1017.31",
"BH1745_clear" : "10.0",
"BH1745_red" : "0.0",
"lsm303d_magnetometer" : "+00.04 : +00.34 : -00.10",
"starttime" : "${now():format('MM/dd/yyyy HH:mm:ss')}"
}
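Before wiring NiFi to the broker, it is worth a quick smoke test. A minimal sketch with the paho-mqtt package (assumed installed via pip), publishing one test reading to a placeholder topic on the Mosquitto broker installed above:

import json
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("localhost", 1883, 60)       # default Mosquitto port
reading = {"bme680_tempf": "82.74", "host": "vid5"}
client.publish("garden/test", json.dumps(reading))   # topic name is arbitrary here
client.disconnect()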
Kafka Streams Source Code: https://github.com/tspannhw/kstreams

Running the Fat Jar:

java -jar target/kstreams-1.0.jar
******************************************* Started
**********2018/12/28 16:41:19
**********
Memory Usage: 28284968

Updated Source Code: https://github.com/tspannhw/minifi-breakoutgarden

Updated Example Run Output

{
"ltr559_lux" : "033.75",
"uuid" : "20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135",
"cputemp" : 51,
"host" : "piups",
"lsm303d_magnetometer" : "-00.12 : +00.27 : +00.15",
"bme680_tempc" : "24.96",
"score" : "0.9694475",
"lsm303d_accelerometer" : "+00.12g : -01.00g : +00.08g",
"ltr559_prox" : "0000",
"bme680_humidity" : "28.875",
"diskusage" : "10058.7",
"human_string" : "electric fan, blower",
"bme680_pressure" : "1012.00",
"BH1745_green" : "31.0",
"imgnamep" : "/opt/demo/images/bog_image_p_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
"systemtime" : "12/28/2018 11:24:11",
"BH1745_red" : "33.0",
"starttime" : "12/28/2018 11:16:02",
"BH1745_blue" : "19.8",
"end" : "1546014251.2879872",
"bme680_tempf" : "76.93",
"VL53L1X_distance_in_mm" : 455,
"te" : "488.33915853500366",
"memory" : 70.8,
"imgname" : "/opt/demo/images/bog_image_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
"ipaddress" : "192.168.1.166",
"BH1745_clear" : "40.0"
}
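The Kafka Streams application itself is Java (source linked above). As a rough Python illustration of the same read-check-forward logic, assuming kafka-python and paho-mqtt, with placeholder topic and broker names:

import json
from kafka import KafkaConsumer
import paho.mqtt.publish as publish

consumer = KafkaConsumer("garden-json",                       # placeholder topic
                         bootstrap_servers="localhost:6667",
                         value_deserializer=lambda m: json.loads(m.decode("utf-8")))

for message in consumer:
    tempf = float(message.value.get("bme680_tempf", 0))
    if tempf > 80.0:                                          # alert threshold
        publish.single("garden/warnings",
                       "Temperature warning %s" % tempf,
                       hostname="localhost")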
From Kafka Streams I am sending a warning on temperature to MQTT, which NiFi sends to Slack:

Temperature warning 82.74

Using HBase 2.0, we are storing our data as it streams from Kafka Streams to NiFi. We use PutHBaseRecord, which utilizes record processing and our schema to stream our JSON into HBase with ease.

Updated Schema with TF Attributes

{
"type": "record",
"name": "garden",
"fields": [
{
"name": "systemtime",
"type": "string"
},
{
"name": "BH1745_green",
"type": "string"
},
{
"name": "human_string",
"type": "string",
"default": "UNK"
},
{
"name": "ltr559_prox",
"type": "string"
},
{
"name": "end",
"type": "string"
},
{
"name": "uuid",
"type": "string"
},
{
"name": "lsm303d_accelerometer",
"type": "string"
},
{
"name": "score",
"type": "string",
"default": "0"
},
{
"name": "imgnamep",
"type": "string"
},
{
"name": "cputemp",
"type": "double",
"doc": "Type inferred from '58.0'"
},
{
"name": "BH1745_blue",
"type": "string",
"doc": "Type inferred from '\"10.8\"'"
},
{
"name": "te",
"type": "string",
"doc": "Type inferred from '\"254.545491934\"'"
},
{
"name": "bme680_tempc",
"type": "string",
"doc": "Type inferred from '\"29.13\"'"
},
{
"name": "imgname",
"type": "string"
},
{
"name": "bme680_tempf",
"type": "string",
"doc": "Type inferred from '\"84.43\"'"
},
{
"name": "ltr559_lux",
"type": "string",
"doc": "Type inferred from '\"077.95\"'"
},
{
"name": "memory",
"type": "double",
"doc": "Type inferred from '37.6'"
},
{
"name": "VL53L1X_distance_in_mm",
"type": "int",
"doc": "Type inferred from '161'"
},
{
"name": "bme680_humidity",
"type": "string",
"doc": "Type inferred from '\"32.359\"'"
},
{
"name": "host",
"type": "string",
"doc": "Type inferred from '\"vid5\"'"
},
{
"name": "diskusage",
"type": "string",
"doc": "Type inferred from '\"8357.6\"'"
},
{
"name": "ipaddress",
"type": "string",
"doc": "Type inferred from '\"192.168.1.167\"'"
},
{
"name": "bme680_pressure",
"type": "string",
"doc": "Type inferred from '\"987.86\"'"
},
{
"name": "BH1745_clear",
"type": "string",
"doc": "Type inferred from '\"90.0\"'"
},
{
"name": "BH1745_red",
"type": "string",
"doc": "Type inferred from '\"33.0\"'"
},
{
"name": "lsm303d_magnetometer",
"type": "string"
},
{
"name": "starttime",
"type": "string"
}
]
}
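Before pointing PutHBaseRecord at it, the schema can be sanity-checked against the example run output. A small sketch with the fastavro package (assumed installed); garden.avsc and example_record.json are hypothetical local copies of the schema and record shown above:

import json
from fastavro import parse_schema
from fastavro.validation import validate

with open("garden.avsc") as f:           # the Avro schema shown above
    schema = parse_schema(json.load(f))

with open("example_record.json") as f:   # the example run output shown above
    record = json.load(f)

print(validate(record, schema))          # True if the record conforms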
HBase table:

create 'breakout', 'sensors'

Example Row

1546014251.2879872 column=sensors:BH1745_blue, timestamp=1546020326955, value=19.8
1546014251.2879872 column=sensors:BH1745_clear, timestamp=1546020326955, value=40.0
1546014251.2879872 column=sensors:BH1745_green, timestamp=1546020326955, value=31.0
1546014251.2879872 column=sensors:BH1745_red, timestamp=1546020326955, value=33.0
1546014251.2879872 column=sensors:VL53L1X_distance_in_mm, timestamp=1546020326955, value=455
1546014251.2879872 column=sensors:bme680_humidity, timestamp=1546020326955, value=28.875
1546014251.2879872 column=sensors:bme680_pressure, timestamp=1546020326955, value=1012.00
1546014251.2879872 column=sensors:bme680_tempc, timestamp=1546020326955, value=24.96
1546014251.2879872 column=sensors:bme680_tempf, timestamp=1546020326955, value=76.93
1546014251.2879872 column=sensors:cputemp, timestamp=1546020326955, value=51.0
1546014251.2879872 column=sensors:diskusage, timestamp=1546020326955, value=10058.7
1546014251.2879872 column=sensors:host, timestamp=1546020326955, value=piups
1546014251.2879872 column=sensors:human_string, timestamp=1546020326955, value=electric fan, blower
1546014251.2879872 column=sensors:imgname, timestamp=1546020326955, value=/opt/demo/images/bog_image_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg
1546014251.2879872 column=sensors:imgnamep, timestamp=1546020326955, value=/opt/demo/images/bog_image_p_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg
1546014251.2879872 column=sensors:ipaddress, timestamp=1546020326955, value=192.168.1.166
1546014251.2879872 column=sensors:lsm303d_accelerometer, timestamp=1546020326955, value=+00.12g : -01.00g : +00.08g
1546014251.2879872 column=sensors:lsm303d_magnetometer, timestamp=1546020326955, value=-00.12 : +00.27 : +00.15
1546014251.2879872 column=sensors:ltr559_lux, timestamp=1546020326955, value=033.75
1546014251.2879872 column=sensors:ltr559_prox, timestamp=1546020326955, value=0000
1546014251.2879872 column=sensors:memory, timestamp=1546020326955, value=70.8
1546014251.2879872 column=sensors:score, timestamp=1546020326955, value=0.9694475
1546014251.2879872 column=sensors:starttime, timestamp=1546020326955, value=12/28/2018 11:16:02
1546014251.2879872 column=sensors:systemtime, timestamp=1546020326955, value=12/28/2018 11:24:11
1546014251.2879872 column=sensors:te, timestamp=1546020326955, value=488.33915853500366
1546014251.2879872 column=sensors:uuid, timestamp=1546020326955, value=20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135
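NiFi's PutHBaseRecord does this for us. Purely for illustration, the same row could be written from Python with the happybase client (assumed installed, with the HBase Thrift server running; the hostname is a placeholder):

import happybase

connection = happybase.Connection("hbase-host")    # placeholder hostname
table = connection.table("breakout")
table.put("1546014251.2879872", {                  # the 'end' field is the row key
    b"sensors:bme680_tempf": b"76.93",
    b"sensors:human_string": b"electric fan, blower",
})
connection.close()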
12-14-2018
06:45 PM
2 Kudos
IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 1 - Introduction

An easy option for adding, removing and prototyping sensor reads from a standard Raspberry Pi with no special wiring.

Hardware Component List:
Raspberry Pi
USB Power Cable
Pimoroni Breakout Garden Hat
1.12" Mono OLED Breakout 128x128 White/Black Screen
BME680 Air Quality, Temperature, Pressure, Humidity Sensor
LSM303D 6DoF Motion Sensor (X, Y, Z Axes)
BH1745 Luminance and Color Sensor
LTR-559 Light and Proximity Sensor (0.01 lux to 64,000 lux)
VL53L1X Time of Flight (ToF) Sensor (Pew Pew Lasers!)

Software Component List:
Raspbian
Python 2.7
JDK 8 (Java)
Apache NiFi
MiniFi

Source Code: https://github.com/tspannhw/minifi-breakoutgarden
Shell Script: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/runbrk.sh
Python: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/brk.py

Summary

Our Raspberry Pi has a Breakout Garden Hat with 5 sensors and one small display. The display shows the last reading and is constantly updating. For debugging purposes, it shows the IP address so I can connect as needed. We currently run via nohup, but when we go into constant use I will switch to a Linux service that runs on startup.

The Python script initializes the connections to all of the sensors and then goes into an infinite loop of reading those values and building a JSON packet that we send via TCP/IP over port 5005 to a listener (a trimmed sketch of this loop appears after the example record below). A MiniFi 0.5.0 Java agent uses ListenTCP on that port to capture these messages and filter them based on alarm values. If a message is outside of the checked parameters, we send it via S2S/HTTP(s) to an Apache NiFi server. We also have a USB webcam (Sony PlayStation 3 EYE) that captures images; we read those with MiniFi and send them to NiFi as well.

The first thing we need to do is pretty easy: plug in our Pimoroni Breakout Garden Hat and our 6 plugs. You have to do the standard installation of Python, Java 8 and MiniFi, and I recommend OpenCV. Make sure you have everything plugged in securely and in the correct direction before you power on the Raspberry Pi.

Download MiniFi Java here: https://nifi.apache.org/minifi/download.html

Install Python PIP:

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py

Install the Breakout Garden library:

wget https://github.com/pimoroni/breakout-garden/archive/master.zip
unzip master.zip
cd breakout-garden-master
sudo ./install.sh

Running In NiFi

First we build our MiniFi flow. We have two objectives: listen for TCP/IP JSON messages from our running Python sensor collector, and gather images captured by the PS3 Eye USB webcam. We then add content type and schema information to the attributes. We also extract a few values from the JSON stream to use for alerting:

$.cputemp
$.VL53L1X_distance_in_mm
$.bme680_humidity
$.bme680_tempf

These attributes are now attached to our flowfile, which is otherwise unchanged, so we can route on them. We route on a few alarm conditions:

${cputemp:gt(100)}
${humidity:gt(60)}
${tempf:gt(80)}
${distance:gt(0)}

We can easily add more conditions or different set values, and we can also populate these set values from an HTTP / file lookup. If these conditions are met, we send the data to our local Apache NiFi router, which can then do further analysis with the fuller NiFi processor set, including TensorFlow, MXNet, record processing and lookups.

Local NiFi Routing

For now we are just splitting up the images and JSON and sending them to two different remote ports on our cloud NiFi cluster. These then arrive in the cloud, where you can see a list of the flow files waiting to be processed (I haven't written that part yet). We are getting a few a second; we could get 100,000 a second if we needed to. Just add nodes: instant scaling. Cloudbreak can do that for you.

In part 2, we will start processing these data streams and images. We will also add Apache MXNet and TensorFlow at various points on the edge, router and cloud using Python and the built-in Deep Learning NiFi processors I have authored. We will also break apart these records and send each sensor to its own Kafka topic to be processed with Kafka Streams, Druid, Hive and HBase.

As part of our loop we write the current values to our little screen.

Example Record

{
"systemtime" : "12/19/2018 22:15:56",
"BH1745_green" : "4.0",
"ltr559_prox" : "0000",
"end" : "1545275756.7",
"uuid" : "20181220031556_e54721d6-6110-40a6-aa5c-72dbd8a8dcb2",
"lsm303d_accelerometer" : "+00.06g : -01.01g : +00.04g",
"imgnamep" : "images/bog_image_p_20181220031556_e54721d6-6110-40a6-aa5c-72dbd8a8dcb2.jpg",
"cputemp" : 51.0,
"BH1745_blue" : "9.0",
"te" : "47.3427119255",
"bme680_tempc" : "28.19",
"imgname" : "images/bog_image_20181220031556_e54721d6-6110-40a6-aa5c-72dbd8a8dcb2.jpg",
"bme680_tempf" : "82.74",
"ltr559_lux" : "006.87",
"memory" : 34.9,
"VL53L1X_distance_in_mm" : 134,
"bme680_humidity" : "23.938",
"host" : "vid5",
"diskusage" : "8732.7",
"ipaddress" : "192.168.1.167",
"bme680_pressure" : "1017.31",
"BH1745_clear" : "10.0",
"BH1745_red" : "0.0",
"lsm303d_magnetometer" : "+00.04 : +00.34 : -00.10",
"starttime" : "12/19/2018 22:15:09"
}
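For a feel of what the capture loop does, here is a trimmed sketch: read one sensor with Pimoroni's bme680 library (installed by the steps above), build a small JSON packet, and ship it over TCP 5005 to MiniFi's ListenTCP. This is a reduced illustration; the full five-sensor script is linked above.

import json
import socket
import time

import bme680

sensor = bme680.BME680()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1", 5005))               # MiniFi ListenTCP port

while True:
    if sensor.get_sensor_data():
        packet = {"bme680_tempc": "%0.2f" % sensor.data.temperature,
                  "bme680_humidity": "%0.3f" % sensor.data.humidity,
                  "bme680_pressure": "%0.2f" % sensor.data.pressure}
        sock.sendall((json.dumps(packet) + "\n").encode("utf-8"))
    time.sleep(5)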
NiFi Templates
nifi-garden-router.xml
minifi-garden.xml
garden-server.xml

Let's Build Those Topics Now

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bme680
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bh17455
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic lsm303d
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic vl53l1x
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ltr559
Hopefully in your environment you will be able to use a replication factor of 3, 5 or 7 and many partitions. I have one Kafka broker, so this is what we are starting with.

Reference

https://shop.pimoroni.com/collections/breakout-garden
https://github.com/pimoroni/breakout-garden/tree/master/examples
https://shop.pimoroni.com/products/breakout-garden-hat
https://github.com/pimoroni/bme680-python
https://learn.pimoroni.com/bme680
https://shop.pimoroni.com/products/bh1745-luminance-and-colour-sensor-breakout
https://github.com/pimoroni/bh1745-python
https://shop.pimoroni.com/products/vl53l1x-breakout
https://github.com/pimoroni/vl53l1x-python/tree/master/examples
https://shop.pimoroni.com/products/ltr-559-light-proximity-sensor-breakout
https://github.com/pimoroni/ltr559-python
https://shop.pimoroni.com/products/bme680-breakout
https://github.com/pimoroni/bme680
https://shop.pimoroni.com/products/lsm303d-6dof-motion-sensor-breakout
https://github.com/pimoroni/lsm303d-python
https://shop.pimoroni.com/products/1-12-oled-breakout
https://github.com/rm-hull/luma.oled
http://www.diegoacuna.me/how-to-run-a-script-as-a-service-in-raspberry-pi-raspbian-jessie/
12-12-2018
02:31 AM
My Example Mesh Network

Argon: Wi-Fi + Mesh Gateway/Repeater. This connects to the outside network via Wi-Fi; we connect to Particle Cloud.
Espressif ESP32-D0WD 2.4G Wi-Fi
Nordic Semiconductor nRF52840 SoC for Bluetooth and NFC-A tag
ARM TrustZone CryptoCell-310 cryptographic and security module
Two antennas (one for Thread/BLE, another for Wi-Fi)

Xenon: Mesh + BLE
Nordic Semiconductor nRF52840 SoC for Bluetooth and NFC-A tag
ARM TrustZone CryptoCell-310 cryptographic and security module

References
https://github.com/Seeed-Studio/Grove_Starter_Kit_for_Photon_Demos?files=1
https://docs.particle.io/datasheets/accessories/mesh-accessories/
https://community.particle.io/c/mesh
https://www.particle.io/mesh
https://docs.particle.io/datasheets/mesh/xenon-datasheet/
https://docs.particle.io/datasheets/wi-fi/argon-datasheet/
https://blog.particle.io/2018/04/28/how-to-build-a-wireless-mesh-network/
WiFi Mesh: https://www.threadgroup.org/
https://github.com/Seeed-Studio/Grove_Starter_Kit_for_Photon_Demos/blob/master/Example%20-%2005%20Measuring%20Temperature/Example05.ino
http://wiki.seeedstudio.com/Grove-Ultrasonic_Ranger/
https://www.particle.io/mesh/buy/xenon
12-10-2018
07:41 PM
3 Kudos
Deep Speech with Apache NiFi 1.8
Tools: Python 3.6, PyAudio, TensorFlow, Deep Speech, Shell, Apache NiFi
Why: Speech-to-Text
Use Case: Voice control and recognition.
Series: Holiday Use Case: Turn on Holiday Lights and Music on command.
Cool Factor: Ever want to run a query on Live Ingested Voice Commands?
Other Options: https://community.hortonworks.com/articles/155519/voice-controlled-data-flows-with-google-aiy-voice.html
We are using Python 3.6 to write some code around PyAudio, TensorFlow and Deep Speech to capture audio, store it in a WAV file and then process it with Deep Speech to extract text. This example runs on OSX without a GPU, on TensorFlow v1.11.
The Mozilla Github repo for their Deep Speech implementation has nice getting started information that I used to integrate our flow with Apache NiFi.
Installation as per https://github.com/mozilla/DeepSpeech
pip3 install deepspeech
wget -O - https://github.com/mozilla/DeepSpeech/releases/download/v0.3.0/deepspeech-0.3.0-models.tar.gz | tar xvfz -
This pre-trained model is available for English. For other languages you will need to build your own; you can use a beefy HDP 3.1 cluster to train it. Note: THIS IS A 1.8 GB DOWNLOAD. That may be an issue for laptops, devices or small-data people.

Apache NiFi Flow

The flow is simple: we call our shell script, which runs Python that records audio and sends it to Deep Speech for processing. We get back a voice_string in JSON that we turn into a record for querying and filtering in Apache NiFi. I am handling a few voice commands for "Save", "Load" and "Move". As you can imagine, you can handle pretty much anything you want. It's a simple way to use voice to control streaming data flows or just to ingest large streams of text. Even using advanced deep learning, speech-to-text recognition is still not the strongest.

If you are going to load balance connections between nodes, you have options on compression and load balancing strategies. This can come in handy if you have a lot of servers.

Shell Script
python3.6 /Volumes/TSPANN/projects/DeepSpeech/processnifi.py /Volumes/TSPANN/projects/DeepSpeech/models/output_graph.pbmm /Volumes/TSPANN/projects/DeepSpeech/models/alphabet.txt
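What the script drives under the hood, as a hedged sketch against the DeepSpeech 0.3.0 Python API (the constants are the defaults from Mozilla's example client; the paths and WAV file are placeholders):

import wave

import numpy as np
from deepspeech import Model

N_FEATURES, N_CONTEXT, BEAM_WIDTH = 26, 9, 500    # Mozilla client defaults
ds = Model("models/output_graph.pbmm", N_FEATURES, N_CONTEXT,
           "models/alphabet.txt", BEAM_WIDTH)

with wave.open("command.wav", "rb") as w:         # 16 kHz, 16-bit mono expected
    audio = np.frombuffer(w.readframes(w.getnframes()), np.int16)
    print(ds.stt(audio, w.getframerate()))        # the recognized text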
Schema
{
"type" : "record",
"name" : "voice",
"fields" : [ {
"name" : "systemtime",
"type" : "string",
"doc" : "Type inferred from '\"12/10/2018 14:53:47\"'"
}, {
"name" : "voice_string",
"type" : "string",
"doc" : "Type inferred from '\"\"'"
} ]
}
We can add more fields as needed.
Example Run
HW13125:DeepSpeech tspann$ ./runnifi.sh
TensorFlow: v1.11.0-9-g97d851f04e
DeepSpeech: unknown
2018-12-10 14:36:43.714433: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
{"systemtime": "12/10/2018 14:36:43", "voice_string": "one two three or five six seven eight nine"}
We can run this on top of YARN 3.1 as dockerized or non-dockerized workloads. Setting up nodes to run HDF 3.3 (Apache NiFi and friends) is easy in the cloud or on-premises in OpenStack with solid DevOps tools. When running Apache NiFi, it is easy to monitor in Ambari.
References:
https://github.com/mozilla/DeepSpeech
https://community.hortonworks.com/articles/224268/running-tensorflow-on-yarn-31-with-or-without-gpu.html
https://arxiv.org/abs/1412.5567
https://github.com/tspannhw/nifi-deepspeech
12-07-2018
04:59 PM
Thanks for the update!
12-06-2018
09:53 PM
5 Kudos
Apache NiFi Processor for Apache MXNet SSD: Single Shot MultiBox Object Detector (Deep Learning)

The news is out: Apache MXNet has added a Java API. So, as soon as I could, I got my hands on the Maven repo and an example program and got to work writing a new Apache NiFi processor for it. I have run this on standalone Apache NiFi 1.8.0 and on HDF 3.3 - Apache NiFi 1.8.0, and both work. So anyone who wants to be an alpha tester, please download it and give it a try.

Apache MXNet SSD is a good example of a pretrained deep learning model that works pretty well for general images, in use cases especially around people and cars. You can fine-tune this with some more images and runs: https://mxnet.incubator.apache.org/faq/finetune.html

The nice thing is that now we can start including Apache MXNet as part of Java applications such as Kafka Streams, Apache Storm, Apache Spark, Spring Boot and other use cases using Java. I could potentially inject this into a Hive UDF (https://community.hortonworks.com/articles/39980/creating-a-hive-udf-in-java.html#comment-40026) or Pig UDF; the performance may be fast enough. We now have four Java options for deep learning: DL4J, H2O, TensorFlow and Apache MXNet. Unfortunately, both the TensorFlow and MXNet Java APIs are not quite production ready. I may do some further research on running MXNet as a Hive UDF; it would be cool to have in a query.

For those who don't want to set up a development environment with JDK 8+, Maven 3.3+ and git, you can download a pre-built NAR file here: https://github.com/tspannhw/nifi-mxnetinference-processor/releases/tag/v1.0. As part of the recent release of HDF 3.3, I have upgraded my OpenStack Centos 7 cluster.

Important Caveats
Notice, the Java API is in preview and so is this processor. Do not use this in production!
This is in development and I am the only one working on it.
The Java API from Apache MXNet is in flux and will be changing.
See the POM, as it is tied to the OSX/Mac version of the library. You will need to change that.
You will need to download the pre-built MXNet model and place it in a directory accessible to the Apache NiFi server/cluster.
I am still cleaning up the rectangle code for identifying objects in the pictures. As you will notice, my rectangle drawing is a bit off. I need to work on that.

Once you drop your built NAR file and models in the nifi/lib directory and restart Apache NiFi, you can add it to your canvas. We need to feed it some images. You can use my webcam processor, an image URL feed or local files. To grab images from an HTTPS site, you need an SSL Context Service like the StandardSSLContextService below. You will need to point to the cacerts used by the JRE/JDK running your Apache NiFi node. The default password in Java is changeme. Hopefully you have changed it.

To configure my new processor, just put in the full path to the model directory and then "/resnet50_ssd_model", as that is the prefix for the model.

Our example flow has the new processor being fed by traffic cameras, webcams, local files and a local webcam. Some output of our flow: our top 5 probabilities and labels.

Example Data:

{
"ymin_1" : "456.01",
"ymin_5" : "159.29",
"ymin_4" : "235.83",
"ymin_3" : "206.64",
"ymin_2" : "383.84",
"label_5" : "person",
"xmax_5" : "121.14",
"label_4" : "bicycle",
"xmax_4" : "137.89",
"label_3" : "dog",
"xmax_3" : "179.14",
"ymax_1" : "150.66",
"ymax_2" : "418.95",
"ymax_3" : "476.79",
"label_2" : "bicycle",
"label_1" : "car",
"probability_4" : "0.22",
"probability_5" : "0.13",
"probability_2" : "0.90",
"xmin_5" : "88.93",
"probability_3" : "0.82",
"ymax_4" : "413.43",
"probability_1" : "1.00",
"ymax_5" : "190.04",
"xmax_2" : "149.96",
"xmax_1" : "72.03",
"xmin_3" : "83.82",
"xmin_4" : "93.05",
"xmin_1" : "312.21",
"xmin_2" : "155.96"
}

Resources:
https://medium.com/apache-mxnet/introducing-java-apis-for-deep-learning-inference-with-apache-mxnet-8406a698fa5a
https://github.com/apache/incubator-mxnet/tree/java-api/scala-package/examples/src/main/java/org/apache/mxnetexamples/javaapi
https://mxnet.incubator.apache.org/install/java_setup.html

Source: https://github.com/tspannhw/nifi-mxnetinference-processor
Video walk-through: https://www.youtube.com/watch?v=Q4dSGPvqXSA&t=196s&list=PL-7XqvSmQqfTSihuoIP_ZAnN7mFIHkZ_e&index=17
mxnet-processor.xml

Download the artifacts listed: https://github.com/apache/incubator-mxnet/tree/java-api/scala-package/examples/src/main/java/org/apache/mxnetexamples/javaapi/infer/objectdetector#step-1

Maven POM (I used Java 8 and Maven 3.3.9):

<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.dataflowdeveloper.mxnet</groupId>
<artifactId>inference</artifactId>
<version>1.0</version>
</parent>
<artifactId>nifi-mxnetinference-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mxnet</groupId>
<artifactId>mxnet-full_2.11-osx-x86_64-cpu</artifactId>
<version>1.3.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
I have moved from Eclipse to IntelliJ for my builds. I am looking at Apache NetBeans as well.
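Since my rectangle drawing is still a bit off, here is a quick way to eyeball the boxes outside NiFi with Pillow. Field names follow the example data above; the coordinates are assumed to be pixels in the source image, and are sorted defensively since some of the example values look swapped:

from PIL import Image, ImageDraw

attrs = {"label_1": "car", "probability_1": "1.00",
         "xmin_1": "312.21", "xmax_1": "72.03",
         "ymin_1": "456.01", "ymax_1": "150.66"}   # subset of the example data

img = Image.open("test-image.jpg")                 # placeholder input image
draw = ImageDraw.Draw(img)
i = 1
while "label_%d" % i in attrs:
    x0, x1 = sorted((float(attrs["xmin_%d" % i]), float(attrs["xmax_%d" % i])))
    y0, y1 = sorted((float(attrs["ymin_%d" % i]), float(attrs["ymax_%d" % i])))
    draw.rectangle([x0, y0, x1, y1], outline="red")
    draw.text((x0, y0), "%s %s" % (attrs["label_%d" % i], attrs["probability_%d" % i]))
    i += 1
img.save("annotated.jpg")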
11-27-2018
03:50 AM
5 Kudos
MiniFi Java Agent 0.5

Copy over the necessary NARs from the Apache NiFi 1.7 lib:

nifi-ssl-context-service-nar-1.7.0.nar
nifi-standard-services-api-nar-1.7.0.nar
nifi-kafka-1-0-nar-1.7.0.nar

This will support PublishKafka_1_0 and ConsumeKafka_1_0. Then create a consume and/or publish flow; you can combine the two based on your needs. In my simple example I consume the Kafka messages in MiniFi and write them to a file. I also write the metadata to a JSON file.

Consume Kafka / Publish Electric Monitoring Data To Kafka: let's monitor the messages going through our topic, smartPlug. We publish messages to Kafka and consume any messages from the smartPlug topic.

Logs

Provenance Event file containing 377 records. In the past 5 minutes, 1512 events have been written to the Provenance Repository, totaling 839.32 KB
2018-11-26 19:42:32,473 INFO [main] o.a.n.c.s.StandardProcessScheduler Starting PutFile[id=25a86505-031a-37d9-0000-000000000000]
2018-11-26 19:42:32,474 INFO [main] o.a.n.c.s.StandardProcessScheduler Starting UpdateAttribute[id=9220d40d-ee1d-3f61-0000-000000000000]
2018-11-26 19:42:32,474 INFO [main] o.apache.nifi.controller.FlowController Started 0 Remote Group Ports transmitting
2018-11-26 19:42:32,478 INFO [main] org.apache.nifi.minifi.MiNiFiServer Flow loaded successfully.
2018-11-26 19:42:32,479 INFO [Monitor Processor Lifecycle Thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled ConsumeKafka_1_0[id=8556f1ce-a915-3fda-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,479 INFO [main] org.apache.nifi.BootstrapListener Successfully initiated communication with Bootstrap
2018-11-26 19:42:32,479 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled AttributesToJSON[id=0628b4e5-10d0-3b09-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,479 INFO [main] org.apache.nifi.minifi.MiNiFi Controller initialization took 2787584123 nanoseconds.
2018-11-26 19:42:32,480 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled PutFile[id=25a86505-031a-37d9-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,481 INFO [Monitor Processor Lifecycle Thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled UpdateAttribute[id=9220d40d-ee1d-3f61-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,585 INFO [Timer-Driven Process Thread-2] o.a.k.clients.consumer.ConsumerConfig ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [princeton1.field.hortonworks.com:6667]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = minificonsumer1
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 10000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-11-26 19:42:32,727 INFO [Timer-Driven Process Thread-2] o.a.kafka.common.utils.AppInfoParser Kafka version : 1.0.0
2018-11-26 19:42:32,727 INFO [Timer-Driven Process Thread-2] o.a.kafka.common.utils.AppInfoParser Kafka commitId : aaa7af6d4a11b29d
2018-11-26 19:42:33,088 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.AbstractCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Discovered coordinator princeton1.field.hortonworks.com:6667 (id: 2147482646 rack: null)
2018-11-26 19:42:33,090 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Revoking previously assigned partitions []
2018-11-26 19:42:33,091 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.AbstractCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] (Re-)joining group
2018-11-26 19:42:36,391 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.AbstractCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Successfully joined group with generation 3
2018-11-26 19:42:36,394 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Setting newly assigned partitions [smartPlug-0]
2018-11-26 19:44:32,325 INFO [pool-34-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 0 milliseconds
2018-11-26 19:44:40,700 INFO [Provenance Maintenance Thread-1] o.a.n.p.PersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 1437
2018-11-26 19:44:40,765 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.lucene.SimpleIndexManager Index Writer for provenance_repository/index-1543271506000 has been returned to Index Manager and is no longer in use. Closing Index Writer
2018-11-26 19:44:40,767 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully merged 16 journal files (28 records) into single Provenance Log File provenance_repository/1409.prov in 62 milliseconds
2018-11-26 19:44:40,768 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 151 records. In the past 5 minutes, 28 events have been written to the Provenance Repository, totaling 15.43 KB

JSON Kafka Message and JSON Kafka Metadata Stored As Files

monitor/1448678223641638.attr.json

{"path":"./","filename":"1448678223641638","kafka.partition":"0","kafka.offset":"5543","kafka.topic":"smartPlug","kafka.key":"cb90ad21-b311-494c-96cc-06dd2e8747df","uuid":"041459fc-c63e-4056-ab50-1c375cd7d49f"}

monitor/1448678223641638

{"day30": 0.431, "day31": 1.15, "sw_ver": "1.2.5 Build 171206 Rel.085954", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "00000000000000000000000000000000", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim", "icon_hash": "", "relay_state": 1, "on_time": 886569, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -75, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "month10": 1.581, "month11": 30.888, "current": 0.067041, "voltage": 122.151701, "power": 1.277361, "total": 24.289, "time": "11/26/2018 21:54:22", "ledon": true, "systemtime": "11/26/2018 21:54:22"}

Resources:
https://blog.ona.io/general/2017/08/30/streaming-ona-data-with-nifi-kafka-druid-and-superset.html
https://community.hortonworks.com/articles/193945/social-media-monitoring-with-nifi-hivedruid-integr.html
https://community.hortonworks.com/articles/177561/streaming-tweets-with-nifi-kafka-tranquility-druid.html
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.0/kafka-using-kafka-streams/content/kafka-using-kafka-streams.html
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.0/minifi-quick-start/content/overview.html
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.0/minifi-quick-start/content/before_you_begin.html
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.0/minifi-quick-start/content/installing_minifi_on_linux.html
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.0/minifi-quick-start/content/using_processors_not_packaged_with_minifi.html?es_p=8055369
https://community.hortonworks.com/articles/227560/real-time-stock-processing-with-apache-nifi-and-ap.html

Files: consumekafka2.xml pushkafka1.xml configyml consume.txt configymlsend.txt
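For comparison, a rough Python analogue of the MiniFi consume flow above, using kafka-python (assumed installed): read from smartPlug and write each message body plus a metadata sidecar, like the monitor/ files shown.

import json
import time

from kafka import KafkaConsumer

consumer = KafkaConsumer("smartPlug",
                         bootstrap_servers="princeton1.field.hortonworks.com:6667",
                         group_id="minificonsumer2")   # separate group from MiniFi's

for msg in consumer:
    name = "monitor/%d" % int(time.time() * 1000000)   # microsecond-style filename
    with open(name, "wb") as body:
        body.write(msg.value)                          # raw JSON message
    meta = {"kafka.topic": msg.topic,
            "kafka.partition": str(msg.partition),
            "kafka.offset": str(msg.offset)}
    with open(name + ".attr.json", "w") as attrs:
        json.dump(meta, attrs)                         # metadata sidecar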
11-15-2018
11:42 PM
6 Kudos
Implementing Streaming Use Case From REST to Hive with Apache NiFi and Apache Kafka Part 1

With Apache Kafka 2.0 and Apache NiFi 1.8 out, along with many new features and abilities, it's time to put them to the test.

To plan out what we are going to do, I have a high-level architecture diagram. We are going to ingest a number of sources including REST feeds, social feeds, messages, images, documents and relational data. We will ingest with NiFi, then filter, process and segment the data into Kafka topics. Kafka data will be in Apache Avro format, with schemas specified in Hortonworks Schema Registry. Kafka Streams, Spark and NiFi will do additional event processing along with machine learning and deep learning. The data will be stored in Druid for real-time analytics and summaries. Hive, HDFS and S3 will provide permanent storage. We will build dashboards with Superset and Spark SQL + Zeppelin. We will integrate machine learning with Spark ML, TensorFlow and Apache MXNet. We will also push cleaned and aggregated data back to subscribers via Kafka and NiFi: to Dockerized applications, message listeners, web clients, Slack channels and email mailing lists.

To be useful in our enterprise, we will have full authorization, authentication, auditing, data encryption and data lineage via Apache Ranger, Apache Atlas and Apache NiFi. NiFi Registry and GitHub will be used for source code control. We will have administration capabilities via Apache Ambari.

An example server layout:

NiFi Flows

Real-time free stock data is available from IEX with no license key. The data streams in very fast; thankfully that's no issue for Apache NiFi and Kafka.

We consume the different records from topics and store them to HDFS in separate directories and tables. Let's split up one big REST file into individual records of interest. Our REST feed has quote, chart and news arrays.

Let's Push Some Messages to Slack

We can easily consume from multiple topics in Apache NiFi. Querying data is easy while it's in motion, since we have schemas. We create schemas for each of our Kafka topics. We can monitor all these messages going through Kafka in Ambari (and also in much better detail in Hortonworks SMM). I read in data and can then push it to Kafka 1.0 and 2.0 brokers. Once data is sent, NiFi lets us know.

Projects Used
Apache Kafka
Apache Kafka Streams
Apache MXNet
NLTK
Stanford CoreNLP
Apache OpenNLP
TextBlob
SpaCy
Apache NiFi
Apache Druid
Apache Hive on Kafka
Apache Hive on Druid
Apache Hive on JDBC
Apache Zeppelin
NLP - Apache OpenNLP and Stanford CoreNLP
Hortonworks Schema Registry
NiFi Registry
Apache Ambari Log Search
Hortonworks SMM
Hortonworks Data Plane Services (DPS)

Sources
REST
Twitter
JDBC
Sensors
MQTT
Documents

Sinks
Apache Hadoop HDFS
Apache Kafka
Apache Hive
Slack
S3
Apache Druid
Apache HBase

Topics
iextradingnews
iextradingquote
iextradingchart
stocks
cyber

HDFS Directories

hdfs dfs -mkdir -p /iextradingnews
hdfs dfs -mkdir -p /iextradingquote
hdfs dfs -mkdir -p /iextradingchart
hdfs dfs -mkdir -p /stocks
hdfs dfs -mkdir -p /cyber
hdfs dfs -chmod -R 777 /

PutHDFS

/${kafka.topic}

/iextradingchart/859496561256574.orc
/iextradingnews/855935960267509.orc
/iextradingquote/859143934804532.orc

Hive Tables

CREATE EXTERNAL TABLE IF NOT EXISTS iextradingchart (`date` STRING, open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE, volume INT, unadjustedVolume INT, change DOUBLE, changePercent DOUBLE, vwap DOUBLE, label STRING, changeOverTime INT)
STORED AS ORC
LOCATION '/iextradingchart';
CREATE EXTERNAL TABLE IF NOT EXISTS iextradingquote (symbol STRING, companyName STRING, primaryExchange STRING, sector STRING, calculationPrice STRING, open DOUBLE, openTime BIGINT, close DOUBLE, closeTime BIGINT, high DOUBLE, low DOUBLE, latestPrice DOUBLE, latestSource STRING, latestTime STRING, latestUpdate BIGINT, latestVolume INT, iexRealtimePrice DOUBLE, iexRealtimeSize INT, iexLastUpdated BIGINT, delayedPrice DOUBLE, delayedPriceTime BIGINT, extendedPrice DOUBLE, extendedChange DOUBLE, extendedChangePercent DOUBLE, extendedPriceTime BIGINT, previousClose DOUBLE, change DOUBLE, changePercent DOUBLE, iexMarketPercent DOUBLE, iexVolume INT, avgTotalVolume INT, iexBidPrice INT, iexBidSize INT, iexAskPrice INT, iexAskSize INT, marketCap INT, peRatio DOUBLE, week52High DOUBLE, week52Low DOUBLE, ytdChange DOUBLE)
STORED AS ORC
LOCATION '/iextradingquote';
CREATE EXTERNAL TABLE IF NOT EXISTS iextradingnews (`datetime` STRING, headline STRING, source STRING, url STRING, summary STRING, related STRING, image STRING)
STORED AS ORC
LOCATION '/iextradingnews';

Schemas

{ "type": "record", "name": "iextradingchart", "fields": [ { "name": "date", "type": [ "string", "null" ] }, { "name": "open", "type": [ "double", "null" ] }, { "name": "high", "type": [ "double", "null" ] }, { "name": "low", "type": [ "double", "null" ] }, { "name": "close", "type": [ "double", "null" ] }, { "name": "volume", "type": [ "int", "null" ] }, { "name": "unadjustedVolume", "type": [ "int", "null" ] }, { "name": "change", "type": [ "double", "null" ] }, { "name": "changePercent", "type": [ "double", "null" ] }, { "name": "vwap", "type": [ "double", "null" ] }, { "name": "label", "type": [ "string", "null" ] }, { "name": "changeOverTime", "type": [ "int", "null" ] } ] }

{ "type": "record", "name": "iextradingquote", "fields": [ { "name": "symbol", "type": [ "string", "null" ], "doc": "Type inferred from '\"HDP\"'" }, { "name": "companyName", "type": [ "string", "null" ], "doc": "Type inferred from '\"Hortonworks Inc.\"'" }, { "name": "primaryExchange", "type": [ "string", "null" ], "doc": "Type inferred from '\"Nasdaq Global Select\"'" }, { "name": "sector", "type": [ "string", "null" ], "doc": "Type inferred from '\"Technology\"'" }, { "name": "calculationPrice", "type": [ "string", "null" ], "doc": "Type inferred from '\"close\"'" }, { "name": "open", "type": [ "double", "null" ], "doc": "Type inferred from '16.3'" }, { "name": "openTime", "type": [ "long", "null" ], "doc": "Type inferred from '1542033000568'" }, { "name": "close", "type": [ "double", "null" ], "doc": "Type inferred from '15.76'" }, { "name": "closeTime", "type": [ "long", "null" ], "doc": "Type inferred from '1542056400520'" }, { "name": "high", "type": [ "double", "null" ], "doc": "Type inferred from '16.37'" }, { "name": "low", "type": [ "double", "null" ], "doc": "Type inferred from '15.2'" }, { "name": "latestPrice", "type": [ "double", "null" ], "doc": "Type inferred from '15.76'" }, { "name": "latestSource", "type": [ "string", "null" ], "doc": "Type inferred from '\"Close\"'" }, { "name": "latestTime", "type": [ "string", "null" ], "doc": "Type inferred from '\"November 12, 2018\"'" }, { "name": "latestUpdate", "type": [ "long", "null" ], "doc": "Type inferred from '1542056400520'" }, { "name": "latestVolume", "type": [ "int", "null" ], "doc": "Type inferred from '4012339'" }, { "name": "iexRealtimePrice", "type": [ "double", "null" ], "doc": "Type inferred from '15.74'" }, { "name": "iexRealtimeSize", "type": [ "int", "null" ], "doc": "Type inferred from '43'" }, { "name": "iexLastUpdated", "type": [ "long", "null" ], "doc": "Type inferred from '1542056397411'" }, { "name": "delayedPrice", "type": [ "double", "null" ], "doc": "Type inferred from '15.76'" }, { "name": "delayedPriceTime", "type": [ "long", "null" ], "doc": "Type inferred from '1542056400520'" }, { "name": "extendedPrice", "type": [ "double", "null" ], "doc": "Type inferred from '15.85'" }, { "name": "extendedChange", "type": [ "double", "null" ], "doc": "Type inferred from '0.09'" }, { "name": "extendedChangePercent", "type": [ "double", "null" ], "doc": "Type inferred from '0.00571'" }, { "name": "extendedPriceTime", "type": [ "long", "null" ], "doc": "Type inferred from '1542059622726'" }, { "name": "previousClose", "type": [ "double", "null" ], "doc": "Type inferred from '16.24'" }, { "name": "change", "type": [ "double", "null" ], "doc": "Type inferred from '-0.48'" }, { "name": "changePercent", "type": [ "double", "null" ], "doc": "Type inferred from '-0.02956'" }, { "name": "iexMarketPercent", "type": [ "double", "null" ], "doc": "Type inferred from '0.03258'" }, { "name": "iexVolume", "type": [ "int", "null" ], "doc": "Type inferred from '130722'" }, { "name": "avgTotalVolume", "type": [ "int", "null" ], "doc": "Type inferred from '2042809'" }, { "name": "iexBidPrice", "type": [ "int", "null" ], "doc": "Type inferred from '0'" }, { "name": "iexBidSize", "type": [ "int", "null" ], "doc": "Type inferred from '0'" }, { "name": "iexAskPrice", "type": [ "int", "null" ], "doc": "Type inferred from '0'" }, { "name": "iexAskSize", "type": [ "int", "null" ], "doc": "Type inferred from '0'" }, { "name": "marketCap", "type": [ "int", "null" ], "doc": "Type inferred from '1317308142'" }, { "name": "peRatio", "type": [ "double", "null" ], "doc": "Type inferred from '-7.43'" }, { "name": "week52High", "type": [ "double", "null" ], "doc": "Type inferred from '26.22'" }, { "name": "week52Low", "type": [ "double", "null" ], "doc": "Type inferred from '15.2'" }, { "name": "ytdChange", "type": [ "double", "null" ], "doc": "Type inferred from '-0.25696247383444343'" } ] }

Messages to Slack

File: ${'filename'}
Offset: ${'kafka.offset'}
Partition: ${'kafka.partition'}
Topic: ${'kafka.topic'}
UUID: ${'uuid'}
Record Count: ${'record.count'}
File Size: ${fileSize:divide(1024)}K

See jsonpath.com

Splits
$.*.quote
$.*.chart
$.*.news

Array to Single
$.*

GETHTTP

URL: https://api.iextrading.com/1.0/stock/market/batch?symbols=hdp&types=quote,news,chart&range=1y&last=25000

FileName: marketbatch.hdp.${'hdp':append(${now():format('yyyymmddHHMMSS'):append(${md5}):append('.json')})}

Data provided for free by IEX. View IEX's Terms of Use. IEX Real-Time Price: https://iextrading.com/developer/

Queries

SELECT * FROM FLOWFILE WHERE latestPrice > week52Low
SELECT * FROM FLOWFILE WHERE latestPrice <= week52Low

Example Output

File: 855957937589894
Offset: 22460
Partition: 0
Topic: iextradingquote
UUID: b2a8e797-2249-4689-9a78-4339ddb5ecb4
Record Count:
File Size: 3K

Data Visualization in Apache Zeppelin with Hive and Spark SQL

Creating tables on top of Apache ORC files in HDFS is easy.

Push Some Messages to Slack

Resources
https://phoenix.apache.org/hive_storage_handler.html
https://github.com/aol/druid/tree/master/docs/_graphics

Other Data Sources
https://www.kaggle.com/qks1lver/amex-nyse-nasdaq-stock-histories
https://github.com/qks1lver/redtide

Source
https://github.com/tspannhw/stocks-nifi-kafka

stocks-copy.json
stock-to-kafka.xml
11-15-2018
03:58 PM
Tested with HDF 3.2 as well.