Created on 11-07-2018 02:58 PM - edited 09-16-2022 01:44 AM
Before being able to predict one's Beast Mode Quotient (BMQ), we need an easy way to ingest data from multiple sources, in this case flat files and APIs. Fortunately, HDF, and specifically NiFi, is designed to handle multiple data inputs and outputs very efficiently, as I will detail in this tutorial.
A few notes about the technologies selected for this tutorial and series of articles:
Here are some high-level concepts highlighted in this article that you can reapply to any implementation:
This tutorial is divided into the following sections:
Before doing anything else, you will need a MySQL instance. Plenty of tutorials explain how to set one up; here is one for CentOS 7, for instance.
Once your instance is set up, connect as root and run the following database creation script:
DROP DATABASE IF EXISTS `BEAST_MODE_DB`;
CREATE DATABASE `BEAST_MODE_DB`;
CREATE USER 'bm_user'@'%' IDENTIFIED BY '[YOUR_PASSWORD]';
GRANT ALL ON BEAST_MODE_DB.* TO 'bm_user'@'%';
Connect as bm_user and run this script to create the appropriate tables:
-- BEAST_MODE_DB schema (originally dumped with mysqldump 10.14 from MariaDB 5.5.60 on 2018-11-07)
-- The date/time key columns have no ON UPDATE CURRENT_TIMESTAMP clause, so that the
-- INSERT ... ON DUPLICATE KEY UPDATE upserts used by the NiFi flows cannot overwrite them with the current time.
USE BEAST_MODE_DB;

-- Table structure for table `ACTIVITY_HISTORY`
DROP TABLE IF EXISTS `ACTIVITY_HISTORY`;
CREATE TABLE `ACTIVITY_HISTORY` (
  `ID` bigint(20) NOT NULL,
  `START_TIME` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `DURATION` double DEFAULT NULL,
  `DISTANCE` double DEFAULT NULL,
  `ELEVATION_GAIN` double DEFAULT NULL,
  `AVG_HR` double DEFAULT NULL,
  `AVG_PACE` double DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Table structure for table `BMQ_HISTORY`
DROP TABLE IF EXISTS `BMQ_HISTORY`;
CREATE TABLE `BMQ_HISTORY` (
  `ID` mediumint(9) NOT NULL AUTO_INCREMENT,
  `BMQ` int(11) NOT NULL,
  `TYPE` char(30) DEFAULT NULL,
  `TIME_ENTERED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Table structure for table `GENERATOR`
DROP TABLE IF EXISTS `GENERATOR`;
CREATE TABLE `GENERATOR` (
  `ID` mediumint(9) NOT NULL AUTO_INCREMENT,
  `VALUE` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

-- Data for table `GENERATOR` (160 rows)
INSERT INTO `GENERATOR` VALUES
(1,'dummy'),(2,'dummy'),(3,'dummy'),(4,'dummy'),(5,'dummy'),(6,'dummy'),(7,'dummy'),(8,'dummy'),(9,'dummy'),(10,'dummy'),
(11,'dummy'),(12,'dummy'),(13,'dummy'),(14,'dummy'),(15,'dummy'),(16,'dummy'),(17,'dummy'),(18,'dummy'),(19,'dummy'),(20,'dummy'),
(21,'dummy'),(22,'dummy'),(23,'dummy'),(24,'dummy'),(25,'dummy'),(26,'dummy'),(27,'dummy'),(28,'dummy'),(29,'dummy'),(30,'dummy'),
(31,'dummy'),(32,'dummy'),(33,'dummy'),(34,'dummy'),(35,'dummy'),(36,'dummy'),(37,'dummy'),(38,'dummy'),(39,'dummy'),(40,'dummy'),
(41,'dummy'),(42,'dummy'),(43,'dummy'),(44,'dummy'),(45,'dummy'),(46,'dummy'),(47,'dummy'),(48,'dummy'),(49,'dummy'),(50,'dummy'),
(51,'dummy'),(52,'dummy'),(53,'dummy'),(54,'dummy'),(55,'dummy'),(56,'dummy'),(57,'dummy'),(58,'dummy'),(59,'dummy'),(60,'dummy'),
(61,'dummy'),(62,'dummy'),(63,'dummy'),(64,'dummy'),(65,'dummy'),(66,'dummy'),(67,'dummy'),(68,'dummy'),(69,'dummy'),(70,'dummy'),
(71,'dummy'),(72,'dummy'),(73,'dummy'),(74,'dummy'),(75,'dummy'),(76,'dummy'),(77,'dummy'),(78,'dummy'),(79,'dummy'),(80,'dummy'),
(81,'dummy'),(82,'dummy'),(83,'dummy'),(84,'dummy'),(85,'dummy'),(86,'dummy'),(87,'dummy'),(88,'dummy'),(89,'dummy'),(90,'dummy'),
(91,'dummy'),(92,'dummy'),(93,'dummy'),(94,'dummy'),(95,'dummy'),(96,'dummy'),(97,'dummy'),(98,'dummy'),(99,'dummy'),(100,'dummy'),
(101,'dummy'),(102,'dummy'),(103,'dummy'),(104,'dummy'),(105,'dummy'),(106,'dummy'),(107,'dummy'),(108,'dummy'),(109,'dummy'),(110,'dummy'),
(111,'dummy'),(112,'dummy'),(113,'dummy'),(114,'dummy'),(115,'dummy'),(116,'dummy'),(117,'dummy'),(118,'dummy'),(119,'dummy'),(120,'dummy'),
(121,'dummy'),(122,'dummy'),(123,'dummy'),(124,'dummy'),(125,'dummy'),(126,'dummy'),(127,'dummy'),(128,'dummy'),(129,'dummy'),(130,'dummy'),
(131,'dummy'),(132,'dummy'),(133,'dummy'),(134,'dummy'),(135,'dummy'),(136,'dummy'),(137,'dummy'),(138,'dummy'),(139,'dummy'),(140,'dummy'),
(141,'dummy'),(142,'dummy'),(143,'dummy'),(144,'dummy'),(145,'dummy'),(146,'dummy'),(147,'dummy'),(148,'dummy'),(149,'dummy'),(150,'dummy'),
(151,'dummy'),(152,'dummy'),(153,'dummy'),(154,'dummy'),(155,'dummy'),(156,'dummy'),(157,'dummy'),(158,'dummy'),(159,'dummy'),(160,'dummy');

-- Table structure for table `HEALTH_HISTORY`
DROP TABLE IF EXISTS `HEALTH_HISTORY`;
CREATE TABLE `HEALTH_HISTORY` (
  `DIARY_DAY` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `TOTAL_CALORIES_OUT` double NOT NULL,
  `TOTAL_MINUTES_RECORDED` double NOT NULL,
  `STEPS` double DEFAULT NULL,
  `ELEVATION` double DEFAULT NULL,
  `AVG_REST_HR` double DEFAULT NULL,
  `REST_MINUTES` double DEFAULT NULL,
  `REST_CAL_OUT` double DEFAULT NULL,
  `AVG_FAT_BURN_HR` double DEFAULT NULL,
  `FAT_BURN_MINUTES` double DEFAULT NULL,
  `FAT_BURN_CAL_OUT` double DEFAULT NULL,
  `AVG_CARDIO_HR` double DEFAULT NULL,
  `CARDIO_MINUTES` double DEFAULT NULL,
  `CARDIO_CAL_OUT` double DEFAULT NULL,
  `AVG_PEAK_HR` double DEFAULT NULL,
  `PEAK_MINUTES` double DEFAULT NULL,
  `PEAK_CAL_OUT` double DEFAULT NULL,
  PRIMARY KEY (`DIARY_DAY`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

-- Table structure for table `NUTRITION_HISTORY`
DROP TABLE IF EXISTS `NUTRITION_HISTORY`;
CREATE TABLE `NUTRITION_HISTORY` (
  `DIARY_DAY` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `MEAL` varchar(50) NOT NULL,
  `TOTAL_CALORIES_IN` double NOT NULL,
  `CARB` double NOT NULL,
  `PROT` double NOT NULL,
  `FAT` double NOT NULL,
  PRIMARY KEY (`DIARY_DAY`,`MEAL`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Table structure for table `SLEEP_HISTORY`
DROP TABLE IF EXISTS `SLEEP_HISTORY`;
CREATE TABLE `SLEEP_HISTORY` (
  `DIARY_DAY` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `TOTAL_MINUTES_ASLEEP` double NOT NULL,
  `TOTAL_MINUTES_IN_BED` double NOT NULL,
  `REM_MINUTES` double NOT NULL,
  `LIGHT_MINUTES` double NOT NULL,
  `DEEP_MINUTES` double NOT NULL,
  `WAKE_MINUTES` double NOT NULL,
  PRIMARY KEY (`DIARY_DAY`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Note that a GENERATOR table has also been created. It is needed to generate date series in queries, as detailed later.
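The GENERATOR rows only need to exist: the date-series query later in this tutorial selects from the table without using its values. So you do not have to paste 160 literal rows by hand. As a sketch, assuming Python 3 is available, a helper like the hypothetical `generator_insert` below prints an equivalent INSERT statement that you can run as bm_user:

```python
def generator_insert(rows: int = 160, value: str = "dummy") -> str:
    """Build a single multi-row INSERT for the GENERATOR helper table.

    Produces (1,'dummy'),(2,'dummy'),... up to `rows`, matching the dump above.
    """
    tuples = ",".join(f"({i},'{value}')" for i in range(1, rows + 1))
    return f"INSERT INTO `GENERATOR` VALUES {tuples};"


if __name__ == "__main__":
    print(generator_insert())
```

Because the query caps its output with `limit 150`, any row count of at least 150 works.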
Your perceived BMQ has to be entered manually. Until we build the BMQ app for that, you can enter it every day with queries such as the following:
INSERT INTO BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) VALUES (3, 'morning', '2018-11-06 07:00:00');
INSERT INTO BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) VALUES (3, 'pre-workout', '2018-11-06 07:50:00');
INSERT INTO BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) VALUES (4, 'post-workout', '2018-11-06 08:50:00');
INSERT INTO BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) VALUES (3, 'evening', NOW());
The goal is to set up the following flow:
The ListFile processor monitors a folder for new files based on their timestamps and keeps track of the last file read in its state, which you can inspect by right-clicking the processor and selecting View state.
Here are the non-default properties to configure:
Input Directory: [YOUR_LOCAL_PATH]
File Filter: .*
Run Schedule (Scheduling tab): 12 h
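To make the listing behaviour concrete, here is a simplified Python model of what the processor does: on each run it returns the files modified after the newest timestamp recorded in its state, then advances that timestamp. This is an illustrative sketch, not NiFi's implementation; the function name `list_new_files` and the `last_modified` state key are hypothetical.

```python
from pathlib import Path


def list_new_files(directory: str, state: dict) -> list[Path]:
    """Return files modified after the timestamp kept in `state`, oldest first.

    Simplification: only the newest modification time is remembered, so a
    file written later with exactly that same timestamp would be skipped.
    """
    last_seen = state.get("last_modified", 0.0)
    new_files = sorted(
        (p for p in Path(directory).iterdir()
         if p.is_file() and p.stat().st_mtime > last_seen),
        key=lambda p: p.stat().st_mtime,
    )
    if new_files:
        # Persist the newest timestamp, like the processor's state.
        state["last_modified"] = new_files[-1].stat().st_mtime
    return new_files
```

Calling it twice in a row with the same `state` dict returns the new files once and then an empty list, which is why a 12 h schedule only picks up fresh exports.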
The FetchFile processor gets each file from the list generated previously and sends its content as a flow file. It uses the default properties.
The UpdateAttribute processor sets the schema.name attribute (used later by the next processors and controller services).
The HortonworksSchemaRegistry controller service points to your HDF Schema Registry. The only non-default property to configure before enabling it is:
Schema Registry URL: http://[YOUR_SCHEMA_REGISTRY_HOST]:7788/api/v1/
The CSVReader controller service reads your CSV and uses your Hortonworks Schema Registry to look up its schema. Configure the following non-default properties before enabling it:
Schema Access Strategy: Use 'Schema Name' Property
Schema Registry: HortonworksSchemaRegistry
Treat First Line as Header: true
Ignore CSV Header Column Names: true
The JsonRecordSetWriter controller service writes your JSON output based on your Hortonworks Schema Registry. Configure the following non-default properties before enabling it:
Schema Write Strategy: Set 'schema.name' Attribute
Schema Access Strategy: Use 'Schema Name' Property
Schema Registry: HortonworksSchemaRegistry
The ConvertRecord processor does the record conversion. Configure it with your enabled controller services as follows:
Record Reader: CSVReader
Record Writer: JsonRecordSetWriter
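Conceptually, the conversion parses each CSV row against the schema and writes the records out as a JSON array. The Python sketch below imitates that with the standard `csv` and `json` modules. It assumes the export's header row uses the same field names as the JSON paths extracted in the next steps (`date`, `meal`, `calories`, `fat_g`, `carbohydrates_g`, `protein_g`). The `NUTRITION_SCHEMA` dict is a hypothetical stand-in for the schema stored in the registry.

```python
import csv
import io
import json

# Hypothetical schema: field name -> type used to coerce the CSV text value.
NUTRITION_SCHEMA = {
    "date": str,
    "meal": str,
    "calories": float,
    "fat_g": float,
    "carbohydrates_g": float,
    "protein_g": float,
}


def csv_to_json(csv_text: str, schema: dict) -> str:
    """Turn CSV text (with a header row) into a JSON array of typed records."""
    reader = csv.DictReader(io.StringIO(csv_text))
    records = [
        {field: cast(row[field]) for field, cast in schema.items()}
        for row in reader
    ]
    return json.dumps(records)
```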
The SplitJson processor splits your export into one flow file per meal, using the following non-default property:
JsonPath Expression: $.[*]
The EvaluateJsonPath processor extracts the relevant elements of each split JSON document into attributes. It is configured as follows:
Destination: flowfile-attribute
carb: $.carbohydrates_g
diary_day: $.date
fat: $.fat_g
meal: $.meal
prot: $.protein_g
total_calories_in: $.calories
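Taken together, the split and extraction steps turn the exported JSON array into one set of attributes per meal. Here is a minimal Python sketch, assuming flat records. It uses the attribute names that the upsert query below refers to (`${diary_day}`, `${meal}`, `${total_calories_in}`, `${carb}`, `${prot}`, `${fat}`); their pairing with the JSON paths is inferred from those names, and `split_and_extract` is hypothetical.

```python
import json

# Attribute name -> top-level JSON field ($.field paths are plain key lookups here).
ATTRIBUTE_PATHS = {
    "carb": "carbohydrates_g",
    "diary_day": "date",
    "fat": "fat_g",
    "meal": "meal",
    "prot": "protein_g",
    "total_calories_in": "calories",
}


def split_and_extract(json_array: str) -> list[dict]:
    """Split a JSON array into records (like $.[*]) and copy the configured
    fields of each record into a string-valued attribute dict."""
    return [
        {attr: str(record[field]) for attr, field in ATTRIBUTE_PATHS.items()}
        for record in json.loads(json_array)
    ]
```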
The ReplaceText processor builds the query executed on your MySQL server to upsert each line. Here is the Replacement Value you should use:
INSERT INTO NUTRITION_HISTORY (DIARY_DAY, MEAL, TOTAL_CALORIES_IN, CARB, PROT, FAT) VALUES ('${diary_day}', '${meal}', ${total_calories_in}, ${carb}, ${prot}, ${fat}) ON DUPLICATE KEY UPDATE TOTAL_CALORIES_IN = ${total_calories_in}, CARB = ${carb}, PROT = ${prot}, FAT = ${fat}
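ReplaceText evaluates each `${attribute}` reference and pastes the attribute value in as-is. The following Python sketch reproduces that substitution for the upsert above. The `render` helper is hypothetical, and it only handles plain `${name}` references, not Expression Language functions.

```python
import re

UPSERT_TEMPLATE = (
    "INSERT INTO NUTRITION_HISTORY (DIARY_DAY, MEAL, TOTAL_CALORIES_IN, CARB, PROT, FAT) "
    "VALUES ('${diary_day}', '${meal}', ${total_calories_in}, ${carb}, ${prot}, ${fat}) "
    "ON DUPLICATE KEY UPDATE TOTAL_CALORIES_IN = ${total_calories_in}, "
    "CARB = ${carb}, PROT = ${prot}, FAT = ${fat}"
)


def render(template: str, attributes: dict) -> str:
    """Replace every ${name} with attributes[name], without any SQL escaping."""
    return re.sub(r"\$\{(\w+)\}", lambda m: attributes[m.group(1)], template)
```

Because values are inserted verbatim, a meal name containing a single quote would produce invalid SQL, so keep that in mind if your export contains free text.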
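The DBCPConnectionPool controller service enables the connection to your MySQL server. Configure the following non-default properties:
Database Connection URL: jdbc:mysql://[YOUR_MYSQL_SERVER_ADDRESS]:3306/BEAST_MODE_DB?useLegacyDatetimeCode=false&serverTimezone=America/New_York
Database Driver Class Name: com.mysql.cj.jdbc.Driver (the legacy name com.mysql.jdbc.Driver is deprecated in Connector/J 8.0 but still loads)
Database Driver Location(s): file:///home/nifi/mysql_jdbc_drivers/mysql-connector-java-8.0.11.jar (or wherever you put your MySQL JDBC driver)
Database User: bm_user
Password: [YOUR_PASSWORD]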
The PutSQL processor executes the query generated by the ReplaceText processor and relies on your DBCPConnectionPool controller service, configured as follows:
JDBC Connection Pool: DBCPConnectionPool
The goal is to set up the following flow (split across two screenshots, so you know it's going to be fun). It assumes that you have registered an application with the Fitbit API and have a refresh/bearer token with the right permissions ready (see the Fitbit documentation for details).
The ExecuteSQL processor runs a query returning the series of dates not yet in BEAST_MODE_DB, so that the Fitbit API is called for each of them. It relies on the DBCPConnectionPool configured above (set in the Database Connection Pooling Service property) and uses a 12 h Run Schedule. Here is the query it should run:
select list_date
from (
  select date_format(
           adddate((select IFNULL(max(hist.diary_day), '2018-11-07') from HEALTH_HISTORY hist), @num := @num + 1),
           '%Y-%m-%d') list_date
  from GENERATOR, (select @num := -1) num
  limit 150
) dates
where list_date < CURRENT_DATE()
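The query walks forward one day per GENERATOR row, starting from the most recent DIARY_DAY in HEALTH_HISTORY (or 2018-11-07 when the table is empty), and keeps only the days before today. Because `@num` starts at -1, the most recent stored day is itself included and therefore requested again. A Python equivalent, handy for checking which dates to expect (the `pending_dates` name is hypothetical):

```python
from datetime import date, timedelta
from typing import Optional


def pending_dates(last_day: Optional[date], today: date,
                  default_start: date = date(2018, 11, 7),
                  limit: int = 150) -> list[str]:
    """Dates (YYYY-MM-DD) the Fitbit flow should fetch, like the SQL above.

    Starts at the latest stored day (inclusive) or `default_start` when the
    table is empty, walks forward at most `limit` days (one per GENERATOR
    row), and keeps only days strictly before `today`.
    """
    start = last_day if last_day is not None else default_start
    days = (start + timedelta(days=n) for n in range(limit))
    return [d.isoformat() for d in days if d < today]
```

For example, if the latest stored day is 2018-11-10 and today is 2018-11-13, the dates returned are 2018-11-10, 2018-11-11 and 2018-11-12.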
I realize this is not the only way to generate such a list (GenerateTableFetch is another option, among others). This approach, however, makes sure that I only request the dates the database actually needs, even if something else modifies the database.
The ConvertAvroToJSON processor converts the Avro output of the previous processor to JSON. Make sure you set the following property to avoid a splitting error when only one record is returned:
Wrap Single Record: true
The SplitJson processor splits the query result into one flow file per record, using the following non-default property:
JsonPath Expression: $.*
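Here is why Wrap Single Record matters. With its default value, a one-row result is emitted as a bare JSON object instead of a one-element array, so `$.*` would select the record's fields rather than the records themselves. The normalization that the property performs can be sketched in Python as follows (`split_records` is a hypothetical name):

```python
import json


def split_records(converted: str) -> list:
    """Return the records of a ConvertAvroToJSON output as a list.

    A lone record may arrive as a bare object (Wrap Single Record = false);
    wrapping it keeps the one-record and many-record cases identical.
    """
    parsed = json.loads(converted)
    return [parsed] if isinstance(parsed, dict) else parsed
```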
The EvaluateJsonPath processor extracts the date of each split record into an attribute. It is configured as follows:
Destination: flowfile-attribute
DIARY_DAY: $.list_date
The DistributedMapCacheServer controller service runs the distributed cache. It should be configured with all default properties before being enabled.
The DistributedMapCacheClientService controller service connects to the DistributedMapCacheServer configured previously. Before enabling it, just configure the following:
Server Hostname: [YOUR_HOSTNAME]
The FetchDistributedMapCache processor tries to fetch a refresh token from the DistributedMapCacheServer. If none is found, the flow file is sent to an UpdateAttribute processor that sets a default value (e.g. for the first run); otherwise it is sent to the InvokeHTTP processor to get a new token. Here are the non-default properties to configure:
Cache Entry Identifier: refresh-token-fitbit
Distributed Cache Service: DistributedMapCacheClientService
Put Cache Value In Attribute: CurrentRefreshToken
A few notes: this configuration requests a new token on every run, which works with my 12 h run schedule. Ideally, you would also store the token expiration in the distributed cache and only request a new token when needed, but I didn't want to overcomplicate things. Moreover, in a production environment, I would recommend persisting these tokens on disk rather than only in memory, in case of failure.
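The improvement suggested above, storing the expiration alongside the token and refreshing only when it is due, could look like this in-memory Python sketch. The `TokenCache` class and the `refresh` callable it receives are hypothetical: the callable would wrap the Fitbit token request. Persistence to disk is left out.

```python
import time
from typing import Callable, Optional


class TokenCache:
    """Hold an access token with its expiry and refresh it only when due.

    `refresh(refresh_token)` must return (access_token, new_refresh_token,
    expires_in_seconds).
    """

    def __init__(self, refresh: Callable[[str], tuple[str, str, float]],
                 refresh_token: str,
                 clock: Callable[[], float] = time.time,
                 margin: float = 300.0):
        self._refresh = refresh
        self._clock = clock
        self._margin = margin  # renew slightly before the real expiry
        self.refresh_token = refresh_token
        self._access_token: Optional[str] = None
        self._expires_at = 0.0

    def access_token(self) -> str:
        expired = self._clock() >= self._expires_at - self._margin
        if self._access_token is None or expired:
            access, new_refresh, expires_in = self._refresh(self.refresh_token)
            self._access_token = access
            # Fitbit hands out a new refresh token each time; keep the latest one.
            self.refresh_token = new_refresh
            self._expires_at = self._clock() + expires_in
        return self._access_token
```

Injecting `clock` keeps the expiry logic testable without waiting for real time to pass.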
The UpdateAttribute processor sets CurrentRefreshToken when FetchDistributedMapCache returns not-found. Configure it using this property:
CurrentRefreshToken: [A_VALID_REFRESH_TOKEN]
The InvokeHTTP processor calls the Fitbit API to get a new access (bearer) token and a new refresh token. Configure the following non-default properties:
HTTP Method: POST
Remote URL: https://api.fitbit.com/oauth2/token?grant_type=refresh_token&refresh_token=${CurrentRefreshToken}
Content-Type: application/x-www-form-urlencoded
Authorization (dynamic property, sent as a request header): Basic [YOUR_AUTHORIZATION_CODE] (see the Fitbit documentation)
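For the Authorization value, Fitbit's OAuth 2.0 documentation uses HTTP Basic authentication with the Base64 encoding of `client_id:client_secret`. The Python sketch below builds the same request as the processor without sending it. The function name and its arguments are hypothetical, and the refresh parameters stay in the query string to mirror the Remote URL above.

```python
import base64
import urllib.parse
import urllib.request


def fitbit_refresh_request(client_id: str, client_secret: str,
                           refresh_token: str) -> urllib.request.Request:
    """Build (but do not send) the token refresh POST described above."""
    # Basic credentials: Base64 of "client_id:client_secret".
    credentials = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    query = urllib.parse.urlencode(
        {"grant_type": "refresh_token", "refresh_token": refresh_token})
    return urllib.request.Request(
        f"https://api.fitbit.com/oauth2/token?{query}",
        method="POST",
        headers={
            "Authorization": f"Basic {credentials}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
```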
The EvaluateJsonPath processor extracts the bearer and refresh tokens from the HTTP response, then continues along two routes: one storing the refresh token back in the cache, the other using the bearer token to call the Fitbit API. It is configured as follows:
Destination: flowfile-attribute
BEARER_TOKEN: $.access_token
REFRESH_TOKEN: $.refresh_token
The ReplaceText processor writes the refresh token into the flow file content so that it can be stored by the next processor:
Replacement Value: ${REFRESH_TOKEN}
The PutDistributedMapCache processor takes the refresh token from the flow file content and stores it under refresh-token-fitbit for the next time the flow runs:
Cache Entry Identifier: refresh-token-fitbit
Distributed Cache Service: DistributedMapCacheClientService
For this step, I will only detail one processor. First, because, as you can see on the screenshot above, both Get FitBit Daily Summary processors are very similar (they just call different endpoints of the Fitbit API). Second, because the Extract Relevant Attributes > Create SQL Query > Upsert to MySQL sequence has been described above. Finally, because I am saving the discussion of the Calculate Averages and Sums step for the next flow, which runs against Strava. All detailed processors can be found here: feed-fitbit-anonymized.xml
This InvokeHTTP processor calls the Fitbit API to get a daily health summary:
HTTP Method: GET
Remote URL: https://api.fitbit.com/1/user/-/activities/date/${DIARY_DAY}.json
Authorization: Bearer ${BEARER_TOKEN}
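Outside NiFi, the same call is a GET with the bearer token in the Authorization header. Here is a Python sketch that only builds the request (the function name is hypothetical); passing the result to `urllib.request.urlopen` would send it.

```python
import urllib.request


def daily_summary_request(diary_day: str, bearer_token: str) -> urllib.request.Request:
    """Build (but do not send) the Fitbit daily activity summary GET."""
    url = f"https://api.fitbit.com/1/user/-/activities/date/{diary_day}.json"
    return urllib.request.Request(
        url, headers={"Authorization": f"Bearer {bearer_token}"})
```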
As in the previous section, the goal is to set up a flow calling Strava's activity API. It assumes that you have registered an application with the Strava API and have a refresh/bearer token with the right permissions ready (see the Strava documentation for details). Unlike with Fitbit, the refresh token remains the same, so there is no need for a distributed cache. I'm only going to detail the Convert Units processor, to discuss the NiFi Expression Language. The rest of the flow can be found here: feed-strava-anonymized.xml
The Convert Units processor uses some of the extracted data to convert units before putting the results into attributes that are later used in your upsert query. Here are the calculations executed:
${AVG_HR:isEmpty():ifElse(0.0, ${AVG_HR})} (makes sure that we have a value when the API does not return a heart rate, i.e. when the activity has no HR data)
${MILE_IN_METERS:divide(${AVG_PACE:toDecimal():multiply(60.0)})} (converts speed from m/s to min/mile)
${DISTANCE:divide(${MILE_IN_METERS})} (converts distance from meters to miles)
${DURATION:toDecimal():divide(60.0)} (converts duration from seconds to minutes)
${START_TIME:replaceAll("T"," "):replaceAll("Z","")} (converts the timestamp to MySQL format)
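The same conversions in Python, as a sketch for checking the numbers by hand. The value of MILE_IN_METERS is not given above; the sketch assumes the international mile, 1609.344 m. The function name `convert_activity` and its output keys are hypothetical. Like `isEmpty()`, it treats a missing or blank heart rate as empty.

```python
from typing import Optional

MILE_IN_METERS = 1609.344  # assumption: international mile


def convert_activity(avg_hr: Optional[str], avg_speed_mps: float,
                     distance_m: float, duration_s: float,
                     start_time: str) -> dict:
    """Python equivalents of the Convert Units expressions above."""
    return {
        # Default to 0.0 when the activity carries no heart-rate data.
        "AVG_HR": 0.0 if avg_hr is None or not avg_hr.strip() else float(avg_hr),
        # m/s -> min/mile: meters per mile divided by meters per minute.
        "AVG_PACE": MILE_IN_METERS / (avg_speed_mps * 60.0),
        "DISTANCE": distance_m / MILE_IN_METERS,  # meters -> miles
        "DURATION": duration_s / 60.0,            # seconds -> minutes
        # "2018-11-06T12:30:00Z" -> "2018-11-06 12:30:00"
        "START_TIME": start_time.replace("T", " ").replace("Z", ""),
    }
```

For instance, an average speed of 3 m/s gives 1609.344 / (3 * 60) ≈ 8.94 min/mile.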