11-18-2019
08:29 AM
1 Kudo
Exploring Apache NiFi 1.10: Parameters and Stateless Engine
Apache NiFi 1.10 is now available!
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12344993
You can now use JDK 8 or JDK 11! I am running JDK 11, and it seems a bit faster.
A huge feature is the addition of Parameters! You can use them to pass values into Apache NiFi Stateless!
A few lesser-used Processors have been moved out of the main download; see here for migration hints:
https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance
Release Notes: https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.10.0
Example Source Code: https://github.com/tspannhw/stateless-examples
More New Features:
ParquetReader/Writer (See: https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html)
Prometheus Reporting Task. Expect more Prometheus stuff coming.
Experimental encrypted content repository. People have asked me for this one before.
Parameters!! Time to replace Variables/Variable Registry. Parameters are better in every way.
Toolkit module to generate and build Swagger API library for NiFi
PostSlack processor
PublishKafka Partition Support
GeoEnrichIPRecord Processor
Remote Input Port in a Process Group
Command Line Diagnostics
RocksDB FlowFile Repository
PutBigQueryStreaming Processor
nifi.analytics.predict.enabled - Turn on Back Pressure Prediction
More Lookup Services for ETL/ELT: DatabaseRecordLookupService
KuduLookupService
HBase_2_ListLookupService
Stateless
First we will run from the command line, straight from the NiFi Registry. This is the easiest option. Then we will run from YARN! Yes, you can now run your Apache NiFi flows on your giant Cloudera CDH/HDP/CDP YARN clusters. Let's make use of your hundreds of Hadoop nodes.
Stateless Examples
Let's Build A Stateless Flow
The first thing to keep in mind is that anything that might change should be a parameter we can pass in with our JSON file. It's very easy to set parameters, even for drop-downs! You even get prompted to pick a parameter from a selection list. Before parameters are available, you will need to add them to a Parameter Context and assign that Parameter Context to your Process Group.
A Parameter in a Processor Configuration is shown as #{broker}
Parameter Context Connected to a Process Group, Controller Service, ...
Apply those parameters
Param(eter) is now an option for properties
Pop-up Hint for Using Parameters
Edit a Parameter in a Parameter Context
We can configure parameters in Controller Services as well.
So easy to choose an existing one.
Use them for anything that can change or anything you don't want to hardcode.
Apache Kafka Consumer to Sink
This is a simple two-step Apache NiFi flow that reads from Kafka and sends the records to a sink, for example a file.
Let's make sure we use that Parameter Context
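To make that concrete, the consumer side of this flow references parameters instead of hardcoded values. A rough illustration (the property names shown are the usual ConsumeKafka_2_0 ones, not copied from this exact flow):
Kafka Brokers: #{broker}
Topic Name(s): #{topic}
Group ID: #{group_id}
At run time the values come from the "parameters" block of the JSON configuration file below.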
To build your JSON configuration file, you will need the bucket ID and flow ID from your Apache NiFi Registry, as well as the URL for that registry. You can browse the registry at a URL similar to http://tspann-xx-xx:18080.
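If you prefer not to click through the Registry UI, the bucket and flow IDs can also be listed via the NiFi Registry REST API. A minimal Python sketch (assuming the requests library and the registry URL above; field names follow the standard Registry API responses):
import requests

registry = "http://tspann-xx-xx:18080"

# Walk every bucket and print the IDs needed for the stateless JSON configuration
for bucket in requests.get(registry + "/nifi-registry-api/buckets").json():
    flows = requests.get(registry + "/nifi-registry-api/buckets/" + bucket["identifier"] + "/flows").json()
    for flow in flows:
        print("bucketId:", bucket["identifier"], "flowId:", flow["identifier"], "name:", flow["name"])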
My Command Line Runner
/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Continuous --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafkaconsumer.json
RunFromRegistry [Once|Continuous] --file <File Name>
This is the basic use case of running from the command line using a file. The flow must exist in the referenced Apache NiFi Registry.
JSON Configuration File (kafkaconsumer.json)
{
"registryUrl": "http://tspann-xx-xx:18080",
"bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
"flowId": "0540e1fd-c7ca-46fb-9296-e37632021945",
"ssl": {
"keystoreFile": "",
"keystorePass": "",
"keyPass": "",
"keystoreType": "",
"truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
"truststorePass": "changeit",
"truststoreType": "JKS"
},
"parameters": {
"broker" : "x.x.x.x:9092",
"topic" : "iot",
"group_id" : "nifi-stateless-kafka-consumer",
"DestinationDirectory" : "/tmp/nifistateless/output2/",
"output_dir": "/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/output"
}
}
Example Run
12:25:38.725 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Node 8 sent an incremental fetch response for session 1943199939 with 0 response partition(s), 10 implied partition(s)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-8 at offset 15 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-9 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-6 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-7 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-4 at offset 18 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-5 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-2 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-3 at offset 19 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-0 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-1 at offset 20 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Built incremental fetch (sessionId=1943199939, epoch=5) for node 8. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 10 partition(s)
12:25:38.729 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(iot-8, iot-9, iot-6, iot-7, iot-4, iot-5, iot-2, iot-3, iot-0, iot-1)) to broker ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.737 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles
Example Output
cat output/247361879273711.statelessFlowFile
{"id":"20191105113853_350b493f-9308-4eb2-b71f-6bcdbaf5d6c1_Timer-Driven Process Thread-13","te":"0.5343","diskusage":"0.2647115097153814.3 MB","memory":57,"cpu":132.87,"host":"192.168.1.249/tspann-xx-xx","temperature":"72","macaddress":"dd73eadf-1ac1-4f76-aecb-14be86ce46ce","end":"48400221819907","systemtime":"11/05/2019 11:38:53"}
We can also run Once in this example to send one Kafka message.
Generator to Apache Kafka Producer
My Command Line Runner
/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Once --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafka.json
JSON Configuration File (kafka.json)
{
"registryUrl": "http://tspann-xx-xx:18080",
"bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
"flowId": "402814a2-fb7a-4b19-a641-9f4bb191ed67",
"flowVersion": "1",
"ssl": {
"keystoreFile": "",
"keystorePass": "",
"keyPass": "",
"keystoreType": "",
"truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
"truststorePass": "changeit",
"truststoreType": "JKS"
},
"parameters": {
"broker" : "x.x.x.x.x:9092"
}
}
Example Output
12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG
Flow Succeeded
Other Runtime Options:
RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>
RunOpenwhiskActionServer <Port>
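As an example of the YARN option above, the same Kafka consumer flow could be launched with hypothetical values for the Resource Manager URL, Docker image name, service name and container count (everything except the syntax itself is a placeholder):
/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunYARNServiceFromRegistry http://yarn-rm-host:8088 nifi-stateless:1.10.0 kafka-consumer-service 3 --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafkaconsumer.json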
References:
Awesome Article on NiFi 1.10 Error Handling: https://medium.com/@abdelkrim.hadjidj/apache-nifi-1-10-series-simplifying-error-handling-7de86f130acd
https://www.datainmotion.dev/2019/08/find-cacerts-from-java-jre-lib-security.html
https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html
Parameters Added to API: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
http://bit.ly/cdf-platform
https://www.mtnfog.com/blog/apache-nifi-phi-processing
https://www.slideshare.net/BryanBende/apache-nifi-sdlc-improvements
https://nifi.apache.org/registry
Add A S2S Port Inside Process Group
ParquetReader
ParquetRecordSetWriter
© 2019 Timothy Spann
10-22-2019
03:23 AM
Edge Data Processing with Jetson Nano Part 3 - AI Integration
Source: https://github.com/tspannhw/minifi-jetson-nano
Part 1: https://www.datainmotion.dev/2019/07/edge-data-processing-with-jetson-nano.html
Part 2: https://www.datainmotion.dev/2019/07/edge-processing-with-jetson-nano-part-2.html
Cloudera Edge Management
Top Level NiFi Flow Receiving MiNiFi Agent Messages
Overview of our Apache NiFi Flow For Processing
We format a new flow file as JSON to send to the CDSW Job Environment
We Run Apache MXNet 1.3.1 (Java) SSD Against the Web Camera Image
Extract The Values From the FlowFile to Send to the Spark Job
Our JSON Results From the Logs
Log data has successfully arrived; consistent JSON rows are grabbed as they are written to the file
We can see the results of the Spark Job in Cloudera Data Science Workbench (CDSW)
We can also see the messages that we sent to Slack
10-04-2019
03:12 PM
Source Code: https://github.com/tspannhw/flume-to-nifi
Consume / Publish Kafka And Store to Files, HDFS, Hive 3.1, Kudu
Consume Kafka Flow
Merge Records And Store As AVRO or ORC
Consume Kafka, Update Records via Machine Learning Models In CDSW And Store to Kudu
Source: Apache Kafka Topics
You enter a few parameters and start ingesting data, with or without schemas. Apache Flume had no schema support. Flume did not support transactions.
Sink: Files
Storing to files in file systems, object stores, SFTP or elsewhere could not be easier. Choose S3, Local File System, SFTP, HDFS or wherever.
Sink: Apache Kudu / Apache Impala
Storing to Kudu/Impala (or Parquet for that matter) could not be easier with Apache NiFi.
Sink: HDFS for Apache ORC Files
When the flow completes, ConvertAvroToORC and PutHDFS build the Hive DDL for you! You can build the tables automagically with Apache NiFi if you wish.
CREATE EXTERNAL TABLE IF NOT EXISTS iotsensors (sensor_id BIGINT, sensor_ts BIGINT, is_healthy STRING, response STRING, sensor_0 BIGINT, sensor_1 BIGINT, sensor_2 BIGINT, sensor_3 BIGINT, sensor_4 BIGINT, sensor_5 BIGINT, sensor_6 BIGINT, sensor_7 BIGINT, sensor_8 BIGINT, sensor_9 BIGINT, sensor_10 BIGINT, sensor_11 BIGINT) STORED AS ORC LOCATION '/tmp/iotsensors'
Sink: Kafka
Publishing to Kafka is just as easy! Push records with schema references or raw data. AVRO or JSON, whatever makes sense for your enterprise. Write data easily with no coding and no changes or redeploys for schema or schema version changes.
Pick a Topic and Stream Data While Converting Types
Clean UI and REST API to Manage, Monitor, Configure and Notify on Kafka
Other Reasons to Use Apache NiFi Over Apache Flume
DevOps with REST API, CLI, Python API
https://community.cloudera.com/t5/Community-Articles/More-DevOps-for-HDF-Apache-NiFi-Registry-and-Friends/ta-p/248668
Schemas! We not only work with semi-structured, structured and unstructured data; we are schema and schema-version aware for CSV, JSON, AVRO, XML, grokked text files and more.
https://community.cloudera.com/t5/Community-Articles/Big-Data-DevOps-Apache-NiFi-HWX-Schema-Registry-Schema/ta-p/247963
Flume Replacement Use Cases Implemented in Apache NiFi
Sink/Source: JMS
https://community.cloudera.com/t5/Community-Articles/Publishing-and-Consuming-JMS-Messages-from-Tibco-Enterprise/ta-p/248157
Source: Files/PDF/PowerPoint/Excel/Word Sink: Files
https://community.cloudera.com/t5/Community-Articles/Parsing-Any-Document-with-Apache-NiFi-1-5-with-Apache-Tika/ta-p/247672
https://community.cloudera.com/t5/Community-Articles/Converting-PowerPoint-Presentations-into-French-from-English/ta-p/248974
https://community.cloudera.com/t5/Community-Articles/Creating-HTML-from-PDF-Excel-and-Word-Documents-using-Apache/ta-p/247968
Source: Files/CSV Sink: HDFS/Hive/Apache ORC
https://community.cloudera.com/t5/Community-Articles/Converting-CSV-Files-to-Apache-Hive-Tables-with-Apache-ORC/ta-p/248258
Source: REST/Files/Simulator Sink: HBase, Files, HDFS. ETL with Lookups.
https://community.cloudera.com/t5/Community-Articles/ETL-With-Lookups-with-Apache-HBase-and-Apache-NiFi/ta-p/248243
Flume Replacement - Lightweight Open Source Agents
If you need to replace local log-to-Kafka agents, or anything-to-Kafka, or anything-to-anything with routing, transformation and manipulation, you can use Edge Flow Manager-deployed MiNiFi Agents, available in Java and C++ versions.
References
https://www.progress.com/tutorials/jdbc/ingest-salesforce-data-incrementally-into-hive-using-apache-nifi
https://community.cloudera.com/t5/Community-Articles/RDBMS-to-Hive-using-NiFi-small-medium-tables/ta-p/244677
https://community.cloudera.com/t5/Community-Articles/My-Year-in-Review-2018/ta-p/249363
https://community.cloudera.com/t5/Community-Articles/My-Year-in-Review-2017/ta-p/247541
03-09-2019
09:01 PM
Using Raspberry Pi 3B+ with Apache NiFi MiNiFi, Google Coral Accelerator and Pimoroni Inky pHAT
First we need to unbox our new goodies. The Inky pHAT is an awesome E-Ink display with low power usage that stays displayed after shutdown! Next I added a new Google Coral Edge TPU ML Accelerator USB coprocessor to a new Raspberry Pi 3B+. This was so easy to integrate and get up and running. Let's unbox this beautiful device (but be careful: when it runs it can get really hot, and there is a warning in the instructions). So I run it on top of an aluminum case with a big fan on it.
Pimoroni Inky pHAT
It is pretty easy to set this up, and it provides a robust Python library to write to our E-Ink display. You can see an example screen here: https://github.com/pimoroni/inky
Pimoroni Inky pHAT ePaper eInk Display in Red
Pimoroni Inky pHAT (Red)
https://shop.pimoroni.com/products/inky-phat
https://github.com/pimoroni/inky
https://pillow.readthedocs.io/en/stable/reference/ImageDraw.html
https://learn.pimoroni.com/tutorial/sandyj/getting-started-with-inky-phat
Install Some Python Libraries and Debian Packages for Inky pHAT and Coral
pip3 install font_fredoka_one
pip3 install geocoder
pip3 install fswebcam
sudo apt-get install fe
pip3 install psutil
pip3 install font_hanken_grotesk
pip3 install font_intuitive
wget http://storage.googleapis.com/cloud-iot-edge-pretrained-models/edgetpu_api.tar.gz
These libraries are for the Inky, which needs fonts to write. The last TAR is for the Edge device and is a fast install documented well by Google.
Download Apache NiFi - MiNiFi Java Agent
https://nifi.apache.org/minifi/download.html
Next up, the most important piece. You will need to have JDK 8 installed on your device if you are using the Java agent. You can also use the MiNiFi C++ Agent, but that may require building it for your OS/platform. It has some interesting Python running abilities.
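To give a feel for the Inky pHAT Python library the display updates rely on, here is a minimal sketch (assuming the inky and font_fredoka_one packages from the pip3 installs above; the text drawn is arbitrary, not the flow's actual output):
from PIL import Image, ImageDraw, ImageFont
from font_fredoka_one import FredokaOne
from inky import InkyPHAT

# Create a palette-mode image the size of the display and draw some text on it
display = InkyPHAT("red")
img = Image.new("P", (display.WIDTH, display.HEIGHT))
draw = ImageDraw.Draw(img)
font = ImageFont.truetype(FredokaOne, 22)
draw.text((10, 10), "MiNiFi + Coral", display.RED, font)

# Push the image to the E-Ink panel; it stays visible even after power-off
display.set_image(img)
display.show()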
Google Coral Documentation - Google Edge TPU
Google Edge TPU ML accelerator coprocessor
USB 3.0 Type-C socket
Supports Debian Linux on host CPU
ASIC designed by Google that provides high performance ML inferencing for TensorFlow Lite models
https://coral.withgoogle.com/tutorials/edgetpu-retrain-classification-ondevice/
https://coral.withgoogle.com/tutorials/edgetpu-api/
http://storage.googleapis.com/cloud-iot-edge-pretrained-models/edgetpu_api_reference.zip
https://coral.withgoogle.com/web-compiler/
https://coral.withgoogle.com/tutorials/edgetpu-models-intro/
https://coral.withgoogle.com/tutorials/accelerator/
https://coral.withgoogle.com/models/
https://coral.withgoogle.com/tutorials/accelerator-datasheet/
Using Pretrained TensorFlow Lite Model: Inception V4 (ImageNet)
Recognizes 1,000 types of objects
Dataset: ImageNet
Input size: 299x299
Let's run a flow:
Results (Once an hour we update our E-Ink display with Date, IP, Run Time, Label 1)
Source Code
https://github.com/tspannhw/nifi-minifi-coral
References
https://medium.freecodecamp.org/building-an-iiot-system-using-apache-nifi-mqtt-and-raspberry-pi-ce1d6ed565bc
https://community.hortonworks.com/articles/85984/using-minifi-to-read-data-from-a-sense-hat-on-a-ra.html
https://community.hortonworks.com/articles/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html
https://community.hortonworks.com/articles/32605/running-nifi-on-raspberry-pi-best-practices.html
https://www.tensorflow.org/lite/convert/cmdline_examples
https://www.tensorflow.org/lite/guide/get_started
https://pillow.readthedocs.io/en/stable/reference/ImageDraw.html
https://coral.withgoogle.com/tutorials/edgetpu-faq/
02-13-2019
05:22 AM
5 Kudos
Use Case
IoT devices with sensors and cameras.
Overview
In this, the third article of the CDSW series, we build on using CDSW to classify images with a Python Apache MXNet model. In this use case we are receiving edge data from devices running MiNiFi agents that are collecting sensor data and images and also running edge analytics with TensorFlow. An Apache NiFi server collects this data with full data lineage using HTTP calls from the device(s). We then filter, transform, merge and route the sensor data, image data, deep learning analytics data and metadata to different data stores. As part of the flow we upload our images to a cloud-hosted FTP server (it could be S3 or any media store anywhere) and call a CDSW model from Apache NiFi via REST, getting the model results back as JSON. We are also storing our sensor data in Parquet files in HDFS. We then trigger a PySpark job in CDSW via API from Apache NiFi and check its status. We store the status result data in Parquet as well for PySpark SQL analysis. As additional steps we can join the image and sensor data via image name and do additional queries, reports and dashboards. We can also route this data to Apache Kafka for downstream analysis in Kafka Streams, Storm, Spark Streaming or SAM.
Part 1: https://community.hortonworks.com/content/kbentry/239858/integrating-machine-learning-models-into-your-big.html
Part 2: https://community.hortonworks.com/content/kbentry/239961/using-cloudera-data-science-workbench-with-apache.html
Summary
MiNiFi Java agents read sensor values and feed them to Apache NiFi via HTTPS with full data provenance and lineage. Apache NiFi acts as the master orchestrator: conducting, filtering, transforming, converting, querying, aggregating, routing and cleansing the streams. As part of the flow we call Cloudera Data Science Workbench via REST API to classify ingested images with an Apache MXNet Python GluonCV YOLO model. We also call a Spark job to process ingested Parquet files stored in HDFS, loaded from the related sensor data and metadata. The PySpark jobs are triggered from Apache NiFi via REST API calls to Cloudera Data Science Workbench's Jobs API. For this particular integration I am using a self-built Apache NiFi 1.9, Apache NiFi - MiNiFi Java Agent 0.5.0, Cloudera Data Science Workbench 1.5 for HDP, HDFS, Apache Spark 2, Python 3, PySpark and Parquet.
Overall Apache NiFi Flow
Workflow walk-thru
For images, we transmit the images to an FTP server, run them through an Inception classifier (the TensorFlow NiFi processor) and extract those results plus metadata for future use. For sensor data, we merge it, convert it to Parquet and store the files. We also store it to HBase and send alerts to a Slack channel. When we are complete, we trigger an Apache Spark PySpark SQL job via CDSW. This job can email us a report and has nice dashboards to see your job runs. We also clean up, filter, flatten and merge the JSON status and store it as Parquet files for future analysis with PySpark SQL. For the REST call we must set Content-Type to application/json, send an empty message body, use no chunked encoding, and optionally turn on Always Output Response (a sketch of this call follows below). We need to clean up and remove some fields from the status returned; Jolt works magic on JSON. Setting up FTP is easy. Here is what some of the sensor data looks like while in motion. We set up a job in CDSW very easily from an existing Python file. After we have run the job a few times we get a nice graph of run duration in our job history. You can see details of the run, including the session and the results.
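As a rough illustration of that Jobs API call (not the NiFi processor configuration itself), triggering the CDSW job from Python might look like the sketch below. The project path and job ID come from the job-start URL shown later in this article; how you authenticate (here an API key used as the HTTP basic-auth username) depends on your CDSW setup and is an assumption.
import requests

# Hypothetical values; see the job-start URL pattern later in this article
cdsw_job_start = "http://cdsw.example.com/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4/start"
api_key = "your-cdsw-api-key"  # assumption: API key supplied as the basic-auth username

# Content-Type application/json with an effectively empty JSON body, as described above
response = requests.post(cdsw_job_start,
                         json={},
                         auth=(api_key, ""),
                         headers={"Content-Type": "application/json"})
print(response.status_code, response.json())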
When the job is running you can see it in process, along with all the completed runs. We can query our data with PySpark DataFrames for simple output. We can display the schema. We can use Pandas for a nicer table display of the data.
Load Data Manually
We can have Apache NiFi push to HDFS directly for us. To load data manually in Cloudera DSW after uploading the files to a directory in CDSW:
# To load data created by NiFi
!hdfs dfs -mkdir /tmp/status
!hdfs dfs -put status/*.parquet /tmp/status
!hdfs dfs -ls /tmp/status
!hdfs dfs -mkdir /tmp/sensors
!hdfs dfs -put sensors/*.parquet /tmp/sensors
!hdfs dfs -ls /tmp/sensors
Source Code
https://github.com/tspannhw/nifi-cdsw-edge
Jolt To Clean Up CDSW Status JSON
[
  {
    "operation": "shift",
    "spec": {
      "*.*": "&(0,1)_&(0,2)",
      "*.*.*": "&(0,1)_&(0,2)_&(0,3)",
      "*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4)",
      "*": "&"
    }
  },
  {
    "operation": "remove",
    "spec": {
      "environment": "",
      "environment*": "",
      "latest_k8s": "",
      "report_attachments": ""
    }
  }
]
We remove the arrays, remove some unwanted fields and flatten the data for easy querying. We then convert to Apache Avro and store it as Apache Parquet files for querying with PySpark.
URL to Start a Cloudera Data Science Workbench Job
http://cdsw/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4/start
as per: http://cdsw.example.com/api/v1/projects/<$USERNAME>/<$PROJECT_NAME>/jobs/<$JOB_ID>/start
What Does the IoT Data Look Like?
{
"uuid" : "20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125",
"BH1745_clear" : "0.0",
"te" : "601.1575453281403",
"host" : "piups",
"BH1745_blue" : "0.0",
"imgname" : "/opt/demo/images/bog_image_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg",
"lsm303d_accelerometer" : "+00.08g : -01.01g : +00.09g",
"cputemp" : 44,
"systemtime" : "02/12/2019 23:34:39",
"memory" : 45.7,
"bme680_tempc" : "23.97",
"imgnamep" : "/opt/demo/images/bog_image_p_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg",
"bme680_pressure" : "1000.91",
"BH1745_red" : "0.0",
"bme680_tempf" : "75.15",
"diskusage" : "9622.5",
"ltr559_lux" : "000.00",
"bme680_humidity" : "24.678",
"lsm303d_magnetometer" : "+00.03 : +00.42 : -00.11",
"BH1745_green" : "0.0",
"ipaddress" : "192.168.1.166",
"starttime" : "02/12/2019 23:24:38",
"ltr559_prox" : "0000",
"VL53L1X_distance_in_mm" : 553,
"end" : "1550032479.3900714"
}
What Does the TensorFlow Image Analytics Data Look Like?
{"probability_4":"2.00%","file.group":"root",
"s2s.address":"192.168.1.166:60966",
"probability_5":"1.90%","file.lastModifiedTime":"2019-02-12T18:02:21-0500",
"probability_2":"3.14%","probability_3":"2.35%","probability_1":"3.40%",
"file.permissions":"rw-r--r--","uuid":"0596aa5f-325b-4bd2-ae80-6c7561c8c056",
"absolute.path":"/opt/demo/images/","path":"/","label_5":"fountain",
"label_4":"lampshade","filename":"bog_image_20190212230221_00c846a7-b8d2-4192-b8eb-f6f13268483c.jpg",
"label_3":"breastplate","s2s.host":"192.168.1.166","file.creationTime":"2019-02-12T18:02:21-0500",
"file.lastAccessTime":"2019-02-12T18:02:21-0500",
"file.owner":"root",
"label_2":"spotlight",
"label_1":"coffeepot",
"RouteOnAttribute.Route":"isImage"}
Transformed Job Status Data
{
"id" : 4,
"name" : "Pyspark SQL Job",
"script" : "pysparksqljob.py",
"cpu" : 2,
"memory" : 4,
"nvidia_gpu" : 0,
"engine_image_id" : 7,
"kernel" : "python3",
"englishSchedule" : "",
"timezone" : "America/New_York",
"total_runs" : 108,
"total_failures" : 0,
"paused" : false,
"type" : "manual",
"creator_id" : 19,
"creator_username" : "tspann",
"creator_name" : "Timothy Spann",
"creator_email" : "tspann@EmailIsland.Space",
"creator_url" : "http://cdsw-hdp-3/api/v1/users/tspann",
"creator_html_url" : "http://cdsw-hdp-3/tspann",
"project_id" : 30,
"project_slug" : "tspann/future-of-data-meetup-princeton-12-feb-2019",
"project_name" : "Future of Data Meetup Princeton 12 Feb 2019",
"project_owner_id" : 19,
"project_owner_username" : "tspann",
"project_owner_email" : "tspann@email.tu",
"project_owner_name" : "Timothy Spann",
"project_owner_url" : "http://cdsw-hdp-3/api/v1/users/tspann",
"project_owner_html_url" : "http://cdsw-hdp/tspann",
"project_url" : "http://cdsw-hdp-3/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019",
"project_html_url" : "http://cdsw-hdp-3/tspann/future-of-data-meetup-princeton-12-feb-2019",
"latest_id" : "jq47droa9zv9ou0j",
"latest_batch" : true,
"latest_job_id" : 4,
"latest_status" : "scheduling",
"latest_oomKilled" : false,
"latest_created_at" : "2019-02-13T13:04:28.961Z",
"latest_scheduling_at" : "2019-02-13T13:04:28.961Z",
"latest_url" : "http://server/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/dashboards/jq47droa9zv9ou0j",
"latest_html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/engines/jq47droa9zv9ou0j",
"latest_shared_view_visibility" : "private",
"report_include_logs" : true,
"report_send_from_creator" : false,
"timeout" : 30,
"timeout_kill" : false,
"created_at" : "2019-02-13T04:46:26.597Z",
"updated_at" : "2019-02-13T04:46:26.597Z",
"shared_view_visibility" : "private",
"url" : "http://serverapi/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4",
"html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4",
"engine_id" : "jq47droa9zv9ou0j"
}
PySpark Sensor Spark SQL for Data Analysis
from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession
pd.options.display.html.table_schema = True
spark = SparkSession.builder.appName("Sensors").getOrCreate()
# Access the parquet
sensor = spark.read.parquet("/tmp/sensors/*.parquet")
data = sensor.toPandas()
pd.DataFrame(data)
spark.stop()
PySpark Status Spark SQL for Data Analysis
from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession
pd.options.display.html.table_schema = True
spark = SparkSession\
.builder\
.appName("Status")\
.getOrCreate()
# Access the parquet
sensor = spark.read.parquet("/tmp/status/*.parquet")
# show content
sensor.show()
# query
#
sensor.select(sensor['bme680_humidity'], sensor['bme680_tempf'], sensor['lsm303d_magnetometer']).show()
sensor.printSchema()
sensor.count()
data = sensor.toPandas()
pd.DataFrame(data)
spark.stop()
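As a small follow-on (a sketch only, using the same /tmp/status Parquet files and fields from the job status schema below), you could add a quick summary of job runs before the spark.stop() call:
# Sketch: summarize CDSW job runs from the status Parquet files
status = spark.read.parquet("/tmp/status/*.parquet")
status.groupBy("latest_status").count().show()
status.select("name", "total_runs", "total_failures").show()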
Status Schema (jobstatus)
{
"type":"record",
"name":"jobstatus",
"fields":[
{
"name":"id",
"type":["int","null"]
},
{
"name":"name",
"type":["string","null"]
},
{
"name":"script",
"type":["string","null"]
},
{
"name":"cpu",
"type":["int","null"]
},
{
"name":"memory",
"type":["int","null"]
},
{
"name":"nvidia_gpu",
"type":["int","null"]
},
{
"name":"engine_image_id",
"type":["int","null"]
},
{
"name":"kernel",
"type":["string","null"]
},
{
"name":"englishSchedule",
"type":["string","null"]
},
{
"name":"timezone",
"type":["string","null"]
},
{
"name":"total_runs",
"type":["int","null"]
},
{
"name":"total_failures",
"type":["int","null"],
"doc":"Type inferred from '0'"
},
{
"name":"paused",
"type":["boolean","null"],
"doc":"Type inferred from 'false'"
},
{
"name":"type",
"type":["string","null"],
"doc":"Type inferred from '\"manual\"'"
},
{
"name":"creator_id",
"type":["int","null"],
"doc":"Type inferred from '19'"
},
{
"name":"creator_username",
"type":["string","null"]
},
{
"name":"creator_name",
"type":["string","null"]
},
{
"name":"creator_email",
"type":["string","null"]
},
{
"name":"creator_url",
"type":["string","null"]
},
{
"name":"creator_html_url",
"type":["string","null"]
},
{
"name":"project_id",
"type":["int","null"]
},
{
"name":"project_slug",
"type":["string","null"]
},
{
"name":"project_name",
"type":["string","null"]
},
{
"name":"project_owner_id",
"type":["int","null"]
},
{
"name":"project_owner_username",
"type":["string","null"]
},
{
"name":"project_owner_email",
"type":["string","null"]
},
{
"name":"project_owner_name",
"type":["string","null"]
},
{
"name":"project_owner_url",
"type":["string","null"]
},
{
"name":"project_owner_html_url",
"type":["string","null"]
},
{
"name":"project_url",
"type":["string","null"]
},
{
"name":"project_html_url",
"type":["string","null"]
},
{
"name":"latest_id",
"type":["string","null"]
},
{
"name":"latest_batch",
"type":["boolean","null"]
},
{
"name":"latest_job_id",
"type":["int","null"]
},
{
"name":"latest_status",
"type":["string","null"]
},
{
"name":"latest_oomKilled",
"type":["boolean","null"]
},
{
"name":"latest_created_at",
"type":["string","null"]
},
{
"name":"latest_scheduling_at",
"type":["string","null"]
},
{
"name":"latest_url",
"type":["string","null"]
},
{
"name":"latest_html_url",
"type":["string","null"]
},
{
"name":"latest_shared_view_visibility",
"type":["string","null"]
},
{
"name":"report_include_logs",
"type":["boolean","null"]
},
{
"name":"report_send_from_creator",
"type":["boolean","null"]
},
{
"name":"timeout",
"type":["int","null"]
},
{
"name":"timeout_kill",
"type":["boolean","null"]
},
{
"name":"created_at",
"type":["string","null"]
},
{
"name":"updated_at",
"type":["string","null"]
},
{
"name":"shared_view_visibility",
"type":["string","null"]
},
{
"name":"url",
"type":["string","null"]
},
{
"name":"html_url",
"type":["string","null"]
},
{
"name":"engine_id",
"type":["string","null"]
}
]
}
Documentation
https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_rest_apis.html#cdsw_api
References
https://community.hortonworks.com/articles/229522/iot-series-sensors-utilizing-breakout-garden-hat-p.html
https://community.hortonworks.com/articles/232136/iot-series-sensors-utilizing-breakout-garden-hat-p-1.html
Join me in March at DataWorks Summit in Barcelona, or in Princeton monthly.
02-10-2019
10:15 PM
This is an update to a previous article on accessing Philadelphia open crime data and storing it in Apache Phoenix on HBase.
Part 1: https://community.hortonworks.com/articles/54947/reading-opendata-json-and-storing-into-phoenix-tab.html
For the NoSQL summit: https://community.hortonworks.com/articles/56642/creating-a-spring-boot-java-8-microservice-to-read.html
Update this: http://princeton0.field.hortonworks.com:16010/master-status
https://community.hortonworks.com/content/kbentry/54947/reading-opendata-json-and-storing-into-phoenix-tab.html
Philly Crime Data
City of Philly App Token: 76MVJDcTksxeS1uYPf8D0XdUF
Secret Token: WZnlB_YJ5r9rjj_alWVdc_yqnxRpnIk5BHgb
Crime:
https://data.phila.gov/resource/sspu-uyfa.json?$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$limit=5000
https://www.opendataphilly.org/dataset/crime-incidents/resource/f6e5998d-9d33-4a45-8397-3a6bb8607d10
https://www.opendataphilly.org/dataset/crime-incidents
https://data.phila.gov/resource/sspu-uyfa.json
https://data.phila.gov/resource/sspu-uyfa.json?$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$limit=5000&dispatch_date=2019-02-18
Example record:
{"dc_dist":"18", "dc_key":"200918067518", "dispatch_date":"2009-10-02", "dispatch_date_time":"2009-10-02T14:24:00.000", "dispatch_time":"14:24:00", "hour":"14", "location_block":"S 38TH ST / MARKETUT ST", "psa":"3", "text_general_code":"Other Assaults", "ucr_general":"800"}
Hive DDL:
CREATE EXTERNAL TABLE crime (dc_dist STRING, dc_key STRING, dispatch_date STRING, dispatch_date_time STRING, hour STRING, location_block STRING, psa STRING, text_general_code STRING, ucr_general STRING) CLUSTERED BY (psa) INTO 4 BUCKETS ROW FORMAT DELIMITED STORED AS ORC LOCATION '/crime/hive' TBLPROPERTIES('transactional'='true');
Fields: dc_dist, dc_key, dispatch_date, dispatch_date_time, hour, location_block, psa, text_general_code, ucr_general
Today:
https://data.phila.gov/resource/sspu-uyfa.json?$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$limit=5000&dispatch_date=2016-09-12
&dispatch_date=${now():format('yyyy-MM-dd')}
311 service:
https://data.phila.gov/resource/4t9v-rppq.json?$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$limit=5000
http://arcgis.dvrpc.org/arcgis/rest/services/Transportation/PedestrianCounts/FeatureServer/1
http://arcgis.dvrpc.org/arcgis/rest/services/Transportation/TrafficCounts/FeatureServer/0
See: https://community.hortonworks.com/articles/52856/stream-data-into-hive-like-a-king-using-nifi.html
Perhaps adapt this: https://github.com/socrata/soda-java
https://www.opendataphilly.org/
create table crime
hdfs dfs -mkdir -p /crime/fail
hdfs dfs -mkdir -p /crime
https://phoenix.apache.org/faq.html
/usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure
CREATE TABLE phillycrime (dc_dist varchar, dc_key varchar not null primary key, dispatch_date varchar, dispatch_date_time varchar, dispatch_time varchar, hour varchar, location_block varchar, psa varchar, text_general_code varchar, ucr_general varchar);
{"dc_dist":"18","dc_key":"200918067518","dispatch_date":"2009-10-02","dispatch_date_time":"2009-10-02T14:24:00.000","dispatch_time":"14:24:00","hour":"14","location_block":"S 38TH ST / MARKETUT ST","psa":"3","text_general_code":"Other Assaults","ucr_general":"800"}
upsert into phillycrime values ('18', '200918067518', '2009-10-02','2009-10-02T14:24:00.000','14:24:00','14', 'S 38TH ST / MARKETUT ST','3','Other Assaults','800');
!tables
!describe phillycrime
upsert into test values
Phoenix JDBC settings:
org.apache.phoenix.jdbc.PhoenixDriver
jdbc:phoenix:localhost:2181:/hbase-unsecure
/usr/hdp/2.4.0.0-169/phoenix/phoenix-client.jar
/usr/hdp/2.4.0.0-169/hbase/lib/hbase-client.jar
/etc/hbase/conf/hbase-site.xml plus the Hadoop ones
https://community.hortonworks.com/articles/19016/connect-to-phoenix-hbase-using-dbvisualizer.html
0: jdbc:phoenix:localhost:2181:/hbase-unsecur> !describe phillycrime
TABLE_NAME   | COLUMN_NAME         |
PHILLYCRIME  | DC_DIST             | 12
PHILLYCRIME  | DC_KEY              | 12
PHILLYCRIME  | DISPATCH_DATE       | 12
PHILLYCRIME  | DISPATCH_DATE_TIME  | 12
PHILLYCRIME  | HOUR                | 12
PHILLYCRIME  | LOCATION_BLOCK      | 12
PHILLYCRIME  | PSA                 | 12
PHILLYCRIME  | TEXT_GENERAL_CODE   | 12
PHILLYCRIME  | UCR_GENERAL         | 12
0: jdbc:phoenix:localhost:2181:/hbase-unsecur> upsert into phillycrime values ('18', '200918067518', '2009-10-02','2009-10-02T14:24:00.000','14', 'S 38TH ST / MARKETUT ST','3','Other Assaults','800');
1 row affected (0.115 seconds)
0: jdbc:phoenix:localhost:2181:/hbase-unsecur> select * from phillycrime;
DC_DIST | DC_KEY        | DISPATCH_DATE | DISPATCH_DATE_TIME       |
18      | 200918067518  | 2009-10-02    | 2009-10-02T14:24:00.000  | 14
1 row selected (0.197 seconds)
upsert into phillycrime values ('${'dc_dist'}','${'dc_key'}','${'dispatch_date'}','${'dispatch_date_time'}','${'dispatch_time'}','${'hour'}','${'location_block'}','${'psa'}','${'text_general_code'}','${'ucr_general'}')
jdbc:phoenix:localhost:2181:/hbase-unsecur> select distinct(text_general_code) from phillycrime;
NiFi processor properties:
<property>
  <name>URL</name>
  <value>https://data.phila.gov/resource/sspu-uyfa.json?$$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$$limit=5000&dispatch_date=${now():format('yyyy-MM-dd')}</value>
</property>
<property>
  <name>Filename</name>
  <value>phillycrime${now():toNumber()}.json</value>
</property>
<property>
  <name>Regular Expression</name>
  <value>(?s:^.*$)</value>
</property>
<property>
  <name>Replacement Value</name>
  <value>upsert into phillycrime values ('${'dc_dist'}','${'dc_key'}','${'dispatch_date'}','${'dispatch_date_time'}','${'dispatch_time'}','${'hour'}','${'location_block'}','${'psa'}','${'text_general_code'}','${'ucr_general'}')</value>
</property>
NiFi controller service (DBCPConnectionPool):
<comment/>
<class>org.apache.nifi.dbcp.DBCPConnectionPool</class>
<enabled>true</enabled>
<property>
  <name>Database Connection URL</name>
  <value>jdbc:phoenix:localhost:2181:/hbase-unsecure</value>
</property>
<property>
  <name>Database Driver Class Name</name>
  <value>org.apache.phoenix.jdbc.PhoenixDriver</value>
</property>
<property>
  <name>Database Driver Jar Url</name>
  <value>file:///usr/hdp/2.4.0.0-169/phoenix/phoenix-client.jar</value>
</property>
https://data.phila.gov/resource/sspu-uyfa.json
https://www.opendataphilly.org/dataset/crime-incidents
Run Spring Boot …
https://community.hortonworks.com/articles/72420/ingesting-remote-sensor-feeds-into-apache-phoenix.html
https://github.com/tspannhw/phoenix
org.apache.phoenix.jdbc.PhoenixDriver
jdbc:phoenix:princeton0.field.hortonworks.com:/hbase-unsecure
phoenixuser
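Outside of NiFi, you can sanity-check the Socrata crime endpoint with a few lines of Python (a sketch assuming the requests library; the app token is the one listed above and the dispatch date is arbitrary):
import requests

# Pull up to 5,000 crime records for a single dispatch date from the Philadelphia Socrata API
url = "https://data.phila.gov/resource/sspu-uyfa.json"
params = {"$$app_token": "76MVJDcTksxeS1uYPf8D0XdUF", "$limit": 5000, "dispatch_date": "2019-02-18"}
rows = requests.get(url, params=params).json()
print(len(rows), rows[0]["text_general_code"] if rows else "no rows")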
02-10-2019
10:08 PM
Tracking Air Quality with HDP and HDF: Part 2 - Indoor Air Quality
Using a few sensors on a MiNiFi node we are able to generate some air quality sensor readings.
Source: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/aqminifi.py
Data:
row['bme680_tempc'] = '{0:.2f}'.format(sensor.data.temperature)
row['bme680_tempf'] = '{0:.2f}'.format((sensor.data.temperature * 1.8) + 32)
row['bme680_pressure'] = '{0:.2f}'.format(sensor.data.pressure)
row['bme680_gas'] = '{0:.2f}'.format(gas)
row['bme680_humidity'] = '{0:.2f}'.format(hum)
row['bme680_air_quality_score'] = '{0:.2f}'.format(air_quality_score)
row['bme680_gas_baseline'] = '{0:.2f}'.format(gas_baseline)
row['bme680_hum_baseline'] = '{0:.2f}'.format(hum_baseline)
See Part 1: https://community.hortonworks.com/articles/189630/tracking-air-quality-with-hdp-and-hdfi-part-1-apac.html
Newark / NYC Hazecam: https://hazecam.net/images/main/newark.jpg
Example:
{"bme680_air_quality_score": "82.45", "uuid": "20190131191921_59c5441c-47b4-4f6f-a6d6-b3943bc9cf2b", "ipaddress": "192.168.1.166", "bme680_gas_baseline": 367283.28, "bme680_pressure": "1024.51", "bme680_hum_baseline": 40.0, "memory": 11.7, "end": "1548962361.4146328", "cputemp": 47, "host": "piups", "diskusage": "9992.7", "bme680_tempf": "87.53", "te": "761.2184100151062", "starttime": "01/31/2019 14:06:40", "systemtime": "01/31/2019 14:19:21", "bme680_humidity": "13.22", "bme680_tempc": "30.85", "bme680_gas": "363274.92"}
{ "end" : "1548967753.7064438", "host" : "piups", "diskusage" : "9990.4", "cputemp" : 47, "starttime" : "01/31/2019 15:44:11", "bme680_hum_baseline" : "40.00", "bme680_humidity" : "13.23", "ipaddress" : "192.168.1.166", "bme680_tempc" : "30.93", "te" : "301.96490716934204", "bme680_air_quality_score" : "83.27", "systemtime" : "01/31/2019 15:49:13", "bme680_tempf" : "87.67", "bme680_gas_baseline" : "334942.60", "uuid" : "20190131204913_4984a635-8dcd-408a-ba23-c0d225ba2d86", "bme680_pressure" : "1024.69", "memory" : 12.6, "bme680_gas" : "336547.19" }
02-09-2019
06:30 PM
1 Kudo
Use Case:
We have data stored in a MongoDB from a third party application in Amazon.
Export from MongoDB to Parquet.
Moving data from a single-purpose data silo to your Enterprise Data Lake is a common use case. Using Apache NiFi we can easily pull your data from this remote silo and bring it streaming into your analytics store for machine learning and deep analytics with Impala, Hive and Spark. It doesn't matter which cloud you are coming from or going to, whether you are moving from cloud to on-premises, or any of the various hybrid situations. Apache NiFi will work in all of these situations, with full data lineage and provenance on what it did and when.
I have created a mock dataset with Mockaroo. It's all about yummy South Jersey sandwiches.
Our easy MongoDB flows: one to ingest Mongo data into our Data Lake and another to load MongoDB.
In our test, we loaded all the data from our Mock REST API into a MongoDB in the cloud. In the real world an application populated that dataset and now we need to bring it into our central data lake for analytics.
We use Jolt to replace the non-Hadoop friendly built-in MongoDB _id with a friendly name mongo_id.
Storing to Parquet on HDFS is Easy (Let's compress with Snappy)
Connecting to MongoDB is easy: set up a controller service and specify the database and collection.
Our MongoDB Connection Service: just enter your URI with username/password@server.
GetHTTP URL: https://my.api.mockaroo.com/hoagie.json
GetHTTP Filename: ${filename:append('hoagie.'):append(${now():format('yyyyMMddHHmmSS'):append(${md5}):append('.json')})}
JSON Path Expression: $.*
JOLT Chain: [{ "operation": "shift", "spec": { "_id": "mongo_id", "*": "&" } }]
Mongo URI: mongodb://user:userpassword@server.cloud.com:13916/nifi
Many files stored in HDFS as Parquet
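For comparison, the same MongoDB-to-Parquet export could be sketched in plain Python outside NiFi (assuming pymongo, pandas and pyarrow are installed; the URI, database and collection names below are placeholders in the same style as the connection above):
import pandas as pd
from pymongo import MongoClient

# Connect with the same style of URI used in the NiFi controller service
client = MongoClient("mongodb://user:userpassword@server.cloud.com:13916/nifi")
docs = list(client["nifi"]["hoagie"].find())

# Rename the MongoDB _id to a Hadoop-friendly column, mirroring the Jolt transform
df = pd.DataFrame(docs).rename(columns={"_id": "mongo_id"})
df["mongo_id"] = df["mongo_id"].astype(str)

# Write Snappy-compressed Parquet (pyarrow engine), ready to copy into HDFS
df.to_parquet("hoagie.parquet", compression="snappy")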
02-09-2019
05:59 PM
3 Kudos
Series: Integration of Apache NiFi and Cloudera Data Science Workbench
Part 2: Using Cloudera Data Science Workbench with Apache NiFi and Apache MXNet for GluonCV YOLO Workloads
Summary
Now that we have shown it's easy to do standard NLP, next up is Deep Learning. As you can see, NLP, Machine Learning, Deep Learning and more are all within reach for building your own AI as a Service using tools from Cloudera. These can run in public or private clouds at scale. Now you can run and integrate machine learning services, computer vision APIs and anything you have created in house with your own Data Scientists or with the help of Cloudera Fast Forward Labs (https://www.cloudera.com/products/fast-forward-labs-research.html).
The YOLO pretrained model will download the image to /tmp from the URL in order to process it. The Python 3 script will also download the GluonCV model for YOLO3.
Using Pre-trained Model: yolo3_darknet53_voc
Image Sources: https://github.com/tspannhw/images and/or https://picsum.photos/400/600
Example Input
{ "url": "https://raw.githubusercontent.com/tspannhw/images/master/89389-nifimountains.jpg" }
Sample Call to Our REST Service
curl -H "Content-Type: application/json" -X POST http://myurliscoolerthanyours.com/api/altus-ds-1/models/call-model -d '{"accessKey":"longkeyandstuff","request":{"url":"https://raw.githubusercontent.com/tspannhw/images/master/89389-nifimountains.jpg"}}'
Sample JSON Result Set
{"class1": "cat", "pct1": "98.15670800000001", "host": "gluoncv-apache-mxnet-29-49-67dfdf4c86-vcpvr", "shape": "(1, 3, 566, 512)", "end": "1549671127.877511", "te": "10.178656578063965", "systemtime": "02/09/2019 00:12:07", "cpu": 17.0, "memory": 12.8}
Example Deployment Model Resources
Replicas: 1
Total CPU: 1 vCPU <-- An extra vCPU wouldn't hurt.
Total Memory: 8.00 GiB <-- Make sure you have enough RAM.
I recommend that Deep Learning models use more vCPUs and more memory, as you will be manipulating images and large tensors. I also recommend more replicas for production use cases. You can have up to 9 (https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_models.html). I like the idea of 3, 5 or 7 replicas.
How-To
Step 1: Let's Clean Up and Test Some Python 3 Code
So first I take an existing example of Python 3 GluonCV Apache MXNet YOLO code that I already have. As you can see, it uses a pretrained model from Apache MXNet's rich model zoo. This started here: https://github.com/tspannhw/nifi-gluoncv-yolo3
I pared down the libraries as I used an interactive Python 3 session to test and refine my code. As before, I set a variable to pass in my data, this time a URL pointing to an image. As you can see in my interactive session, I can run my yolo function and get results. I had a library in there to display the annotated image while I was testing. I took this code out to save time and memory and to reduce libraries, though it was needed while testing. The model seems to be working: it identified me as a person and my cat as a cat.
Step 2: Create, Build and Deploy a Model
I go to Models, point to my new file and the function I used (yolo), and put in a sample input and response. I deploy it, and then it is in the list of available models. It goes through a few steps as the Docker container is deployed to Kubernetes and all the required pips are installed during a build process. Once it is built, you can see the build(s) in the Build screen.
Step 3: Test the Model
Once it is done building and marked deployed, we can use the built-in tester from Overview.
We can see the result in JSON, ready to travel over an HTTP REST API.
Step 4: Monitor the Deployed Model
We can see the standard output and error, how many times we have been called, and the successes. You can see it downloaded the model from the Apache MXNet zoo. If you need to stop, rebuild or replace a model, it's easy.
Step 5: Apache NiFi Flow
As you can see, it's only a few steps to run the flow. I am using GenerateFlowFile to get us started, but I could have a cron scheduler start us, or react to a Kafka/MQTT/JMS message or another trigger. I then build the JSON needed to call the REST API. Example: {"accessKey":"accesskey","request":{"url":"${url}"}}
Then we call the REST API via an HTTP POST (http://myurliscoolerthanyours.com/api/altus-ds-1/models/call-model). We then parse the JSON it returns to give us just the fields we want; we don't really need the status. We name our schema so we can run Apache Calcite SQL queries against it. Let's only save cats and people to our Amazon S3 bucket. At this point I can add more queries and destinations. I can store it everywhere or anywhere.
Example Output
{
"success": true,
"response":
{ "class1": "cat", "cpu": 38.3, "end": "1549672761.1262221", "host": "gluoncv-apache-mxnet-29-50-7fb5cfc5b9-sx6dg", "memory": 14.9, "pct1": "98.15670800000001", "shape": "(1, 3, 566, 512)", "systemtime": "02/09/2019 00:39:21", "te": "3.380652666091919" }}
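The same call the flow makes can be reproduced outside NiFi for testing. Here is a minimal Python sketch mirroring the curl example above (the endpoint and access key are the placeholder values from that example):
import requests

# Placeholder endpoint and access key taken from the curl example above
endpoint = "http://myurliscoolerthanyours.com/api/altus-ds-1/models/call-model"
payload = {
    "accessKey": "longkeyandstuff",
    "request": {"url": "https://raw.githubusercontent.com/tspannhw/images/master/89389-nifimountains.jpg"}
}

# The model responds with JSON like the Example Output shown above
result = requests.post(endpoint, json=payload).json()
print(result)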
Build a Schema for the Data and store it in the Apache NiFi AVRO Schema Registry or Cloudera Schema Registry
{ "type" : "record", "name" : "gluon", "fields" : [ { "name" : "class1", "type" : ["string","null"] }, { "name" : "cpu", "type" : ["double","null"] }, { "name" : "end", "type" : ["string","null"]}, { "name" : "host", "type" : ["string","null"]}, { "name" : "memory", "type" : ["double","null"]}, { "name" : "pct1", "type" : ["string","null"] }, { "name" : "shape", "type" : ["string","null"] }, { "name" : "systemtime", "type" : ["string","null"] }, { "name" : "te", "type" : ["string","null"] } ] }
I like to allow for nulls in case we have missing data, but that is up to your Data Steward and team. If you want to share a schema and need to add a new version with a new field, you must include "null" as an option, since old data won't have that field.
Source: https://github.com/tspannhw/nifi-cdsw-gluoncv
cdswmxnet.xml
02-06-2019
05:49 PM
4 Kudos
Using Deployed Models as a Function as a Service
Using Cloudera Data Science Workbench with Apache NiFi, we can easily call functions within our deployed models from Apache NiFi as part of flows. I am working against CDSW on HDP (https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_hdp.html), but it will work for all CDSW installs regardless of type. In my simple example, I built a Python model that uses TextBlob to run sentiment analysis against a passed-in sentence. It returns sentiment polarity and subjectivity, which we can immediately act upon in our flow. CDSW is extremely easy to work with and I was up and running in a few minutes. For my model, I created a Python 3 script and a shell script for install details. Both of these artifacts are available here: https://github.com/tspannhw/nifi-cdsw (a minimal sketch of the model function appears further below).
My Apache NiFi 1.8 flow is here (I use no custom processors): cdsw-twitter-sentiment.xml
Deploying a Machine Learning Model as a REST Service
First, log in to CDSW and create a project or choose an existing one (https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_projects.html). From your project, open a workbench where you can install some libraries and test some Python. I am using a Python 3 session to download the TextBlob/NLTK corpora for NLP.
Let's pip install some libraries for testing.
Let's Create a New Model
You choose your file (mine is sentiment.py, see GitHub). The function name is actually sentiment. Notice a typo; I had to rebuild this and deploy again. You set up an example input (sentence is the input parameter name) and an example output. Input and output will be JSON since this is a REST API.
Let's Deploy It (Python 3)
The deploy will build it for deployment. We can see standard output, standard error, status, the number of REST calls received, and successes.
Once a Model is Deployed We Can Control It
We can stop it, rebuild it or replace the files if need be. I had to update things a few times. The amount of resources used for the model's REST hosting is your choice from a drop-down. Since I am doing something small, I picked the smallest option with only 1 virtual CPU and 2 GB of RAM. All of this is running in Docker on Kubernetes!
Once Deployed, It's Ready To Test and Use From Apache NiFi
Just click Test. See the JSON results, and we can now call it from an Apache NiFi flow.
Once Deployed We Can Monitor The Model
Let's run the test. See the status and response!
Apache NiFi Example Flow
Step 1: Call Twitter.
Step 2: Extract social attributes of interest.
Step 3: Build our web call with our access key and function parameter.
Step 4: Extract our string as a flow file to send to the HTTP POST.
Step 5: Call our Cloudera Data Science Workbench REST API (see tester).
Step 6: Extract the two result values.
Step 7: Route on the sentiment. We can have negative (<0), neutral (0), positive (>0) and very positive (1) polarity of the sentiment. See TextBlob for more information on how this works.
Step 8: Send bad sentiment to a Slack channel for human analysis. We send all the related information to a Slack channel, including the message.
Example Message Sent to Slack
Step 9: Store all the results (or some) in Phoenix/HBase, Hive LLAP, Impala, Kudu or HDFS.
Results as Attributes
Slack Message Call
${msg:append(" User:"):append(${user_name}):append(${handle}):append(" Geo:"):append(${coordinates}):append(${geo}):append(${location}):append(${place}):append(" Hashtags:"):append(${hashtags}):append(" Polarity:"):append(${polarity}):append(" Subjectivity:"):append(${subjectivity}):append(" Friends Count:"):append(${friends_count}):append(" Followers Count:"):append(${followers_count}):append(" Retweet Count:"):append(${retweet_count}):append(" Source:"):append(${source}):append(" Time:"):append(${time}):append(" Tweet ID:"):append(${tweet_id})}
REST CALL to Model
{"accessKey":"from your workbench","request":{"sentence":"${msg:replaceAll('\"', ''):replaceAll('\n','')}"}}
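The actual sentiment.py lives in the nifi-cdsw repository linked above. As a rough sketch of the shape such a CDSW model function might take (the exact argument handling here is an assumption), it could look like:
from textblob import TextBlob

# Hypothetical sketch of a CDSW model function named "sentiment";
# the real script is in https://github.com/tspannhw/nifi-cdsw
def sentiment(args):
    # "sentence" is the input parameter name used when the model was created
    sentence = args.get("sentence", "")
    blob = TextBlob(sentence)
    # Return polarity and subjectivity so the NiFi flow can route on them
    return {"polarity": blob.sentiment.polarity,
            "subjectivity": blob.sentiment.subjectivity}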
Resources
https://textblob.readthedocs.io/en/dev/api_reference.html#textblob.blob.TextBlob.sentiment
https://community.hortonworks.com/articles/222605/converting-powerpoint-presentations-into-french-fr.html
https://community.hortonworks.com/articles/76935/using-sentiment-analysis-and-nlp-tools-with-hdp-25.html