10-06-2017
06:54 PM
There has been a major upgrade of the processor, thanks to @Simon Elliston Ball. Check out the latest version, 2.1. I am also prototyping a DL4J processor based on this and some code from the SkyMind guys.
07-06-2017
08:41 PM
4 Kudos
This is a cool HAT that attaches without soldering to a Raspberry Pi 3. The main use is with Google's Android Things, an IoT OS based on Android that you can run on a Raspberry Pi and other devices. You can also use this HAT with regular Raspbian and Python. This HAT is cool because it has both input and output. As part of our Python script, we ingest data from its sensor, but we also accept input from its A, B and C buttons, clearly marked on the top. By pressing one of the buttons you can display the current temperature in Celsius or Fahrenheit, or the current altitude. The altitude calculation is some code I found commented out in the Pimoroni Rainbow HAT interface Python script; it derives altitude from a starting value and pressure readings (a sketch of the formula follows the debug output below). It looks kind of cool and is relatively accurate. For a point of reference, Princeton, New Jersey is pretty close to sea level. My script is heavily copied and customized from my previous IoT Python scripts and from the Rainbow HAT examples. This HAT packs a four-digit display, seven multicolor LEDs, a BMP280 temperature and pressure sensor, and some more goodies into a tiny device. It's available from Adafruit.
We execute the Python code via a simple shell script wrapper:
python /opt/demo/minifi.py
Source Code: https://github.com/tspannhw/rpi-rainbowhat
Prepare the Pi
sudo apt-get purge libreoffice wolfram-engine sonic-pi scratch
sudo apt-get autoremove
Example Python Debug Output
CPU: 45.0 C
Corrected Temp: 23.7 C
Room Temp: 74.7 F
Pressure: 101598.9 Pa
Altitude: 33.2
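The original post doesn't show the altitude math, so here is a minimal Python sketch of the standard international barometric formula that code like the Pimoroni snippet is based on. The baseline pressure constant is an assumption; the real script calibrates it from a starting reading, which is how it produces values like the 33.2 above.
def altitude_m(pressure_pa, baseline_pa=101325.0):
    # Standard barometric formula; baseline_pa is an assumed sea-level
    # reference, normally calibrated from a known starting reading
    return 44330.0 * (1.0 - (pressure_pa / baseline_pa) ** (1.0 / 5.255))

print(round(altitude_m(100000.0), 1))  # a 100000 Pa reading gives about 110.9 m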
This is the MiniFi flow that I modeled in Apache NiFi and then exported as XML. I then use the shell script below to convert it into config.yml format and transfer it to my device running the Apache MiniFi Java agent. This is a simple flow: add a schema name, tell NiFi it's a JSON stream, and then query it. If the data has temperatures above a threshold (checked via inline SQL), convert the AVRO file to ORC and store it in HDFS (automagically building an external Hive table on top of it).
Schema to Put in Hortonworks Schema Registry or NiFi AVRO Schema Registry
{"type":"record","namespace":"hortonworks.hdp.refapp.rainbow","name":"rainbow","fields":[{"name": "tempf", "type": "float"},{ "name": "cputemp", "type": "float"},{"name":"ts","type":"string"},{ "name": "pressure","type": "float"},{ "name": "host","type": "string"},{ "name": "tempf2","type": "float"},{"name": "ipaddress","type": "string"},{"name": "temp","type": "float"},{ "name": "altitude","type": "float"}]}
It's pretty easy to define an AVRO schema (in JSON format) like the one above. Feel free to use it as a starting point. Strings and floats work very well; keep it simple and leave out extra whitespace. A quick schema sanity check is sketched just before the references below.
Build Your Configuration
minifi-toolkit-1.0.3.0.0.0-453/bin/config.sh transform RainbowForMiniFi.xml config.yml
Flows: nifireceiverrainbow.xml, rainbowforminifi.xml
Coming Soon
Raspberry Pi 3 running Android Things and Java, connecting to the Rainbow HAT, TensorFlow and MiniFi.
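Not part of the original article: a quick, hedged way to sanity-check an AVRO schema like the one above before loading it into a registry, assuming the fastavro package is installed (the rainbow.avsc filename is hypothetical).
import json
from fastavro.schema import parse_schema

# Load the schema JSON from a hypothetical file and parse it;
# parse_schema raises SchemaParseException if the schema is invalid
with open("rainbow.avsc") as f:
    schema = json.load(f)
parse_schema(schema)
print("schema parses cleanly")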
References:
https://github.com/pimoroni/rainbow-hat
https://github.com/androidthings/weatherstation
https://en.wikipedia.org/wiki/QNH
https://en.wikipedia.org/wiki/Pascal_(unit)
https://www.mide.com/pages/air-pressure-at-altitude-calculator
https://hackernoon.com/trying-out-the-android-things-weatherstation-codelab-d3f260b59c2f
https://codelabs.developers.google.com/codelabs/androidthings-weatherstation/index.html
https://shop.pimoroni.com/products/rainbow-hat-for-android-things
https://learn.pimoroni.com/tutorial/sandyj/getting-started-with-rainbow-hat-in-python
Part of My IoT MiniFi Series (Details Installation and Setup)
https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html
https://community.hortonworks.com/content/kbentry/108966/minifi-for-sensor-data-ingest-from-devices.html
https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
07-06-2017
07:10 PM
2 Kudos
I am always ingesting social data, IoT and mobile streams into my HDFS cluster. Unfortunately, my cluster is an ephemeral, cloud-based Hortonworks HDP 2.6 Hadoop cluster, so I don't have a permanent store for my data. My processes run for a few weeks and then they are destroyed.
I wanted a quick way to save all my ORC files.
Enter NiFi.
Backup
First we list from some top-level directories in HDFS to capture all the files and sub-directories we want to back up. Each processor maintains a timestamp so it knows which files it has already processed; as new files are added, they will be assimilated into the flow.
For massive data migration, we can run this on many nodes and use the Distributed Cache service to maintain the state.
Restore
The restore flow is very simple: read from the local file system and write to HDFS. For HDFS, I use /${path} as the directory so each file is written to the correct sub-directory for its file group. Easy: it's like rsync, but it's Tim Sync. Make sure you have your Hadoop configuration file set. If you are using Kerberos, make sure you set your principal and keytab, and be very careful about case sensitivity!
For restore it doesn't get any simpler: I use GetFile to read from the local file system. As part of that, those files are deleted; since I have a big USB 3.0 drive where I want to keep them, I copy them to a different directory for later storage. I should probably compress those. Once they get large enough, I may invest in a local RPi storage array running HDP 2.6 with some attached Western Digital PiDrives.
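To make the restore side concrete, here is a sketch of the key processor properties (the values are illustrative assumptions, not taken from the original screenshots):
GetFile: Input Directory = /backups/hdfs (GetFile removes the source files, hence the extra copy described above)
PutHDFS: Hadoop Configuration Resources = /etc/hadoop/conf/core-site.xml; Directory = /${path}; plus Kerberos Principal and Kerberos Keytab if the cluster is secured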
The Data Provenance of one of the flowfiles shows the path and filename. This makes it very easy to move between, say, S3, on-premise HDFS, local file systems, cloud file systems, jump drives, or wherever. Your data is yours; take it with you.
02-01-2019
04:22 PM
I met a problem: the data from MySQL in Hive is replaced. When the Conflict Resolution Strategy property is set to append, I cannot query the count from Hive with the statement "select count(1) from table_xxx". Help!!!
06-16-2017
04:49 PM
2 Kudos
See Part 1: https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html
See Part 2: https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
We build a flow in Apache NiFi and then export the template. Using the MiniFi tool we convert this into a config.yaml file and send it to our device via scp. You can see this in Part 1. This simple flow calls a shell script that will run a Python script to get our sensor data. This flow will then send the data to our NiFi server via S2S over HTTP.
What I have added in this part is use of the new Record and Schema paradigms, and the ability to run SQL queries against incoming flow files using QueryRecord. This requires building an AVRO schema for our data, which is a dead-easy JSON definition.
We set our port to connect the MiniFi agent to our server.
Data quickly starts coming in.
Receive the JSON Messages in NiFi via S2S
Top Steps to Ingest Sensor Data in a Few Hours
1. Connect to the port.
2. Set a schema to pull from the registry; also set a mime-type for JSON.
3. Query the flow file and keep only readings over 65 degrees Fahrenheit, via Apache Calcite-processed SQL (see the example query after this list).
4. This produces an AVRO file using the AvroRecordSetWriter and the schema from the AvroSchemaRegistry.
5. Store the AVRO file produced in HDFS.
6. Store the raw JSON file sent in to HDFS.
7. Convert the AVRO file to ORC.
8. Store the ORC files in HDFS.
9. Grab the autogenerated hive.ddl to create the external tables.
10. Query the sensor data in Zeppelin.
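The QueryRecord SQL itself is not shown in the article; a minimal version of the over-65 query would look like the line below. In QueryRecord the incoming data is queried as the FLOWFILE table, and the dynamic property name becomes the output relationship; the property name over65 is an assumption here.
over65 = SELECT * FROM FLOWFILE WHERE tempf > 65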
hive.ddl Automagically Generated Hive Schema
CREATE EXTERNAL TABLE IF NOT EXISTS sensor (tempf FLOAT, cputemp FLOAT, ts STRING, pressure FLOAT, host STRING, pitch FLOAT, ipaddress STRING, temp FLOAT, diskfree STRING, yaw FLOAT, humidity FLOAT, memory FLOAT, y FLOAT, x FLOAT, z FLOAT, roll FLOAT) STORED AS ORC
I grab the HDFS location and add that to the DDL: LOCATION '/sensor'.
For the AVRO and JSON versions of the data, I make similar tables; only the clauses that differ are shown here, with a full reconstruction after them.
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/jsonsensor';
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS AVRO
LOCATION '/avrosensor';
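Since the article shows only the differing clauses, here is a hedged reconstruction of the full JSON table, reusing the column list from the ORC DDL above; the table name jsonsensor is inferred from the path and is an assumption:
CREATE EXTERNAL TABLE IF NOT EXISTS jsonsensor (tempf FLOAT, cputemp FLOAT, ts STRING, pressure FLOAT, host STRING, pitch FLOAT, ipaddress STRING, temp FLOAT, diskfree STRING, yaw FLOAT, humidity FLOAT, memory FLOAT, y FLOAT, x FLOAT, z FLOAT, roll FLOAT)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/jsonsensor';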
Install Libraries (See Part 1 for MiniFi install)
pip install --upgrade sense-hat
pip install --upgrade pillow
pip install rtimulib
pip install psutil
sudo apt-get install oracle-java8-jdk
Shell Script
python /opt/demo/rpi-sensehat-mqtt-nifi/sense2.py
Python Script
from sense_hat import SenseHat
import json
import os
import socket
import psutil
from time import gmtime, strftime

# Timestamp in yyyy-mm-dd hh:mm:ss format (UTC)
currenttime = strftime("%Y-%m-%d %H:%M:%S", gmtime())

host = os.uname()[1]
rasp = ('armv' in os.uname()[4])  # True on an ARM board such as a Raspberry Pi
cpu = psutil.cpu_percent(interval=1)

# CPU temperature, readable from sysfs on the Pi; default to 0.0 elsewhere
ctemp = 0.0
if rasp:
    with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f:
        l = f.readline()
    ctemp = 1.0 * float(l) / 1000

# Disk and memory statistics
usage = psutil.disk_usage("/")
mem = psutil.virtual_memory()
diskrootfree = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
mempercent = mem.percent

# Determine our IP address by opening a UDP socket toward a.root-servers.net
external_IP_and_port = ('198.41.0.4', 53)
socket_family = socket.AF_INET

def IP_address():
    try:
        s = socket.socket(socket_family, socket.SOCK_DGRAM)
        s.connect(external_IP_and_port)
        answer = s.getsockname()
        s.close()
        return answer[0] if answer else None
    except socket.error:
        return None

ipaddress = IP_address()

# Read the Sense HAT environmental and motion sensors
sense = SenseHat()
sense.clear()
temp = round(sense.get_temperature(), 2)
humidity = round(sense.get_humidity(), 1)
pressure = round(sense.get_pressure(), 1)
orientation = sense.get_orientation()
pitch = round(orientation['pitch'], 0)
roll = round(orientation['roll'], 0)
yaw = round(orientation['yaw'], 0)
acceleration = sense.get_accelerometer_raw()
x = round(acceleration['x'], 0)
y = round(acceleration['y'], 0)
z = round(acceleration['z'], 0)

# Build one JSON record; note tempf uses (temp * 1.8) + 12 rather than the
# textbook + 32, a deliberate offset in the original script that compensates
# for the Sense HAT reading high
row = {'ts': currenttime, 'host': host, 'memory': mempercent,
       'diskfree': diskrootfree, 'cputemp': round(ctemp, 2),
       'ipaddress': ipaddress, 'temp': temp,
       'tempf': round(((temp * 1.8) + 12), 2), 'humidity': humidity,
       'pressure': pressure, 'pitch': pitch, 'roll': roll, 'yaw': yaw,
       'x': x, 'y': y, 'z': z}
json_string = json.dumps(row)
print(json_string)
One Record (JSON)
{"tempf": 75.14, "temp": 35.08, "pitch": 1.0, "diskfree": "1211.8 MB", "yaw": 55.0, "cputemp": 52.08, "ts": "2017-06-16 17:39:08", "humidity": 41.5, "pressure": 0.0, "host": "picroft", "memory": 23.0, "y": 0.0, "x": -1.0, "z": 1.0, "ipaddress": "192.168.1.156", "roll": 1.0}
AVRO Schema (JSON Format)
{"type":"record","namespace":"hortonworks.hdp.refapp.sensehat","name":"sensehat","fields":[{"name": "tempf", "type": "float"},{ "name": "cputemp", "type": "float"},{"name":"ts","type":"string"},{ "name": "pressure","type": "float"},{ "name": "host","type": "string"},{ "name": "pitch","type": "float"},{"name": "ipaddress","type": "string"},{"name": "temp","type": "float"},{ "name": "diskfree","type": "string"},{ "name": "yaw","type": "float" },{"name": "humidity","type": "float"},{"name": "memory","type": "float"},{"name": "y", "type": "float"},{"name": "x", "type": "float" },{"name": "z","type": "float"},{"name": "roll", "type": "float"}]}
config.yml
MiNiFi Config Version: 2
Flow Controller:
  name: sense hat
  comment: sense hat 2017
Core Properties:
  flow controller graceful shutdown period: 10 sec
  flow service write delay interval: 500 ms
  administrative yield duration: 30 sec
  bored yield duration: 10 millis
  max concurrent threads: 1
FlowFile Repository:
  partitions: 256
  checkpoint interval: 2 mins
  always sync: false
  Swap:
    threshold: 20000
    in period: 5 sec
    in threads: 1
    out period: 5 sec
    out threads: 4
Content Repository:
  content claim max appendable size: 10 MB
  content claim max flow files: 100
  always sync: false
Provenance Repository:
  provenance rollover time: 1 min
Component Status Repository:
  buffer size: 1440
  snapshot frequency: 1 min
Security Properties:
  keystore: ''
  keystore type: ''
  keystore password: ''
  key password: ''
  truststore: ''
  truststore type: ''
  truststore password: ''
  ssl protocol: ''
  Sensitive Props:
    key:
    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
    provider: BC
Processors:
- id: db6fbd3b-ddf4-3041-0000-000000000000
  name: ExecuteProcess
  class: org.apache.nifi.processors.standard.ExecuteProcess
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 60 sec
  penalization period: 30 sec
  yield period: 1 sec
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Argument Delimiter: ' '
    Batch Duration:
    Command: /opt/demo/rpi-sensehat-mqtt-nifi/sense2.sh
    Command Arguments:
    Redirect Error Stream: 'true'
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: 5635290a-4cb6-3da7-0000-000000000000
  name: minifiSenseHat
  source id: db6fbd3b-ddf4-3041-0000-000000000000
  source relationship names:
  - success
  destination id: 166616e3-1962-1660-2b7c-2f824584b23a
  max work queue size: 10000
  max work queue data size: 1 GB
  flowfile expiration: 0 sec
  queue prioritizer class: ''
Remote Process Groups:
- id: fdc45649-84be-374b-0000-000000000000
  name: ''
  url: http://hw13125.local:8080/nifi
  comment: ''
  timeout: 30 sec
  yield period: 10 sec
  transport protocol: HTTP
  Input Ports:
  - id: 166616e3-1962-1660-2b7c-2f824584b23a
    name: MiniFi SenseHat
    comment: ''
    max concurrent tasks: 1
    use compression: false
Build Our MiniFi Configuration File from sensorminifi.xml
minifi-toolkit-1.0.2.1.4.0-5/bin/config.sh transform sensorminifi.xml config.yml
Then just SCP it to your device.
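For example (the device hostname and destination path here are assumptions, based on the picroft paths shown later in this article):
scp config.yml pi@picroft.local:/opt/demo/minifi-1.0.2.1.4.0-5/conf/config.yml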
Flows
sensornifi.xml sensorminifi.xml
Source Repository
https://github.com/tspannhw/rpi-sensehat-minifi-python
Example MiniFi Log
dResourceClaim[id=1497645887239-1, container=default, section=1], offset=2501, length=278],offset=0,name=13917785142443,size=278]] (278 bytes) to http://HW13125.local:8080/nifi-api in 116 milliseconds at a rate of 2.32 KB/sec
2017-06-16 20:54:41,827 INFO [Provenance Maintenance Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 3162
2017-06-16 20:54:41,844 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully merged 16 journal files (3 records) into single Provenance Log File provenance_repository/3159.prov in 33 milliseconds
2017-06-16 20:54:41,846 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 5 records
2017-06-16 20:54:43,288 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@87eb01 checkpointed with 0 Records and 0 Swap Files in 100 milliseconds (Stop-the-world time = 13 milliseconds, Clear Edit Logs time = 10 millis), max Transaction ID -1
2017-06-16 20:54:48,429 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2017-06-16 20:54:48,890 INFO [pool-23-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@17461db checkpointed with 0 Records and 0 Swap Files in 460 milliseconds (Stop-the-world time = 190 milliseconds, Clear Edit Logs time = 77 millis), max Transaction ID 2107
2017-06-16 20:54:48,891 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 461 milliseconds
2017-06-16 20:54:51,482 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@f69f9d Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-16 20:55:07,621 INFO [Timer-Driven Process Thread-9] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes:
PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
2017-06-16 20:55:07,957 INFO [Timer-Driven Process Thread-9] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi SenseHat,target=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=b3bcd211-7425-4750-9e4c-ba2d477b9cc1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1497645887239-1, container=default, section=1], offset=2779, length=278],offset=0,name=13979556432846,size=278]] (278 bytes) to http://HW13125.local:8080/nifi-api in 114 milliseconds at a rate of 2.38 KB/sec
Check the Status of MiniFi
root@picroft:/opt/demo/minifi-1.0.2.1.4.0-5# bin/minifi.sh flowStatus processor:db6fbd3b-ddf4-3041-0000-000000000000:health,stats,bulletins
minifi.sh: JAVA_HOME not set; results may vary
Bootstrap Classpath: /opt/demo/minifi-1.0.2.1.4.0-5/conf:/opt/demo/minifi-1.0.2.1.4.0-5/lib/bootstrap/*:/opt/demo/minifi-1.0.2.1.4.0-5/lib/*
Java home:
MiNiFi home: /opt/demo/minifi-1.0.2.1.4.0-5
Bootstrap Config File: /opt/demo/minifi-1.0.2.1.4.0-5/conf/bootstrap.conf
FlowStatusReport{controllerServiceStatusList=null, processorStatusList=[{name='ExecuteProcess', processorHealth={runStatus='Running', hasBulletins=false, validationErrorList=[]}, processorStats={activeThreads=0, flowfilesReceived=0, bytesRead=0, bytesWritten=1390, flowfilesSent=0, invocations=5, processingNanos=9290051632}, bulletinList=[]}], connectionStatusList=null, remoteProcessGroupStatusList=null, instanceStatus=null, systemDiagnosticsStatus=null, reportingTaskStatusList=null, errorsGeneratingReport=[]}
Output Displayed in Apache Zeppelin Workbook
Using the DDL generated by Apache NiFi, we can create external Hive tables for the raw JSON data, the ORC cleaned up version of the data and also an AVRO version of the data.
We can then query our datasets.
References:
https://cwiki.apache.org/confluence/display/Hive/AvroSerDe
https://json-schema-validator.herokuapp.com/avro.jsp
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_minifi-administration/content/ch01s01.html
https://community.hortonworks.com/articles/55839/reading-sensor-data-from-remote-sensors-on-raspber.html
https://www.thepolyglotdeveloper.com/2016/08/connect-multiple-wireless-networks-raspberry-pi/
https://community.hortonworks.com/articles/83100/deep-learning-iot-workflows-with-raspberry-pi-mqtt.html
06-16-2017
02:27 PM
5 Kudos
See Part 1: https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html
In this article, we are ingesting beacon data using an ASUS Tinkerboard, which is very similar to a Raspberry Pi in form factor and features. It is newer and has fewer peripherals, but it has 2 GB of RAM, an ARM Mali T760 MP4 GPU, and a quad-core ARM Cortex-A17 1.8 GHz processor with dual-channel DDR3 memory, plus built-in BLE (Bluetooth 4.0 with EDR). For beacons, I am using the basic Estimote three-beacon collection. They have a really nice, solid beacon in a silicone case that looks nice and works well. They also have a mobile app to manage, monitor and simulate beacons, as well as a cloud app. On our Apache NiFi server we have an Input Port that receives the messages from the Java MiniFi agent running on the ASUS Tinkerboard. As with most of the data, I want to ingest it every minute. The ASUS Tinkerboard device has the red light lit up; the Estimote beacons are the colored devices to the right. For Tinkerboards, the default username is linaro.
Requires Bluetooth Reading Libraries in Linux
sudo apt-get -y install bluetooth bluez libbluetooth-dev libudev-dev bluez-hcidump python-bluez
sudo apt-get -y update
sudo apt-get -y install python-dev python3-dev
pip install beacontools[scan]
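Not from the original article: a minimal sketch of scanning with the beacontools package installed above (see the citruz/beacontools repo in the references). The callback just prints each advertisement; the UUID filter shown is the well-known Estimote default and is an assumption here.
import time
from beacontools import BeaconScanner, IBeaconFilter

# Print every iBeacon advertisement matching the filter
def callback(bt_addr, rssi, packet, additional_info):
    print(bt_addr, rssi, packet, additional_info)

scanner = BeaconScanner(callback,
    device_filter=IBeaconFilter(uuid="b9407f30-f5f8-466e-aff9-25556b57fe6d"))
scanner.start()
time.sleep(10)  # scan for ten seconds, then stop
scanner.stop()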
The built-in Bluetooth adapter is hci0. You will need Python 2.7 and/or Python 3.5 installed.
Install Oracle JDK 8
sudo apt-get -y install oracle-java8-jdk
Shell Script to ExecuteProcess
/opt/demo/py-decode-beacon/runble.sh:
python bluez_scan.py
Python Script
See the Python beacon advertisement decoder, Copyright (c) 2015 Patrick Van Oosterwijck: https://github.com/adamf/BLE/blob/master/ble-scanner.py and https://github.com/xorbit/py-decode-beacon
Starting MiniFi on Tinkerboard
bin/minifi.sh start
Diagnostics on MiniFi
./bin/minifi.sh flowStatus systemdiagnostics:heap,processorstats,contentrepositoryusage,flowfilerepositoryusage,garbagecollection
Logs From MiniFi
2017-06-12 17:13:49,873 INFO [Timer-Driven Process Thread-9] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi Tinker,target=http://hw13125.local:8080/nifi/] Successfully sent [StandardFlowFileRecord[uuid=98de3195-8be3-4433-840b-c84b9a84fb6f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1497282405127-2, container=default, section=2], offset=729236, length=16331],offset=0,name=98638611491740,size=16331]] (15.95 KB) to http://HW13125.local:8080/nifi-api in 138 milliseconds at a rate of 114.88 KB/sec
2017-06-12 17:13:57,918 INFO [Provenance Maintenance Thread-3] o.a.n.p.MiNiFiPersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 2365
2017-06-12 17:13:57,962 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully merged 16 journal files (3 records) into single Provenance Log File provenance_repository/2362.prov in 243 milliseconds
2017-06-12 17:13:57,971 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 5 records
2017-06-12 17:14:04,671 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@944ade Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-12 17:14:21,554 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2017-06-12 17:14:26,057 INFO [pool-23-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@541c15 checkpointed with 0 Records and 0 Swap Files in 4502 milliseconds (Stop-the-world time = 3887 milliseconds, Clear Edit Logs time = 549 millis), max Transaction ID 985
2017-06-12 17:14:26,059 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 4503 milliseconds
2017-06-12 17:14:42,783 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@3e088f checkpointed with 0 Records and 0 Swap Files in 97 milliseconds (Stop-the-world time = 49 milliseconds, Clear Edit Logs time = 33 millis), max Transaction ID -1
2017-06-12 17:15:05,438 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@944ade Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-12 17:15:49,116 INFO [Timer-Driven Process Thread-5] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes:
PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
It's a very simple model: just execute the Python and send the resultant JSON to NiFi for processing and storage. The same code will also work on a Raspberry Pi and most similar Linux devices. The ASUS Tinkerboard runs TinkerOS_Debian V1.8, not Raspbian.
References:
https://community.hortonworks.com/articles/103863/using-an-asus-tinkerboard-with-tensorflow-and-pyth.html
https://dzone.com/articles/using-tinkerboard-with-tensorflow-and-python
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_minifi-administration/content/ch01s01.html
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_minifi-quick-start/content/ch_minifi-quick-start.html
https://community.hortonworks.com/articles/56341/getting-started-with-minifi.html
https://github.com/hipol/EstimoteServer
https://github.com/taka-wang/py-beacon
https://learn.adafruit.com/bluefruit-le-python-library
https://github.com/citruz/beacontools
https://pypi.python.org/pypi/beacontools/1.0.1
https://github.com/adamf/BLE/blob/master/ble-scanner.py
https://github.com/xorbit/py-decode-beacon
https://cloud.estimote.com/docs/
https://github.com/estimote/estimote-specs
https://evothings.com/doc/examples/ibeacon-scan.html
https://github.com/switchdoclabs/iBeacon-Scanner-
http://www.switchdoc.com/2014/08/ibeacon-raspberry-pi-scanner-python/
https://github.com/mlwelles/BeaconScanner
https://github.com/dburr/linux-ibeacon
https://github.com/beaconinside/awesome-beacon
https://github.com/adafruit/Adafruit_Python_BluefruitLE
https://www.asus.com/us/Single-Board-Computer/Tinker-Board/
https://www.asus.com/us/Single-Board-Computer/Tinker-Board/HelpDesk_Download/
02-13-2019
09:54 PM
This is superb. I plan to do the same. Quick question: which version of MiniFi is this, in the sense of, is it Java or C++? @Timothy Spann
05-22-2017
11:23 PM
2 Kudos
Ingesting JMS Data into Hive
A company has a lot of data moving around the enterprise asynchronously with Apache ActiveMQ; they want to tap into it, convert the JSON messages coming from web servers, and store them in Hadoop. I am storing the data into Apache Phoenix / HBase via SQL. And also, since it's so easy, I am storing the data as ORC files in HDFS for Apache Hive access.
Apache NiFi 1.2 generates DDL for a Hive table for us in the hive.ddl attribute:
CREATE EXTERNAL TABLE IF NOT EXISTS meetup
(id INT, first_name STRING, last_name STRING, email STRING, ip_address STRING, company STRING, macaddress STRING, cell_phone STRING) STORED AS ORC
LOCATION '/meetup'
insert into meetup
(id , first_name , last_name , email , ip_address , company , macaddress , cell_phone)
values
(?,?,?,?,?,?,?,?)
HDF Flow
ConsumeJMS
Path 1: Store in Hadoop as ORC with a Hive Table
InferAvroSchema: get a schema from the JSON data
ConvertJSONtoAVRO: build an AVRO file from the JSON data
MergeContent: build a larger chunk of AVRO data
ConvertAvroToORC: build ORC files
PutHDFS: land in your Hadoop data lake
Path 2: Upsert into Phoenix (or any SQL database)
EvaluateJSONPath: extract fields from the JSON file
UpdateAttribute: set SQL fields
ReplaceText: create the SQL statement with ? placeholders
PutSQL: send to Phoenix through a Connection Pool (JDBC)
Path 3: Store Raw JSON in Hadoop
PutHDFS: store the JSON data on ingest
Path 4: Call the Original REST API to Obtain Data and Send to JMS
GetHTTP: call a REST API to retrieve JSON arrays
SplitJSON: split the JSON file into individual records
PutJMS <or> PublishJMS: two ways to push messages to JMS. One uses a JMS controller and the other uses a JMS client without a controller. I should benchmark this.
Error Message from Failed Load
If I get errors on JMS send, I send the UUID of the file to Slack for ChatOps.
Zeppelin Display of SQL Data
To check the tables I use Apache Zeppelin to query Phoenix and Hive tables.
Formatting in UpdateAttribute for SQL Arguments
To set the ? properties for the JDBC prepared statement, we number the arguments starting from 1. The type is the JDBC type (12 is String), and the value is the value of the FlowFile field; a concrete illustration follows below. Yet another message queue ingested with no fuss.
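As an illustration of that mapping (the attribute names follow the sql.args convention that PutSQL reads; the specific values here are hypothetical, keyed to the meetup insert above):
sql.args.1.type = 4 (JDBC INTEGER, for id)
sql.args.1.value = ${id}
sql.args.2.type = 12 (JDBC VARCHAR, for first_name)
sql.args.2.value = ${first_name}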
05-22-2017
08:16 PM
4 Kudos
Backup Files from Hadoop
ListHDFS
Set parameters; pick a high-level directory and work down. Hadoop configuration file: /etc/hadoop/conf/core-site.xml
FetchHDFS
${path}/${filename}
PutFile
Store in a local directory.
Backup Hive Tables
SelectHiveQL
Output format AVRO, with SQL: select * from beaconstatus
ConvertAVROtoORC
Generic for all the tables.
UpdateAttribute
tablename = ${hive.ddl:substringAfter('CREATE EXTERNAL TABLE IF NOT EXISTS '):substringBefore(' (')}
For example, if hive.ddl starts with "CREATE EXTERNAL TABLE IF NOT EXISTS meetup (", this expression yields "meetup".
PutFile
Use replace directories and create missing directories, with directory: /Volumes/Transcend/HadoopBackups/hive/${tablename}
For Phoenix tables, I use the same ConvertAvroToORC, UpdateAttribute and PutFile boxes and just add ExecuteSQL to ingest the Phoenix data. For every new table, I add one box and link it to ConvertAvroToORC. Done! This is enough of a backup that if I need to rebuild and refill my development cluster, I can do so easily. I also have them scheduled once a day to rewrite everything. This is not for production or extremely large data! It works great for a development cluster or a personal dev cluster. You can easily back up files by ingesting with GetFile, and other things can be backed up by calling ExecuteStreamCommand.
Local File Storage of Backed-up Data
drwxr-xr-x 3 tspann staff 102 May 20 23:00 any_data_trials2
drwxr-xr-x 3 tspann staff 102 May 20 22:59 any_data_meetup
drwxr-xr-x 3 tspann staff 102 May 20 22:59 any_data_ibeacon
drwxr-xr-x 3 tspann staff 102 May 20 22:57 any_data_gpsweather
drwxr-xr-x 3 tspann staff 102 May 20 10:53 any_data_beaconstatus
drwxr-xr-x 3 tspann staff 102 May 20 10:52 any_data_beacongateway
drwxr-xr-x 3 tspann staff 102 May 19 17:36 any_data_atweetshive2
drwxr-xr-x 3 tspann staff 102 May 19 17:31 any_data_atweetshive
Other Tools to Extract Data
Show tables to get your list, and then you can grab all the DDL for the Hive tables.
ddl.sql
show create table atweetshive;
show create table atweetshive2;
show create table beacongateway;
show create table beaconstatus;
show create table dronedata;
show create table gps;
show create table gpsweather;
show create table ibeacon;
show create table meetup;
show create table trials2;
Hive Script to Export Table DDL
beeline -u jdbc:hive2://myhiveserverthrift:10000/default --color=false --showHeader=false --verbose=false --silent=true --outputformat=csv -f ddl.sql
Backup Zeppelin Notebooks in Bulk
tar -cvf notebooks.tar /usr/hdp/current/zeppelin-server/notebook/
gzip -9 notebooks.tar
scp userid@pservername:/opt/demo/notebooks.tar.gz .
03-05-2018
03:51 PM
We are running Inception; see here: /tensorflow/models/tutorials/image/imagenet/classify_image.py
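For reference, a typical invocation of that script (the image path is hypothetical; --image_file is the flag the TensorFlow imagenet tutorial script accepts):
python /tensorflow/models/tutorials/image/imagenet/classify_image.py --image_file /tmp/capture.jpg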