
Introduction

Before we can predict anyone's Beast Mode Quotient, we need an easy way to ingest data from multiple sources, in this case flat files and APIs. Luckily, HDF, and specifically Nifi, is designed to handle multiple data inputs and outputs efficiently, as I will detail in this tutorial.

A few notes about the technologies selected for this tutorial and series of articles:

  • Target - MySQL: I selected MySQL because this is what my BMQ app is going to use, but it could be any target depending on your use-case.
  • Sources - Fitbit, Strava, MyFitnessPal: There are a bunch of options out there to track health. Fitbit and Strava are the only ones offering a comprehensive and public API. Looking at you, Garmin and Apple Health: both have no public API and make it hard to extract data via file, since you either have to parse .fit files or can only export the full set of data. As for MyFitnessPal, I could have used the Fitbit API (after syncing with MFP) to get calorie data, but I figured it would be nice to demonstrate file ingestion.

Notions you will learn in this tutorial

Here are some high level concepts highlighted in this article, that you can re-apply to any implementation:

  • Running MySQL queries including create, upsert and selection of date series
  • Using Nifi processors maintaining state
  • Configuring a schema registry for file format conversion in Nifi (e.g. CSV > JSON)
  • Using Nifi to call OAuth 2 APIs (using refresh token, bearer tokens, etc.)
  • Implementing and using a distributed map cache in Nifi
  • Parsing and splitting JSONs in Nifi
  • Implementing simple math operations using the Nifi Expression Language

Agenda

This tutorial is divided into the following sections:

  • Section 1: Setting up a MySQL DB to host health and fitness data
  • Section 2: Creating a Nifi flow to consume MyFitnessPal CSV exports
  • Section 3: Creating a Nifi flow to consume Fitbit health & sleep data
  • Section 4: Creating a Nifi flow to consume Strava activity data

Section 1: Setting up a MySQL DB to host health and fitness data

Step 1: Create the MySQL DB and users

Before being able to do anything, you will have to create a MySQL instance. There are a bunch of tutorials out there that explain how to do it; here is one for CentOS 7, for instance. Once your instance is set up, connect as root and run the following database creation script:

DROP DATABASE IF EXISTS `BEAST_MODE_DB`;
CREATE DATABASE `BEAST_MODE_DB`;
CREATE USER 'bm_user'@'%' IDENTIFIED BY '[YOUR_PASSWORD]';
GRANT ALL ON BEAST_MODE_DB.* TO 'bm_user'@'%'  IDENTIFIED BY '[YOUR_PASSWORD]';

Step 2: Create the DB tables

Connect as bm_user and run this script to create the appropriate tables:

-- MySQL dump 10.14  Distrib 5.5.60-MariaDB, for Linux (x86_64)
--
-- Host: localhost    Database: BEAST_MODE_DB
-- ------------------------------------------------------
-- Server version 5.5.60-MariaDB
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
USE BEAST_MODE_DB;
--
-- Table structure for table `ACTIVITY_HISTORY`
--
DROP TABLE IF EXISTS `ACTIVITY_HISTORY`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `ACTIVITY_HISTORY` (
  `ID` bigint(20) NOT NULL,
  `START_TIME` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `DURATION` double DEFAULT NULL,
  `DISTANCE` double DEFAULT NULL,
  `ELEVATION_GAIN` double DEFAULT NULL,
  `AVG_HR` double DEFAULT NULL,
  `AVG_PACE` double DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `BMQ_HISTORY`
--
DROP TABLE IF EXISTS `BMQ_HISTORY`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `BMQ_HISTORY` (
  `ID` mediumint(9) NOT NULL AUTO_INCREMENT,
  `BMQ` int(11) NOT NULL,
  `TYPE` char(30) DEFAULT NULL,
  `TIME_ENTERED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=30 DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `GENERATOR`
--
DROP TABLE IF EXISTS `GENERATOR`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `GENERATOR` (
  `ID` mediumint(9) NOT NULL AUTO_INCREMENT,
  `VALUE` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=161 DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `GENERATOR`
--
LOCK TABLES `GENERATOR` WRITE;
/*!40000 ALTER TABLE `GENERATOR` DISABLE KEYS */;
INSERT INTO `GENERATOR` VALUES (1,'dummy'),(2,'dummy'),(3,'dummy'),(4,'dummy'),(5,'dummy'),(6,'dummy'),(7,'dummy'),(8,'dummy'),(9,'dummy'),(10,'dummy'),(11,'dummy'),(12,'dummy'),(13,'dummy'),(14,'dummy'),(15,'dummy'),(16,'dummy'),(17,'dummy'),(18,'dummy'),(19,'dummy'),(20,'dummy'),(21,'dummy'),(22,'dummy'),(23,'dummy'),(24,'dummy'),(25,'dummy'),(26,'dummy'),(27,'dummy'),(28,'dummy'),(29,'dummy'),(30,'dummy'),(31,'dummy'),(32,'dummy'),(33,'dummy'),(34,'dummy'),(35,'dummy'),(36,'dummy'),(37,'dummy'),(38,'dummy'),(39,'dummy'),(40,'dummy'),(41,'dummy'),(42,'dummy'),(43,'dummy'),(44,'dummy'),(45,'dummy'),(46,'dummy'),(47,'dummy'),(48,'dummy'),(49,'dummy'),(50,'dummy'),(51,'dummy'),(52,'dummy'),(53,'dummy'),(54,'dummy'),(55,'dummy'),(56,'dummy'),(57,'dummy'),(58,'dummy'),(59,'dummy'),(60,'dummy'),(61,'dummy'),(62,'dummy'),(63,'dummy'),(64,'dummy'),(65,'dummy'),(66,'dummy'),(67,'dummy'),(68,'dummy'),(69,'dummy'),(70,'dummy'),(71,'dummy'),(72,'dummy'),(73,'dummy'),(74,'dummy'),(75,'dummy'),(76,'dummy'),(77,'dummy'),(78,'dummy'),(79,'dummy'),(80,'dummy'),(81,'dummy'),(82,'dummy'),(83,'dummy'),(84,'dummy'),(85,'dummy'),(86,'dummy'),(87,'dummy'),(88,'dummy'),(89,'dummy'),(90,'dummy'),(91,'dummy'),(92,'dummy'),(93,'dummy'),(94,'dummy'),(95,'dummy'),(96,'dummy'),(97,'dummy'),(98,'dummy'),(99,'dummy'),(100,'dummy'),(101,'dummy'),(102,'dummy'),(103,'dummy'),(104,'dummy'),(105,'dummy'),(106,'dummy'),(107,'dummy'),(108,'dummy'),(109,'dummy'),(110,'dummy'),(111,'dummy'),(112,'dummy'),(113,'dummy'),(114,'dummy'),(115,'dummy'),(116,'dummy'),(117,'dummy'),(118,'dummy'),(119,'dummy'),(120,'dummy'),(121,'dummy'),(122,'dummy'),(123,'dummy'),(124,'dummy'),(125,'dummy'),(126,'dummy'),(127,'dummy'),(128,'dummy'),(129,'dummy'),(130,'dummy'),(131,'dummy'),(132,'dummy'),(133,'dummy'),(134,'dummy'),(135,'dummy'),(136,'dummy'),(137,'dummy'),(138,'dummy'),(139,'dummy'),(140,'dummy'),(141,'dummy'),(142,'dummy'),(143,'dummy'),(144,'dummy'),(145,'dummy'),(146,'dummy'),(147,'dummy'),(148,'dummy'),(149,
'dummy'),(150,'dummy'),(151,'dummy'),(152,'dummy'),(153,'dummy'),(154,'dummy'),(155,'dummy'),(156,'dummy'),(157,'dummy'),(158,'dummy'),(159,'dummy'),(160,'dummy');
/*!40000 ALTER TABLE `GENERATOR` ENABLE KEYS */;
UNLOCK TABLES;
--
-- Table structure for table `HEALTH_HISTORY`
--
DROP TABLE IF EXISTS `HEALTH_HISTORY`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `HEALTH_HISTORY` (
  `DIARY_DAY` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `TOTAL_CALORIES_OUT` double NOT NULL,
  `TOTAL_MINUTES_RECORDED` double NOT NULL,
  `STEPS` double DEFAULT NULL,
  `ELEVATION` double DEFAULT NULL,
  `AVG_REST_HR` double DEFAULT NULL,
  `REST_MINUTES` double DEFAULT NULL,
  `REST_CAL_OUT` double DEFAULT NULL,
  `AVG_FAT_BURN_HR` double DEFAULT NULL,
  `FAT_BURN_MINUTES` double DEFAULT NULL,
  `FAT_BURN_CAL_OUT` double DEFAULT NULL,
  `AVG_CARDIO_HR` double DEFAULT NULL,
  `CARDIO_MINUTES` double DEFAULT NULL,
  `CARDIO_CAL_OUT` double DEFAULT NULL,
  `AVG_PEAK_HR` double DEFAULT NULL,
  `PEAK_MINUTES` double DEFAULT NULL,
  `PEAK_CAL_OUT` double DEFAULT NULL,
  PRIMARY KEY (`DIARY_DAY`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `NUTRITION_HISTORY`
--
DROP TABLE IF EXISTS `NUTRITION_HISTORY`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `NUTRITION_HISTORY` (
  `DIARY_DAY` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `MEAL` varchar(50) NOT NULL,
  `TOTAL_CALORIES_IN` double NOT NULL,
  `CARB` double NOT NULL,
  `PROT` double NOT NULL,
  `FAT` double NOT NULL,
  PRIMARY KEY (`DIARY_DAY`,`MEAL`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `SLEEP_HISTORY`
--
DROP TABLE IF EXISTS `SLEEP_HISTORY`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `SLEEP_HISTORY` (
  `DIARY_DAY` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `TOTAL_MINUTES_ASLEEP` double NOT NULL,
  `TOTAL_MINUTES_IN_BED` double NOT NULL,
  `REM_MINUTES` double NOT NULL,
  `LIGHT_MINUTES` double NOT NULL,
  `DEEP_MINUTES` double NOT NULL,
  `WAKE_MINUTES` double NOT NULL,
  PRIMARY KEY (`DIARY_DAY`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
-- Dump completed on 2018-11-07  0:14:54

You will note a GENERATOR table has been created. This is necessary to create date series queries, as detailed later.

Step 3: Update your BMQ every day

To get your perceived BMQ, you will have to enter it manually. Before we develop the BMQ app to do that, you can enter it using the following example queries every day:

insert into BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) values 
('3', 'morning', '2018-11-06 7:00:00');
insert into BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) values 
('3', 'pre-workout', '2018-11-06 7:50:00');
insert into BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) values 
('4', 'post-workout', '2018-11-06 8:50:00');
insert into BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) values 
('3', 'evening', now());

Section 2: Creating a Nifi flow to consume MyFitnessPal CSV exports

The goal is to set up the following flow:

93201-screen-shot-2018-11-07-at-105113-am.png

Step 1: Survey folder for new CSV Files

List Files in MFP Folder: ListFile Processor

93203-screen-shot-2018-11-07-at-110136-am.png

This processor will monitor a folder for new files, based on timestamp, then maintain the last file read in its state, as seen below after right-clicking on the processor and selecting view state:

93202-screen-shot-2018-11-07-at-105434-am.png

Here is a list of properties to configure that are not default:

  • Input directory: [YOUR_LOCAL_PATH]
  • File filter: .*
  • Run Schedule: 12 h
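
To make the idea of a processor keeping state more concrete, here is a minimal Python sketch of a timestamp-based listing. It is not how Nifi implements ListFile; the function name and the way the state is passed in and returned are assumptions made for illustration only.

```python
import os

def list_new_files(directory, last_seen_mtime=0.0):
    """Return (new_files, new_state) for files modified after last_seen_mtime.

    last_seen_mtime plays the role of the processor state: the caller
    stores the returned value and passes it back on the next run.
    """
    new_files = []
    newest = last_seen_mtime
    for entry in os.scandir(directory):
        if not entry.is_file():
            continue
        mtime = entry.stat().st_mtime
        if mtime > last_seen_mtime:
            new_files.append(entry.path)
            newest = max(newest, mtime)
    return sorted(new_files), newest
```

Calling the function twice in a row with the returned state yields an empty list the second time, which is the behavior you want from a scheduled listing: a file is only picked up once.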

Get Latest File: FetchFile Processor

93204-screen-shot-2018-11-07-at-110332-am.png

This processor gets the file from the list generated previously and sends it in a flow file. It is configured using default properties.

Step 2: Convert CSV to JSON

Update Schema Name: UpdateAttribute Processor

93205-screen-shot-2018-11-07-at-110316-am.png

This processor updates the schema name attribute (later used by next processors and controller services).

HortonworksSchemaRegistry Controller Service

93206-screen-shot-2018-11-07-at-111010-am.png

This controller service is pointing to your HDF Schema Registry. The only non-default property to configure in the controller service before enabling is:

  • Schema Registry URL: http://[YOUR_SCHEMA_REGISTRY_HOST]:7788/api/v1/

CSVReader Controller Service

93207-screen-shot-2018-11-07-at-111554-am.png

This controller service reads your CSV and uses your Hortonworks Schema Registry to parse its schema. The following non-default properties should be configured in the controller service before enabling it:

  • Schema Access Strategy: Use 'Schema Name' Property
  • Schema Registry: HortonworksSchemaRegistry
  • Treat First Line as Header: true
  • Ignore CSV Header Column Names: true

JsonRecordSetWriter Controller Service

93208-screen-shot-2018-11-07-at-111840-am.png

This controller service writes your JSON output based on your Hortonworks Schema Registry. The following non-default properties should be configured in the controller service before enabling it:

  • Schema Write Strategy: Set 'schema.name' Attribute
  • Schema Access Strategy: Use 'Schema Name' Property
  • Schema Registry: HortonworksSchemaRegistry

Convert CSV to JSON: ConvertRecord Processor

93209-screen-shot-2018-11-07-at-112131-am.png

This processor does the record conversion. Configure it with your enabled controllers as such:

  • Record Reader: CSVReader
  • Record Writer: JsonRecordSetWriter
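
As a rough illustration of what this conversion does, the Python sketch below reads a CSV, skips its header line, and names the columns from a schema instead (the effect of Treat First Line as Header combined with Ignore CSV Header Column Names). The field names are the ones referenced by the JsonPath expressions later in this section, but the column order, the types and the sample data are assumptions; your MyFitnessPal export and registry schema may differ.

```python
import csv
import io
import json

# Hypothetical schema: the names match the JSON fields used later in this
# article, but the column order and the types are assumptions.
SCHEMA = [
    ("date", str),
    ("meal", str),
    ("calories", float),
    ("carbohydrates_g", float),
    ("fat_g", float),
    ("protein_g", float),
]

def csv_to_json(csv_text):
    """Convert CSV text to a JSON array, naming fields from SCHEMA."""
    rows = csv.reader(io.StringIO(csv_text))
    next(rows, None)  # the first line is a header: skip it and ignore its names
    records = [
        {name: cast(value) for (name, cast), value in zip(SCHEMA, row)}
        for row in rows
    ]
    return json.dumps(records)
```

The output is a single JSON array holding one object per CSV line, which is the shape the SplitJson processor in the next step expects.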

Step 3: Get fields needed for insert

SplitJson Processor

93210-screen-shot-2018-11-07-at-112656-am.png

This processor splits each meal of your export into its own flow file, by using the following non-default property:

  • JsonPath Expression: $.[*]

Extract Relevant Attributes: EvaluateJsonPath Processor

93211-screen-shot-2018-11-07-at-112835-am.png

This processor extracts the relevant elements of your split JSON into attributes. It is configured using the following:

  • Destination: flowfile-attribute
  • carb: $.carbohydrates_g
  • diary_day: $.date
  • fat: $.fat_g
  • meal: $.meal
  • prot: $.protein_g
  • total_calories_in: $.calories
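
The split and extraction steps can be pictured with the following Python sketch. It only mimics the effect of SplitJson followed by EvaluateJsonPath on a simple array of flat records; the function name is made up and the sample record in the usage note is invented for illustration.

```python
import json

# Attribute name -> record field, mirroring the JsonPath properties above.
ATTRIBUTE_PATHS = {
    "carb": "carbohydrates_g",
    "diary_day": "date",
    "fat": "fat_g",
    "meal": "meal",
    "prot": "protein_g",
    "total_calories_in": "calories",
}

def split_and_extract(json_text):
    """Yield one attribute dict per element of the top-level JSON array."""
    for record in json.loads(json_text):      # SplitJson with $.[*]
        yield {attr: record.get(field)        # EvaluateJsonPath
               for attr, field in ATTRIBUTE_PATHS.items()}
```

For example, a one-element array with meal "Breakfast" and calories 350.0 produces a single dict whose meal and total_calories_in entries carry those values, ready to be injected into a query.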

Step 4: Insert Data to MySQL

Create Upsert Query: ReplaceText Processor

93212-screen-shot-2018-11-07-at-113751-am.png

This processor creates the query to be executed on your MySQL server to upsert the different lines. Here is the Replacement Value you should use:

INSERT INTO NUTRITION_HISTORY
    (DIARY_DAY, MEAL, TOTAL_CALORIES_IN, CARB, PROT, FAT)
VALUES
    ('${diary_day}', '${meal}', ${total_calories_in}, ${carb}, ${prot}, ${fat}) ON DUPLICATE KEY UPDATE
    TOTAL_CALORIES_IN = ${total_calories_in},
    CARB = ${carb},
    PROT = ${prot},
    FAT = ${fat}
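
If you want to check what the generated statement looks like before wiring PutSQL, the substitution can be approximated in Python. The sketch below uses string.Template, which happens to share the ${name} syntax for plain references; it does not implement the Nifi Expression Language, and the attribute values used in the usage note are made up.

```python
from string import Template

UPSERT = Template(
    "INSERT INTO NUTRITION_HISTORY "
    "(DIARY_DAY, MEAL, TOTAL_CALORIES_IN, CARB, PROT, FAT) "
    "VALUES ('${diary_day}', '${meal}', ${total_calories_in}, "
    "${carb}, ${prot}, ${fat}) "
    "ON DUPLICATE KEY UPDATE "
    "TOTAL_CALORIES_IN = ${total_calories_in}, CARB = ${carb}, "
    "PROT = ${prot}, FAT = ${fat}"
)

def render_upsert(attributes):
    """Fill the statement from a dict of flow file attributes.

    Template.substitute raises KeyError when an attribute is missing.
    """
    return UPSERT.substitute(attributes)
```

Rendering the attributes of a breakfast entry gives a statement whose VALUES clause and UPDATE clause both carry the same numbers, which is what makes re-ingesting the same export harmless. Note that the values are pasted in as text, so a meal name containing a single quote would break the statement.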

DBCPConnectionPool Controller Service

93214-screen-shot-2018-11-07-at-114007-am.png

This controller service enables the connection to your MySQL server, and is configured with the following non-default properties:

  • Database Connection URL: jdbc:mysql://[YOUR_MYSQL_SERVER_ADDRESS]:3306/BEAST_MODE_DB?useLegacyDatetimeCode=false&serverTimezone=America/New_York
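  • Database Driver Class Name: com.mysql.jdbc.Driver (with Connector/J 8.x this class name still works, but it is deprecated in favor of com.mysql.cj.jdbc.Driver)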
  • Database Driver Location(s): file:///home/nifi/mysql_jdbc_drivers/mysql-connector-java-8.0.11.jar (or wherever you put your mysql jdbc driver)

Upsert into MySQL: PutSQL Processor

93213-screen-shot-2018-11-07-at-114234-am.png

This processor executes the query generated in the ReplaceText processor, and relies on your DBCPConnectionPool controller service, configured as such:

  • JDBC Connection Pool: DBCPConnectionPool

Section 3: Creating a Nifi flow to consume Fitbit health & sleep data

The goal is to set up the following flow (separated into two screenshots, so you know it's going to be fun). It assumes that you registered an application with the Fitbit API and have a refresh/bearer token ready with the right permissions (see details over at Fitbit documentation).

93216-screen-shot-2018-11-07-at-123054-pm.png

93217-screen-shot-2018-11-07-at-123104-pm.png

Step 1: Get Delta parameters

Get Latest Date: ExecuteSQL Processor

93218-screen-shot-2018-11-07-at-123540-pm.png

This processor runs a query to get the series of dates not yet in the BMQ DB, which is then used to call the Fitbit API. It relies on the DBCPConnectionPool detailed above (set in the Database Connection Pooling Service property) and uses a 12 h Run Schedule. Here is the query it should run:

 select list_date from (select 
    date_format(
        adddate((select IFNULL(max(hist.diary_day),'2018-11-07') from HEALTH_HISTORY hist), @num:=@num+1), 
        '%Y-%m-%d'
    ) list_date
from 
    GENERATOR,    
    (select @num:=-1) num
limit 
    150) dates
   where list_date < CURRENT_DATE()

I realize this is not the only method to generate such a list (you could use GenerateTableFetch, etc.). However, querying the DB directly makes sure that I only request what the DB needs, even if something else changes the DB.
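
To make the intent of the query easier to follow, here is the same logic as a small Python sketch: start at the latest day already stored (or a default start date when the table is empty), generate at most 150 consecutive days, and keep only the days before today. The function and parameter names are made up for this illustration.

```python
from datetime import date, timedelta

def pending_dates(last_day, today, default_start=date(2018, 11, 7), limit=150):
    """Return ISO dates from last_day (inclusive) up to yesterday.

    last_day is None when HEALTH_HISTORY is empty, mirroring IFNULL in
    the query. limit mirrors the LIMIT clause.
    """
    start = last_day if last_day is not None else default_start
    candidates = (start + timedelta(days=n) for n in range(limit))
    return [d.isoformat() for d in candidates if d < today]
```

Note that the latest stored day is part of the result, so it gets requested again on the next run. Since the data is upserted, this simply refreshes a day that may have been incomplete when it was first fetched.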

ConvertAvroToJSON Processor

93219-screen-shot-2018-11-07-at-125439-pm.png

This processor converts the Avro response of the previous processor to JSON. Make sure you configure the following property to avoid a splitting error when only one record is returned:

  • Wrap Single Record: true

SplitJson Processor

93220-screen-shot-2018-11-07-at-10155-pm.png

This processor splits each record returned by the query into its own flow file, by using the following non-default property:

  • JsonPath Expression: $.*
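
The reason Wrap Single Record matters can be seen with a small Python sketch that imitates what a $.* expression selects: the elements of a top-level array, or the member values of a top-level object. This is only an approximation of JsonPath semantics, not Nifi code, and it does not reproduce the exact failure Nifi reports.

```python
import json

def split_top_level(json_text):
    """Imitate a $.* split: children of the top-level array or object."""
    doc = json.loads(json_text)
    children = doc if isinstance(doc, list) else list(doc.values())
    return [json.dumps(child) for child in children]
```

With a wrapped single record such as [{"list_date": "2018-11-07"}], the split returns the whole record, so $.list_date can still be evaluated downstream. With the unwrapped object {"list_date": "2018-11-07"}, the split returns the bare string "2018-11-07" and the list_date field is lost.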

Extract Relevant Attributes: EvaluateJsonPath Processor

93221-screen-shot-2018-11-07-at-125919-pm.png

This processor extracts the relevant elements of your split JSON into attributes. It is configured using the following:

  • Destination: flowfile-attribute
  • DIARY_DAY: $.list_date

Step 2: Handle Token Refresh

DistributedMapCacheServer Controller Service

93222-screen-shot-2018-11-07-at-10738-pm.png

This controller service is the server that will run the distributed cache. It should be configured using all default properties before being enabled.

DistributedMapCacheClientService Controller Service

93223-screen-shot-2018-11-07-at-10846-pm.png

This controller service is connecting to the DistributedMapCacheServer configured previously. Before enabling it, just configure the following:

  • Server Hostname: [YOUR_HOSTNAME]

Get Token From Cache: FetchDistributedMapCache Processor

93224-screen-shot-2018-11-07-at-11021-pm.png

This processor will try and fetch a refresh token from the DistributedMapCacheServer. If not found, we will send it to an UpdateAttribute to set a default value (e.g. for the first run), otherwise we will send it to the InvokeHTTP to get a new token. Here are the non-default properties to be configured:

  • Cache Entry Identifier: refresh-token-fitbit
  • Distributed Cache Service: DistributedMapCacheClientService
  • Put Cache Value In Attribute: CurrentRefreshToken

A few notes: This configuration requests a new token on every run, which works with my 12 h run schedule. Ideally, you would store the expiration in the distributed cache as well and only refresh the token once it has expired, but I didn't want to overcomplicate things. Moreover, in a production environment, I would recommend persisting these tokens on disk and not only in memory, in case of failure.

Use Default Value if Not Found: UpdateAttribute Processor

93225-screen-shot-2018-11-07-at-12216-pm.png

This processor updates the CurrentRefreshToken if FetchDistributedMapCache returns not-found. Configure it using this property:

  • CurrentRefreshToken: [A_VALID_REFRESH_TOKEN]

Refresh Fitbit Token: InvokeHTTP Processor

93226-screen-shot-2018-11-07-at-12823-pm.png

This calls the Fitbit API to get a new valid token and a refresh token. Configure the following non-default properties to run it:

Basic [YOUR_AUTHORIZATION_CODE] (the value of the Authorization header; see Fitbit Documentation)
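
As a sketch of how such a header value is built: the HTTP Basic scheme is the word Basic followed by the Base64 encoding of two credentials joined by a colon. My understanding is that Fitbit expects your application's client ID and client secret there, but treat that as an assumption and check the Fitbit documentation for your application type. The function name below is made up.

```python
import base64

def basic_auth_header(client_id, client_secret):
    """Build the value of an HTTP Basic Authorization header."""
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")
```

The returned string is what goes in place of Basic [YOUR_AUTHORIZATION_CODE].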

Extract Token: EvaluateJsonPath Processor

This processor extracts the bearer and refresh tokens from the HTTP response, then continues to two routes: one storing the refresh token back in the cache, the other using the bearer token to call the Fitbit API. It is configured using the following:

  • Destination: flowfile-attribute
  • ACCESS_TOKEN: $.access_token (any attribute name works here, as long as the InvokeHTTP processors calling the Fitbit API reference the same one)
  • REFRESH_TOKEN: $.refresh_token (the attribute referenced by the next processor)

Put Refresh Token in Flowfile: ReplaceText Processor

This processor replaces the flowfile content with the refresh token, so that it can be stored by the next processor. Its Replacement Value is: ${REFRESH_TOKEN}

PutDistributedMapCache Processor

93227-screen-shot-2018-11-07-at-15606-pm.png

This processor takes the refresh token in the flowfile and stores it under refresh-token-fitbit for next time the flow is executed:

  • Cache Entry Identifier: refresh-token-fitbit
  • Distributed Cache Service: DistributedMapCacheClientService
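
Put together, the token handling of this step can be sketched in Python as follows. A plain dict stands in for the distributed map cache and refresh_call stands in for the InvokeHTTP processor; both, along with the function name, are assumptions for illustration, and no real HTTP call is made.

```python
def get_access_token(cache, refresh_call, default_refresh_token,
                     cache_key="refresh-token-fitbit"):
    """Rotate the refresh token and return a fresh bearer token."""
    # FetchDistributedMapCache, falling back to the UpdateAttribute default.
    current = cache.get(cache_key, default_refresh_token)
    # InvokeHTTP: exchange the refresh token for a new pair of tokens.
    response = refresh_call(current)
    # PutDistributedMapCache: keep the new refresh token for the next run.
    cache[cache_key] = response["refresh_token"]
    return response["access_token"]
```

On the first run the cache is empty, so the default refresh token is used. Every later run uses the refresh token stored by the previous one, which is why losing the cache (for instance after a restart) means providing a valid default value again.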

Step 3: Call Fitbit APIs (Health and Sleep)

93217-screen-shot-2018-11-07-at-123104-pm.png

For this step, I will only detail one processor. First, because, as you can see in the screenshot above, both Get FitBit Daily Summary processors are very similar (they just call different endpoints of the Fitbit API). Second, because the flow Extract Relevant Attributes > Create SQL Query > Upsert to MySQL has been described above. Finally, because I will keep Calculate Averages and Sums for the next flow, which consumes Strava data. All detailed processors can be found here: feed-fitbit-anonymized.xml

Get FitBit Daily Summary: InvokeHTTP Processor

This calls the Fitbit API to get a daily health summary. Its full configuration can be found in the template linked above.

Section 4: Creating a Nifi flow to consume Strava activity data

As in the previous section, the goal is to set up a flow calling Strava's activity API. It assumes that you registered an application with the Strava API and have a refresh/bearer token ready with the right permissions (see details over at Strava documentation). Unlike with Fitbit, the refresh token remains the same, so there is no need for a distributed cache. I'm only going to detail the Convert Units processor to talk about the Nifi Expression Language. The rest of the flow can be found here: feed-strava-anonymized.xml

93230-screen-shot-2018-11-07-at-21507-pm.png

Convert Units: UpdateAttribute Processor

93231-screen-shot-2018-11-07-at-22020-pm.png

This processor uses some of the extracted data to convert units before putting them into attributes that are later used in your upsert query. Here are the calculations executed:

  • AVG_HR: ${AVG_HR:isEmpty():ifElse(0.0, ${AVG_HR})} (defaults to 0.0 when the API does not return a heart rate for the activity)
  • AVG_PACE: ${MILE_IN_METERS:divide(${AVG_PACE:toDecimal():multiply(60.0)})} (converts speed from m/s to min/mile)
  • DISTANCE: ${DISTANCE:divide(${MILE_IN_METERS})} (converts distance from meters to miles)
  • DURATION: ${DURATION:toDecimal():divide(60.0)} (converts duration from seconds to mins)
  • START_TIME: ${START_TIME:replaceAll("T"," "):replaceAll("Z","")} (converts timestamp format to MySQL format)
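
To double check the arithmetic behind these expressions, here is the same set of conversions as a Python sketch. MILE_IN_METERS is set elsewhere in the flow; the value 1609.344 used here is an assumption, as are the function and parameter names.

```python
MILE_IN_METERS = 1609.344  # assumed value of the MILE_IN_METERS attribute

def convert_units(avg_hr, avg_speed_mps, distance_m, duration_s, start_time):
    """Mirror the Convert Units expressions; inputs arrive as strings."""
    return {
        # An empty heart rate becomes 0.0.
        "AVG_HR": float(avg_hr) if avg_hr else 0.0,
        # Speed in m/s -> pace in min/mile: meters per mile / meters per minute.
        "AVG_PACE": MILE_IN_METERS / (float(avg_speed_mps) * 60.0),
        # Meters -> miles.
        "DISTANCE": float(distance_m) / MILE_IN_METERS,
        # Seconds -> minutes.
        "DURATION": float(duration_s) / 60.0,
        # 2018-11-06T07:50:00Z -> 2018-11-06 07:50:00
        "START_TIME": start_time.replace("T", " ").replace("Z", ""),
    }
```

For instance, an activity at 3.0 m/s gives a pace of about 8.94 min/mile. An average speed of zero raises a ZeroDivisionError in this sketch, so an activity without movement is a case worth checking in your own flow.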
Last update:
‎08-17-2019 05:38 AM