Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
Labels (1)
avatar
Master Guru

See Part 1: https://community.hortonworks.com/articles/101679/iot-ingesting-gps-data-from-raspberry-pi-zero-wire...

Augment Your Streaming Ingested Data

So with Hortonworks Data Flow : Flow Processing I can augment any stream of data in motion. My GPS data stream from my RPIWZ is returning latitude and longitude. That is valuable data that can be input to a lot of different APIs. One that seemed relevant to me was the current weather conditions where the device was. You could also do some predictive analytics to figure where you might be and check the current weather conditions or a weather forecast depending on how long it will take you to get there.

A GPS is giving us latitude and longitude that can be used as input to many kinds of APIs. Once use case that works for a moving device is checking the current weather conditions where you are. Weather Underground has a REST API call that will convert Lat/Long to City/State (there are other APIs that will do this as well, please post them). Once I have City and State I use that to get current weather conditions where this device is.

Apache NiFi Flow

15239-weathercoding.png

Augment Live Data

EvaluateJsonPath : create two variables (latitude and longitude) by extracting JSON fields from the flow file sent via MQTT. $.latitude and $.longitude

InvokeHTTP: http://api.wunderground.com/api/TheOneKey/geolookup/q/${latitude},${longitude}.json

EvaluateJsonPath: create two variables (city and state) by extracting JSON fields from the flow file sent from REST call. $.location.city $.location.state.

InvokeHTTP: http://api.wunderground.com/api/OneKeyToRule/conditions/q/${state}/${city:urlEncode()}.json City has space and perhaps other HTTP GET unfriendly data.

EvaluateJsonPath : extract all the weather fields I am interested in such as $.current_observation.dewpoint_string.

UpdateAttribute: A few field names Avro Schema's do not like. ${'Last-Modified'} I convert that to lastModified.

AttributesToJSON: Convert just the fields I want to a new flow file. Set property to flowfile-content.

temp_c,temp_f,wind_mph,wind_degrees,temperature_string,weather,feelslike_string,state,longitude,windchill_string,wind_string,relative_humidity,pressure_mb,observation_time,city,windchill_f,windchill_c,latitude,precip_today_string,lastModified,dewpoint_string,heat_index_string,visibility_mi

InferAvroSchema : Let Nifi decide what this AVRO record should look like. I will add a part three using the Schema Registry and Apache NiFi 1.2 that will use a schema lookup.

15254-inferavroschema.png

ConvertJSONtoAvro : And make it SNAPPY! MergeContent: As AVRO.

15253-convertavrotojson.png

MergeContent: As AVRO.

15255-mergecontent.png

ConvertAvroToORC : ORC is the optimal choice for Hive LLAP and Spark SQL to query the data.

15251-convertavrotoorc.png

PutHDFS : Point to your configuration Nifi relative file system file /etc/hadoop/conf/core-site.xml. I like to set for replace conflict resolution and set your directory. Make sure you created your file directory and Nifi has permissions to write there.

The quickest is to log into a machine with HDFS client.

su hdfs
hdfs dfs -mkdir -p /rpwz/gpsweather   
hdfs dfs -chmod -R 777 /rpwz

Enhancement Ideas:

Limit the # of calls to Weather Underground, try other weather services like NOAA and Weather Source. You may want to try other nation's weather APIs as well.

Create an External Hive Table and Display the Data (Zeppelin Can Do That)

15240-weatherzep.png

Zeppelin let's me create the table with the DDL produced by the stream.

Then I can query it easily.

%jdbc(hive)
CREATE EXTERNAL TABLE IF NOT EXISTS gpsweather (observation_time STRING, dewpoint_string STRING, city STRING, windchill_f STRING, windchill_c STRING, latitude STRING, precip_today_string STRING, temp_c STRING, temp_f STRING, windchill_string STRING, wind_mph STRING, wind_degrees STRING, temperature_string STRING, weather STRING, feelslike_string STRING, wind_string STRING, heat_index_string STRING, state STRING, lastModified STRING, relative_humidity STRING, pressure_mb STRING, visibility_mi STRING, longitude STRING) 
STORED AS ORC LOCATION '/rpwz/gpsweather'
%jdbc(hive)
select * from gpsweather

Reference:

http://www.wunderground.com/

https://openweathermap.org/forecast5

https://developers.google.com/maps/

Note:

Weather Underground has a free API that can use up to a certain # of calls. You will easily blast past that if you are tracking live objects, even if you decide to only enrich when points change or at 15 minute intervals. For testing, you may want to store weather underground results in JSON files in a directory and use GetFile to stand in for those calls. The same with data from your devices. Download the files from the Data Provenance and you can use them as stand-ins for live data for integration testing, especially when you may be on a plane or somewhere where you cannot reach your device.


convertavrotojson.png
3,224 Views
Comments
avatar
Rising Star

Hi @Timothy Spann,

I am doing the same but unable to figure out this problem. Could you share me demo flow for this project.

Thank you.

avatar

i too wanted it

Version history
Last update:
‎08-17-2019 01:02 PM
Updated by:
Contributors