12-10-2018
03:07 PM
11 Kudos
Introduction
Cloudbreak documentation is fairly thorough and can be found on Hortonworks' documentation portal (e.g.
here). However, if you are as lazy as I am, you'll appreciate the few quick examples I'm listing in this article.
First and foremost, you can access the documentation of available APIs on your CB instance by using the following URL
https://[YOUR_CB_URL]/cb/apidocs ; you should see something like this:
Authenticating
Authentication in Cloudbreak is OAuth 2, and getting a token is detailed in the Cloudbreak API documentation.
Here is a simple curl call to get a token:
curl -k -iX POST -H "accept: application/x-www-form-urlencoded" -d 'credentials={"username":"[YOUR_USER]","password":"[YOUR_PWD]"}' "https://[YOUR_CB_URL]/identity/oauth/authorize?response_type=token&client_id=cloudbreak_shell&scope.0=openid&source=login&redirect_uri=http://cloudbreak.shell" | grep location | cut -d'=' -f 3 | cut -d'&' -f 1
GET Example: Listing all blueprints
The biggest trick to the CB API is knowing the URL to use and remembering to disable SSL certificate verification (using the -k flag in curl).
Here is an example call that lists all blueprints:
TOKEN=$(curl -k -iX POST -H "accept: application/x-www-form-urlencoded" -d 'credentials={"username":"[YOUR_USER]","password":"[YOUR_PWD]"}' "https://[YOUR_CB_URL]/identity/oauth/authorize?response_type=token&client_id=cloudbreak_shell&scope.0=openid&source=login&redirect_uri=http://cloudbreak.shell" | grep location | cut -d'=' -f 3 | cut -d'&' -f 1)
curl -X GET https://[YOUR_CB_URL]/cb/api/v1/blueprints/account -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' -H 'cache-control: no-cache' -k
POST Example: Adding an mpack
TOKEN=$(curl -k -iX POST -H "accept: application/x-www-form-urlencoded" -d 'credentials={"username":"[YOUR_USER]","password":"[YOUR_PWD]"}' "https://[YOUR_CB_URL]/identity/oauth/authorize?response_type=token&client_id=cloudbreak_shell&scope.0=openid&source=login&redirect_uri=http://cloudbreak.shell" | grep location | cut -d'=' -f 3 | cut -d'&' -f 1)
curl -X POST https://[YOUR_CB_URL]/cb/api/v1/mpacks/account -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' -H 'cache-control: no-cache' -d '{
"name": "hdf-3-2-aws",
"description": "HDF 3.2 Management Pack for AWS",
"mpackUrl": "http://public-repo-1.hortonworks.com/HDF/amazonlinux2/3.x/updates/3.2.0.0/tars/hdf_ambari_mp/hdf-ambari-mpack-3.2.0.0-520.tar.gz",
"purge": false,
"force": false
}' -k
POST Example: Uploading a user blueprint
To upload your own blueprints, remember to encode your blueprint in base64:
TOKEN=$(curl -k -iX POST -H "accept: application/x-www-form-urlencoded" -d 'credentials={"username":"[YOUR_USER]","password":"[YOUR_PWD]"}' "https://[YOUR_CB_URL]/identity/oauth/authorize?response_type=token&client_id=cloudbreak_shell&scope.0=openid&source=login&redirect_uri=http://cloudbreak.shell" | grep location | cut -d'=' -f 3 | cut -d'&' -f 1)
ENCODED_BLUEPRINT=$(base64 [YOUR_BP_JSON_LOCATION])
curl -X POST https://[YOUR_CB_URL]/cb/api/v1/blueprints/user -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' -H 'cache-control: no-cache' -d " {
\"ambariBlueprint\": \"$ENCODED_BLUEPRINT\",
\"description\": \"Blueprint for HWX Data Science Uploaded\",
\"inputs\": [],
\"tags\": {},
\"name\": \"data-science-workshop-upload\",
\"hostGroupCount\": 1,
\"status\": \"USER_MANAGED\",
\"public\": true
}" -k
11-07-2018
02:58 PM
2 Kudos
Introduction
Before being able to predict one's Beast Mode Quotient, we must find an easy way to ingest data from multiple different sources, in this case flat files and APIs. Luckily for everyone in the world, HDF, and specifically Nifi, is designed to handle multiple data inputs and outputs extremely efficiently, as I will detail in this tutorial.
A few notes about the choices of technologies selected for this tutorial and series of articles:
Target - MySQL: I selected MySQL because this is what my BMQ app is going to use, but it could be any target depending on your use-case.
Sources - Fitbit, Strava, MyFitnessPal: There are a bunch of options out there to track health. Fitbit and Strava are the only ones offering a comprehensive, public API (looking at you, Garmin and Apple Health: neither has a public API, and neither makes it easy to extract data via file, forcing you either to parse .fit files or to export the full set of data). As for MyFitnessPal, I could have used the Fitbit API (after syncing with MFP) to get calorie data, but I figured it would be nice to demonstrate file ingestion.
Notions you will learn in this tutorial
Here are some high level concepts highlighted in this article, that you can re-apply to any implementation:
Running MySQL queries including create, upsert and selection of date series
Using Nifi processors maintaining state
Configuring schema registry for file format conversion in Nifi (e.g. CSV > JSON)
Using Nifi to call OAuth 2 APIs (using refresh token, bearer tokens, etc.)
Implementing and using a distributed map cache in Nifi
Parsing and splitting JSONs in Nifi
Implementing simple math operations using Nifi language
Agenda
This tutorial is divided into the following sections:
Section 1: Setting up a MySQL DB to host health and fitness data
Section 2: Creating a Nifi flow to consume MyFitnessPal CSV exports
Section 3: Creating a Nifi flow to consume Fitbit health & sleep data
Section 4: Creating a Nifi flow to consume Strava activity data
Section 1: Setting up a MySQL DB to host health and fitness data
Step 1: Create the MySQL DB and users
Before being able to do anything, you will have to create a MySQL instance. There are a bunch of tutorials out there that explain how to do it; here is one for CentOS 7, for instance.
Once the instance is set up, connect as root and run the following database creation script:
DROP DATABASE IF EXISTS `BEAST_MODE_DB`;
CREATE DATABASE `BEAST_MODE_DB`;
CREATE USER 'bm_user'@'%' IDENTIFIED BY '[YOUR_PASSWORD]';
GRANT ALL ON BEAST_MODE_DB.* TO 'bm_user'@'%' IDENTIFIED BY '[YOUR_PASSWORD]';
Step 2: Create the DB tables
Connect as bm_user and run this script to create the appropriate tables:
-- MySQL dump 10.14 Distrib 5.5.60-MariaDB, for Linux (x86_64)
--
-- Host: localhost Database: BEAST_MODE_DB
-- ------------------------------------------------------
-- Server version 5.5.60-MariaDB
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
USE BEAST_MODE_DB;
--
-- Table structure for table `ACTIVITY_HISTORY`
--
DROP TABLE IF EXISTS `ACTIVITY_HISTORY`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `ACTIVITY_HISTORY` (
`ID` bigint(20) NOT NULL,
`START_TIME` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`DURATION` double DEFAULT NULL,
`DISTANCE` double DEFAULT NULL,
`ELEVATION_GAIN` double DEFAULT NULL,
`AVG_HR` double DEFAULT NULL,
`AVG_PACE` double DEFAULT NULL,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `BMQ_HISTORY`
--
DROP TABLE IF EXISTS `BMQ_HISTORY`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `BMQ_HISTORY` (
`ID` mediumint(9) NOT NULL AUTO_INCREMENT,
`BMQ` int(11) NOT NULL,
`TYPE` char(30) DEFAULT NULL,
`TIME_ENTERED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=30 DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `GENERATOR`
--
DROP TABLE IF EXISTS `GENERATOR`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `GENERATOR` (
`ID` mediumint(9) NOT NULL AUTO_INCREMENT,
`VALUE` varchar(50) DEFAULT NULL,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=161 DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `GENERATOR`
--
LOCK TABLES `GENERATOR` WRITE;
/*!40000 ALTER TABLE `GENERATOR` DISABLE KEYS */;
INSERT INTO `GENERATOR` VALUES (1,'dummy'),(2,'dummy'),(3,'dummy'),(4,'dummy'),(5,'dummy'),(6,'dummy'),(7,'dummy'),(8,'dummy'),(9,'dummy'),(10,'dummy'),(11,'dummy'),(12,'dummy'),(13,'dummy'),(14,'dummy'),(15,'dummy'),(16,'dummy'),(17,'dummy'),(18,'dummy'),(19,'dummy'),(20,'dummy'),(21,'dummy'),(22,'dummy'),(23,'dummy'),(24,'dummy'),(25,'dummy'),(26,'dummy'),(27,'dummy'),(28,'dummy'),(29,'dummy'),(30,'dummy'),(31,'dummy'),(32,'dummy'),(33,'dummy'),(34,'dummy'),(35,'dummy'),(36,'dummy'),(37,'dummy'),(38,'dummy'),(39,'dummy'),(40,'dummy'),(41,'dummy'),(42,'dummy'),(43,'dummy'),(44,'dummy'),(45,'dummy'),(46,'dummy'),(47,'dummy'),(48,'dummy'),(49,'dummy'),(50,'dummy'),(51,'dummy'),(52,'dummy'),(53,'dummy'),(54,'dummy'),(55,'dummy'),(56,'dummy'),(57,'dummy'),(58,'dummy'),(59,'dummy'),(60,'dummy'),(61,'dummy'),(62,'dummy'),(63,'dummy'),(64,'dummy'),(65,'dummy'),(66,'dummy'),(67,'dummy'),(68,'dummy'),(69,'dummy'),(70,'dummy'),(71,'dummy'),(72,'dummy'),(73,'dummy'),(74,'dummy'),(75,'dummy'),(76,'dummy'),(77,'dummy'),(78,'dummy'),(79,'dummy'),(80,'dummy'),(81,'dummy'),(82,'dummy'),(83,'dummy'),(84,'dummy'),(85,'dummy'),(86,'dummy'),(87,'dummy'),(88,'dummy'),(89,'dummy'),(90,'dummy'),(91,'dummy'),(92,'dummy'),(93,'dummy'),(94,'dummy'),(95,'dummy'),(96,'dummy'),(97,'dummy'),(98,'dummy'),(99,'dummy'),(100,'dummy'),(101,'dummy'),(102,'dummy'),(103,'dummy'),(104,'dummy'),(105,'dummy'),(106,'dummy'),(107,'dummy'),(108,'dummy'),(109,'dummy'),(110,'dummy'),(111,'dummy'),(112,'dummy'),(113,'dummy'),(114,'dummy'),(115,'dummy'),(116,'dummy'),(117,'dummy'),(118,'dummy'),(119,'dummy'),(120,'dummy'),(121,'dummy'),(122,'dummy'),(123,'dummy'),(124,'dummy'),(125,'dummy'),(126,'dummy'),(127,'dummy'),(128,'dummy'),(129,'dummy'),(130,'dummy'),(131,'dummy'),(132,'dummy'),(133,'dummy'),(134,'dummy'),(135,'dummy'),(136,'dummy'),(137,'dummy'),(138,'dummy'),(139,'dummy'),(140,'dummy'),(141,'dummy'),(142,'dummy'),(143,'dummy'),(144,'dummy'),(145,'dummy'),(146,'dummy'),(147,'dummy'),(148,'dummy'),(149,'dummy'),(150,'dummy'),(151,'dummy'),(152,'dummy'),(153,'dummy'),(154,'dummy'),(155,'dummy'),(156,'dummy'),(157,'dummy'),(158,'dummy'),(159,'dummy'),(160,'dummy');
/*!40000 ALTER TABLE `GENERATOR` ENABLE KEYS */;
UNLOCK TABLES;
--
-- Table structure for table `HEALTH_HISTORY`
--
DROP TABLE IF EXISTS `HEALTH_HISTORY`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `HEALTH_HISTORY` (
`DIARY_DAY` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`TOTAL_CALORIES_OUT` double NOT NULL,
`TOTAL_MINUTES_RECORDED` double NOT NULL,
`STEPS` double DEFAULT NULL,
`ELEVATION` double DEFAULT NULL,
`AVG_REST_HR` double DEFAULT NULL,
`REST_MINUTES` double DEFAULT NULL,
`REST_CAL_OUT` double DEFAULT NULL,
`AVG_FAT_BURN_HR` double DEFAULT NULL,
`FAT_BURN_MINUTES` double DEFAULT NULL,
`FAT_BURN_CAL_OUT` double DEFAULT NULL,
`AVG_CARDIO_HR` double DEFAULT NULL,
`CARDIO_MINUTES` double DEFAULT NULL,
`CARDIO_CAL_OUT` double DEFAULT NULL,
`AVG_PEAK_HR` double DEFAULT NULL,
`PEAK_MINUTES` double DEFAULT NULL,
`PEAK_CAL_OUT` double DEFAULT NULL,
PRIMARY KEY (`DIARY_DAY`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `NUTRITION_HISTORY`
--
DROP TABLE IF EXISTS `NUTRITION_HISTORY`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `NUTRITION_HISTORY` (
`DIARY_DAY` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`MEAL` varchar(50) NOT NULL,
`TOTAL_CALORIES_IN` double NOT NULL,
`CARB` double NOT NULL,
`PROT` double NOT NULL,
`FAT` double NOT NULL,
PRIMARY KEY (`DIARY_DAY`,`MEAL`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `SLEEP_HISTORY`
--
DROP TABLE IF EXISTS `SLEEP_HISTORY`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `SLEEP_HISTORY` (
`DIARY_DAY` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`TOTAL_MINUTES_ASLEEP` double NOT NULL,
`TOTAL_MINUTES_IN_BED` double NOT NULL,
`REM_MINUTES` double NOT NULL,
`LIGHT_MINUTES` double NOT NULL,
`DEEP_MINUTES` double NOT NULL,
`WAKE_MINUTES` double NOT NULL,
PRIMARY KEY (`DIARY_DAY`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
-- Dump completed on 2018-11-07 0:14:54
You will note a GENERATOR table has been created. This is necessary to create date series queries, as detailed later.
Step 3: Update your BMQ every day
To get your perceived BMQ, you will have to enter it manually. Before we develop the BMQ app to do that, you can enter it using the following example queries every day:
insert into BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) values
('3', 'morning', '2018-11-06 7:00:00');
insert into BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) values
('3', 'pre-workout', '2018-11-06 7:50:00');
insert into BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) values
('4', 'post-workout', '2018-11-06 8:50:00');
insert into BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) values
('3', 'evening', now());
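If you prefer logging a BMQ entry from the command line, here is a minimal sketch using the mysql client (it assumes the client is installed and can reach your MySQL host; adjust the values as needed):
# hypothetical one-liner; adjust host, password, BMQ value and type
mysql -h [YOUR_MYSQL_SERVER_ADDRESS] -u bm_user -p BEAST_MODE_DB \
  -e "insert into BMQ_HISTORY (BMQ, TYPE, TIME_ENTERED) values ('3', 'morning', now());"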
Section 2: Creating a Nifi flow to consume MyFitnessPal CSV exports
The goal is to set up the following flow:
Step 1: Survey folder for new CSV files
List Files in MFP Folder: ListFile Processor
This processor will monitor a folder for new files, based on timestamp, then maintain the last file read in its state, as seen below after right-clicking on the processor and selecting view state:
Here is a list of properties to configure that are not default:
Input directory: [YOUR_LOCAL_PATH]
File filter: .*
Run Schedule: 12 h
Get Latest File: FetchFile Processor
This processor gets the file from the list generated previously and sends it in a flow file. It is configured using default properties.
Step 2: Convert CSV to JSON
Update Schema Name: UpdateAttribute Processor
This processor updates the schema name attribute (later used by next processors and controller services).
HortonworksSchemaRegistry Controller Service
This controller service points to your HDF Schema Registry. The only non-default property to configure before enabling it is:
Schema Registry URL: http://[YOUR_SCHEMA_REGISTRY_HOST]:7788/api/v1/
CSVReader Controller Service
This controller service is going to read your CSV and use your Hortonworks Schema Registry to parse its schema. The following non-default properties should be configured before enabling it:
Schema Access Strategy: Use 'Schema Name' Property
Schema Registry: HortonworksSchemaRegistry
Treat First Line as Header: true
Ignore CSV Header Column Names: true
JsonRecordSetWriter Controller Service
This controller service is going to write your JSON output based on your Hortonworks Schema Registry. The following non-default properties should be configured before enabling it:
Schema Write Strategy: Set 'schema.name' Attribute
Schema Access Strategy: Use 'Schema Name' Property
Schema Registry: HortonworksSchemaRegistry
Convert CSV to JSON: ConvertRecord Processor
This processor does the record conversion. Configure it with your enabled controllers as such:
Record Reader: CSVReader
Record Writer: JsonRecordSetWriter
Step 3: Get fields needed for insert
SplitJson Processor
This processor splits each meal in your export into its own flow file, by using the following non-default property:
JsonPath Expression: $.[*]
Extract Relevant Attributes: EvaluateJsonPath Processor
This processor extracts the relevant elements of your split JSON into attributes. It is configured using the following:
Destination: flowfile-attribute
carb: $.carbohydrates_g
diary_day: $.date
fat: $.fat_g
meal: $.meal
prot: $.protein_g
total_calories_in: $.calories
Step 4: Insert Data to MySQL
Create Upsert Query: ReplaceText Processor
This processor creates the query to be executed on your MySQL server to upsert the different records. Here is the Replacement Value you should use:
INSERT INTO NUTRITION_HISTORY
(DIARY_DAY, MEAL, TOTAL_CALORIES_IN, CARB, PROT, FAT)
VALUES
('${diary_day}', '${meal}', ${total_calories_in}, ${carb}, ${prot}, ${fat}) ON DUPLICATE KEY UPDATE
TOTAL_CALORIES_IN = ${total_calories_in},
CARB = ${carb},
PROT = ${prot},
FAT = ${fat}
DBCPConnectionPool Controller Service
This controller service enables the connection to your MySQL server, configured with the following non-default properties:
Database Connection URL: jdbc:mysql://[YOUR_MYSQL_SERVER_ADDRESS]:3306/BEAST_MODE_DB?useLegacyDatetimeCode=false&serverTimezone=America/New_York
Database Driver Class Name: com.mysql.jdbc.Driver
Database Driver Location(s): file:///home/nifi/mysql_jdbc_drivers/mysql-connector-java-8.0.11.jar (or wherever you put your mysql jdbc driver)
Upsert into MySQL: PutSQL Processor
This processor executes the query generated in the ReplaceText processor, and relies on your DBCPConnectionPool controller service, configured as such:
JDBC Connection Pool: DBCPConnectionPool
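Once the flow has run, you can quickly verify that records landed in MySQL; a minimal sketch using the mysql client (adjust host and credentials):
# spot-check the upserted nutrition records
mysql -h [YOUR_MYSQL_SERVER_ADDRESS] -u bm_user -p BEAST_MODE_DB \
  -e "select DIARY_DAY, MEAL, TOTAL_CALORIES_IN from NUTRITION_HISTORY order by DIARY_DAY desc limit 10;"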
Section 3: Creating a Nifi flow to consume Fitbit health & sleep data
The goal is to set up the following flow (separated into two screenshots, so you know it's going to be fun). It assumes that you registered an application with the Fitbit API and have a refresh/bearer token ready with the right permissions (see details over at the Fitbit documentation).
Step 1: Get Delta parameters
Get Latest Date: ExecuteSQL Processor
This processor runs a query to get the series of dates not yet in the BMQ DB, in order to call the Fitbit API for each of them. It relies on the DBCPConnectionPool detailed above (configured in the Database Connection Pooling Service property) and a 12 h Run Schedule. Here is the query it should run:
select list_date from (select
date_format(
adddate((select IFNULL(max(hist.diary_day),'2018-11-07') from HEALTH_HISTORY hist), @num:=@num+1),
'%Y-%m-%d'
) list_date
from
GENERATOR,
(select @num:=-1) num
limit
150) dates
where list_date < CURRENT_DATE()
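Before wiring this query into the processor, you can sanity-check the date series it produces with the mysql client; a quick sketch, assuming the client can reach your MySQL host:
# should print one row per day from the latest HEALTH_HISTORY entry up to yesterday
mysql -h [YOUR_MYSQL_SERVER_ADDRESS] -u bm_user -p BEAST_MODE_DB -e "
select list_date from (select
  date_format(
    adddate((select IFNULL(max(hist.diary_day),'2018-11-07') from HEALTH_HISTORY hist), @num:=@num+1),
    '%Y-%m-%d'
  ) list_date
from GENERATOR, (select @num:=-1) num
limit 150) dates
where list_date < CURRENT_DATE()"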
I realize this is not the only method to generate such a list (GenerateTableFetch, etc.), but it makes sure that I only run what the DB needs if something else changes the DB.
ConvertAvroToJSON Processor
This processor converts the Avro response of the previous processor. Make sure you configure the following property to avoid a splitting error when only one record is returned:
Wrap Single Record: true
SplitJson Processor
This processor splits each record of the query result into its own flow file, by using the following non-default property:
JsonPath Expression: $.*
Extract Relevant Attributes: EvaluateJsonPath Processor
This processor extracts the relevant elements of your split JSON into attributes. It is configured using the following:
Destination: flowfile-attribute
DIARY_DAY: $.list_date
Step 2: Handle Token Refresh
DistributedMapCacheServer Controller Service
This controller service is the server that will run the distributed cache. It should be configured using all default properties before being enabled.
DistributedMapCacheClientService Controller Service
This controller service connects to the DistributedMapCacheServer configured previously. Before enabling it, just configure the following:
Server Hostname: [YOUR_HOSTNAME]
Get Token From Cache: FetchDistributedMapCache Processor
This processor will try and fetch a refresh token from the DistributedMapCacheServer. If not found, we will send it to an UpdateAttribute to set a default value (e.g. for the first run), otherwise we will send it to the InvokeHTTP to get a new token. Here are the non-default properties to be configured:
Cache Entry Identifier: refresh-token-fitbit
Distributed Cache Service: DistributedMapCacheClientService
Put Cache Value In Attribute: CurrentRefreshToken
A few notes: This configuration requests a refresh token every run, which works with my 12 h run schedule. Ideally, you would store the expiration in the distributed cache as well and only request a refresh token when needed, but I didn't want to overcomplicate things. Moreover, in a production environment, I would probably recommend persisting these tokens on disk and not only in memory, in case of failure.
Use Default Value if Not Found: UpdateAttribute Processor
This processor updates the CurrentRefreshToken if FetchDistributedMapCache returns not-found. Configure it using this property:
CurrentRefreshToken: [A_VALID_REFRESH_TOKEN]
Refresh Fitbit Token: InvokeHTTP Processor
This calls the Fitbit API to get a new valid token and a refresh token. Configure the following non-default properties to run it:
HTTP Method: POST
Remote URL: https://api.fitbit.com/oauth2/token?grant_type=refresh_token&refresh_token=${CurrentRefreshToken}
Content-Type: application/x-www-form-urlencoded
Authorization: Basic [YOUR_AUTHORIZATION_CODE] (see the Fitbit documentation)
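For reference, the equivalent call outside of Nifi looks roughly like this (a sketch mirroring the properties above; [CURRENT_REFRESH_TOKEN] and [YOUR_AUTHORIZATION_CODE] are placeholders you need to fill in):
# exchange the current refresh token for a new bearer/refresh token pair
curl -X POST "https://api.fitbit.com/oauth2/token?grant_type=refresh_token&refresh_token=[CURRENT_REFRESH_TOKEN]" \
  -H "Authorization: Basic [YOUR_AUTHORIZATION_CODE]" \
  -H "Content-Type: application/x-www-form-urlencoded"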
Extract Token: EvaluateJsonPath Processor
This processor extracts the bearer and refresh tokens from the HTTP response, then continues to two routes: one storing back the refresh token in cache, the other one using the bearer token to call the Fitbit API. It is configured using the following:
Destination: flowfile-attribute
BEARER_TOKEN: $.access_token
REFRESH_TOKEN: $.refresh_token
Put Refresh Token in Flowfile: ReplaceText Processor
This processor puts the refresh token in flowfile in order to be consumed:
${REFRESH_TOKEN}
PutDistributedMapCache Processor
This processor takes the refresh token from the flowfile and stores it under refresh-token-fitbit for the next time the flow is executed:
Cache Entry Identifier: refresh-token-fitbit
Distributed Cache Service: DistributedMapCacheClientService
Step 3: Call Fitbit APIs (Health and Sleep)
For this step, I will only detail one processor. First, because, as you can see on the screenshot above, both Get FitBit Daily Summary processors are very similar (they just call different endpoints of the Fitbit API). Secondly, because the flow Extract Relevant Attributes > Create SQL Query > Upsert to MySQL has been described above. Finally, because I will keep the Calculate Averages and Sums part for the Strava flow in the next section. All detailed processors can be found here: feed-fitbit-anonymized.xml
Get FitBit Daily Summary: InvokeHTTP Processor
This calls the Fitbit API to get a daily health summary:
HTTP Method: GET
Remote URL: https://api.fitbit.com/1/user/-/activities/date/${DIARY_DAY}.json
Authorization: Bearer ${BEARER_TOKEN}
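Again for reference, a rough curl equivalent of this processor (the date and token below are placeholders):
# fetch the daily activity summary for a given day
curl "https://api.fitbit.com/1/user/-/activities/date/2018-11-07.json" \
  -H "Authorization: Bearer [YOUR_BEARER_TOKEN]"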
Section 4: Creating a Nifi flow to consume Strava activity data
As in the previous section, the goal is to set up a flow calling Strava's activity API. It assumes that you registered an application with the Strava API and have a refresh/bearer token ready with the right permissions (see details over at the Strava documentation). As opposed to Fitbit, the refresh token remains the same, so there is no need for a distributed cache. I'm only going to detail the Convert Units processor, to talk about Nifi language. The rest of the flow can be found here: feed-strava-anonymized.xml
Convert Units: UpdateAttribute Processor
This processor uses some of the data extracted to convert units before putting them into attributes later used in your upsert query. Here are the calculations executed (a quick sanity check of the pace conversion follows the list):
AVG_HR: ${AVG_HR:isEmpty():ifElse(0.0, ${AVG_HR})} (makes sure that we have a value even if the API does not return HR, for activities without HR data)
AVG_PACE: ${MILE_IN_METERS:divide(${AVG_PACE:toDecimal():multiply(60.0)})} (converts speed from m/s to min/mile)
DISTANCE: ${DISTANCE:divide(${MILE_IN_METERS})} (converts distance from meters to miles)
DURATION: ${DURATION:toDecimal():divide(60.0)} (converts duration from seconds to mins)
START_TIME: ${START_TIME:replaceAll("T"," "):replaceAll("Z","")} (converts timestamp format to MySQL format)
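To double-check the pace conversion, here is a quick shell equivalent of the AVG_PACE expression; it assumes MILE_IN_METERS is 1609.34 and that the API reports average speed in m/s:
# e.g. 3.0 m/s -> roughly 8.94 minutes per mile
awk -v speed_ms=3.0 -v mile_m=1609.34 'BEGIN { print mile_m / (speed_ms * 60) }'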
11-01-2018
06:46 PM
3 Kudos
Introduction
Knowing how and when to leverage cloud infrastructure and when to use on-premises equipment is at the heart of today's mature enterprise data strategies. I decided to dive deeper into the subject and, like I did with the Personality Detection data science and engineering platform I built, I wanted to use an example to illustrate how to approach architecting a hybrid cloud infrastructure.
These types of endeavors can be quite theoretical (read: hard to follow and potentially boring), so I decided to attach this hybrid cloud platform demonstration to something very concrete: my own health. I signed up for a marathon that will take place March 29, and I am starting to train November 5th. I plan to use my training, sleep, nutrition and general health data to evaluate and eventually predict my perceived level of fatigue, which I coined the "Beast Mode Quotient", or BMQ.
This article is an introduction to the architecture and data flows I will put in place. It will refer to sub-articles that will be tutorials anyone can follow to implement their own hybrid cloud strategies.
Architecture overview
The figure below gives a highlight of my hybrid cloud platform:
As you can see, it is comprised of the following elements:
A Data Fabric Layer (Data Plane Services) hosting the BMQ dashboard app, as well as Cloudbreak in order to deploy ephemeral clusters
One permanent cluster, hosting data ingestion from various health sources (e.g. Strava, MyFitnessPal, Fitbit). This ingestion will also execute the ML pipeline to predict level of fatigue
Any number of ephemeral clusters executing the training and generation of the ML pipeline
Analytics & model training on the data stored in MySQL, using Zeppelin notebooks & Spark, which would then feed back into the BMQ application
Custom applications consuming the data extracted and analyzed
Note: I realize that this architecture is showcasing a near-future state (Cloudbreak is not technically part of Data Plane Services yet). But this series of articles is a long-term project, so the architecture is set a few weeks into the future.
Data Flows
The platform will interact in two major ways, detailed in the flows below.
Data Flow 1: Refreshing Data
Data Flow 2: Generating ML Pipeline using ephemeral clusters
Metrics analyzed and predicted
From a high vantage point, the platform will collect, analyze and predict the following pieces of data. My goal is to log all these pieces of data through various health and fitness services, then create a BMQ predictive model that I will apply to my upcoming training plan.
Activity Data
Distance
Duration
Avg. Heart Rate
Elevation
Avg. Pace
Nutrition Data
Calories in
% Carbs
% Proteins
% Fats
Sleep Data
Total sleep
% REM
% Light sleep
% Deep sleep
% Awake
Health Data
Total Cal. Burned
Avg. Heart Rate
Steps
Elevation
Weight
Perceived Fatigue
BMQ: Morning
BMQ: Pre-Workout
BMQ: Post-Workout
BMQ: Evening
Implementation Tutorials
The implementation of this platform will be detailed in the following upcoming tutorial articles:
Part 1: Create Nifi Flows to ingest API and flat files and load them onto MySQL
Part 2: Create Cloudbreak blueprints to deploy data science ready ephemeral clusters
Part 3: Use Spark and Zeppelin to train fatigue prediction models
Part 4: Create an end-to-end application automating BMQ calculation and prediction
10-15-2018
03:14 PM
2 Kudos
Overview
Last article of the series, and a fun one. As usual, this article is the continuation of
part 1, part 2 and part 3, so make sure you read those before starting. The completion of this tutorial will finalize your end to end architecture, as depicted here:
This tutorial will be divided into 2 sections:
Section 1: Create a Web Service with Nifi
Section 2: Create a consumer application with CodeIgniter
Note: Section 2 will not be detailed. First, because web application development is not the main goal of Hortonworks Community Connection. Second, because I am pretty sure I did a terrible job at it considering it is not my domain of expertise, and I shouldn't spread mediocrity to the world!
Section 1: Create a Web Service with Nifi
For your entire project to work, you will have to create more than one web service although this section only goes through the creation of one (see the flow overview below). To get all the flows required for your app to work, check github
here.
Step 1: Create a request handler
StandardHttpContextMap
This can't be more straightforward. Create a StandardHttpContextMap controller service with default options and enable it.
HandleHttpRequest: Receive request and data
Create a simple HandleHttpRequest processor.
Here are the properties you need to change (all others are standard):
Listening Port: 8012
HTTP Context Map: StandardHttpContextMap (the controller you just created)
Allowed Paths: /dashboard
Step 2: Retrieve data from Hbase
Execute SQL
Create 4 ExecuteSQL processors that will run the queries retrieving all the data you need. They all rely on a DBCPConnectionPool controller service for Phoenix. To understand how to create that, please refer to part 3.
Each ExecuteSQL will have a select count in it. I'm not going to list them here, but I included a template for all web services with this article.
Convert Avro to JSON
Yet again, a very straightforward configuration. For each ExecuteSQL you created, create a ConvertAvroToJSON processor with standard configuration:
Step 3: Merge and prepare Response
Merge Content
Merge all outputs from the Convert Avro to JSON processors using a standard MergeContent processor:
Replace Text
Use a ReplaceText to add brackets before and after the merge content, so that your web app can read it:
Step 4: Send response back
Handle Http Response
Finally, use a standard configuration for a HandleHttpResponse processor to return the web response:
Start your flow; you should then be able to call your service by going to this URL:
http://[your_nifi_address]:8012/dashboard
The response should be something along these lines:
[{"BYLINES": 781},{"TOTAL": 6153},{"TOTAL_VIEWS": 4147},{"NEWS_DESKS": 63}]
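You can also hit the service from the command line to verify it responds:
# should return the merged JSON array shown above
curl "http://[your_nifi_address]:8012/dashboard"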
Section 2: Create a consumer application with CodeIgniter
Once you've created all the web services, you will need to create a web app to consume them. I created one using CodeIgniter, and you can see all the code on my github. As I mentioned before, I'm not going to go deep into this configuration, but to make this app run, you will need to follow these steps (a command-line sketch follows the list):
Step 1: Instantiate a Linux web server with Apache, Maria DB, and PHP 7
Step 2: Create an adminlte database on your server with the appropriate data by running the SQL script below
Step 3: Clone my github to the root of your apache server
Step 4: Change the config.php base_url to match your server, and the database.php to match your credentials
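Here is a rough command-line sketch of these four steps on a CentOS 7 box; the package names, repository URL and paths are placeholders/assumptions to adapt to your environment, and the adminlte SQL script referenced in step 2 is the dump that follows:
# Step 1: web server with Apache, MariaDB and PHP (exact packages vary by distro/PHP version)
sudo yum install -y httpd mariadb-server php php-mysqlnd
sudo systemctl enable --now httpd mariadb
# Step 2: create the adminlte database from the dump below (saved locally as adminlte.sql)
mysql -u root -p < adminlte.sql
# Step 3: clone the web app to the Apache document root (placeholder repo URL)
sudo git clone https://github.com/[YOUR_FORK]/[WEBAPP_REPO].git /var/www/html
# Step 4: point the app at your server and database (standard CodeIgniter config locations)
sudo vi /var/www/html/application/config/config.php    # set base_url
sudo vi /var/www/html/application/config/database.php  # set DB credentials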
-- MySQL dump 10.14 Distrib 5.5.60-MariaDB, for Linux (x86_64)
--
-- Host: localhost Database: adminlte
-- ------------------------------------------------------
-- Server version 5.5.60-MariaDB
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
--
-- Database creation
--
CREATE DATABASE adminlte;
use adminlte;
--
-- Table structure for table `activity_logs`
--
DROP TABLE IF EXISTS `activity_logs`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `activity_logs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`title` text NOT NULL,
`user` text NOT NULL,
`ip_address` text NOT NULL,
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=18 DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `activity_logs`
--
LOCK TABLES `activity_logs` WRITE;
/*!40000 ALTER TABLE `activity_logs` DISABLE KEYS */;
INSERT INTO `activity_logs` VALUES (1,'Administrator Logged in','1','10.42.80.145','2018-10-11 18:34:37','0000-00-00 00:00:00'),(2,'User: Administrator Logged Out','1','10.42.80.145','2018-10-11 19:44:48','0000-00-00 00:00:00'),(3,'Administrator Logged in','1','10.42.80.145','2018-10-11 19:44:54','0000-00-00 00:00:00'),(4,'User: Administrator Logged Out','1','10.42.80.145','2018-10-11 20:52:02','0000-00-00 00:00:00'),(5,'Administrator Logged in','1','10.42.80.145','2018-10-11 20:52:06','0000-00-00 00:00:00'),(6,'User: Administrator Logged Out','1','10.42.80.145','2018-10-11 20:56:16','0000-00-00 00:00:00'),(7,'Administrator Logged in','1','10.42.80.145','2018-10-11 20:56:24','0000-00-00 00:00:00'),(8,'User: Administrator Logged Out','1','10.42.80.145','2018-10-11 20:59:01','0000-00-00 00:00:00'),(9,'Administrator Logged in','1','10.42.80.145','2018-10-11 21:02:36','0000-00-00 00:00:00'),(10,'User: Administrator Logged Out','1','10.42.80.145','2018-10-11 21:06:35','0000-00-00 00:00:00'),(11,'Administrator Logged in','1','10.42.80.145','2018-10-11 21:12:24','0000-00-00 00:00:00'),(12,'Administrator Logged in','1','10.42.80.172','2018-10-12 15:52:55','0000-00-00 00:00:00'),(13,'Administrator Logged in','1','10.42.80.85','2018-10-13 11:45:37','0000-00-00 00:00:00'),(14,'Administrator Logged in','1','10.42.80.153','2018-10-13 12:27:08','0000-00-00 00:00:00'),(15,'User: Administrator Logged Out','1','10.42.80.153','2018-10-13 12:27:14','0000-00-00 00:00:00'),(16,'Administrator Logged in','1','10.42.80.153','2018-10-13 12:27:40','0000-00-00 00:00:00'),(17,'Administrator Logged in','1','10.42.80.204','2018-10-15 16:21:06','0000-00-00 00:00:00');
/*!40000 ALTER TABLE `activity_logs` ENABLE KEYS */;
UNLOCK TABLES;
--
-- Table structure for table `permissions`
--
DROP TABLE IF EXISTS `permissions`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `permissions` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`title` text NOT NULL,
`code` text,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `permissions`
--
LOCK TABLES `permissions` WRITE;
/*!40000 ALTER TABLE `permissions` DISABLE KEYS */;
INSERT INTO `permissions` VALUES (1,'Users List','users_list'),(2,'Add Users','users_add'),(3,'Edit Users','users_edit'),(4,'Delete Users','users_delete'),(5,'Users View','users_view'),(6,'Activity Logs List','activity_log_list'),(7,'Acivity Log View','activity_log_view'),(8,'Roles List','roles_list'),(9,'Add Roles','roles_add'),(10,'Edit Roles','roles_edit'),(11,'Permissions List','permissions_list'),(12,'Add Permissions','permissions_add'),(13,'Permissions Edit','permissions_edit'),(14,'Delete Permissions','permissions_delete'),(15,'Company Settings','company_settings');
/*!40000 ALTER TABLE `permissions` ENABLE KEYS */;
UNLOCK TABLES;
--
-- Table structure for table `role_permissions`
--
DROP TABLE IF EXISTS `role_permissions`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `role_permissions` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`role` int(11) NOT NULL,
`permission` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `role_permissions`
--
LOCK TABLES `role_permissions` WRITE;
/*!40000 ALTER TABLE `role_permissions` DISABLE KEYS */;
INSERT INTO `role_permissions` VALUES (1,1,'users_list'),(2,1,'users_add'),(3,1,'users_edit'),(4,1,'users_delete'),(5,1,'users_view'),(6,1,'activity_log_list'),(7,1,'activity_log_view'),(8,1,'roles_list'),(9,1,'roles_add'),(10,1,'roles_edit'),(11,1,'permissions_list'),(12,1,'permissions_add'),(13,1,'permissions_edit'),(14,1,'permissions_delete'),(15,1,'company_settings');
/*!40000 ALTER TABLE `role_permissions` ENABLE KEYS */;
UNLOCK TABLES;
--
-- Table structure for table `roles`
--
DROP TABLE IF EXISTS `roles`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `roles` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`title` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `roles`
--
LOCK TABLES `roles` WRITE;
/*!40000 ALTER TABLE `roles` DISABLE KEYS */;
INSERT INTO `roles` VALUES (1,'Admin');
/*!40000 ALTER TABLE `roles` ENABLE KEYS */;
UNLOCK TABLES;
--
-- Table structure for table `settings`
--
DROP TABLE IF EXISTS `settings`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `settings` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`key` text NOT NULL,
`value` text NOT NULL,
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `settings`
--
LOCK TABLES `settings` WRITE;
/*!40000 ALTER TABLE `settings` DISABLE KEYS */;
INSERT INTO `settings` VALUES (1,'company_name','Company Name','2018-06-21 17:37:59'),(2,'company_email','testcompany@gmail.com','2018-07-11 11:09:58'),(3,'timezone','Asia/Kolkata','2018-07-15 19:54:17');
/*!40000 ALTER TABLE `settings` ENABLE KEYS */;
UNLOCK TABLES;
--
-- Table structure for table `users`
--
DROP TABLE IF EXISTS `users`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `users` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` text NOT NULL,
`username` text NOT NULL,
`email` text NOT NULL,
`password` text NOT NULL,
`phone` text NOT NULL,
`address` longtext NOT NULL,
`last_login` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
`role` int(11) NOT NULL,
`reset_token` text NOT NULL,
`created_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
`updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `users`
--
LOCK TABLES `users` WRITE;
/*!40000 ALTER TABLE `users` DISABLE KEYS */;
INSERT INTO `users` VALUES (1,'Administrator','admin','admin@gmail.com','21232f297a57a5a743894a0e4a801fc3','','','2018-10-15 21:10:51',1,'','2018-06-27 18:30:16','0000-00-00 00:00:00');
/*!40000 ALTER TABLE `users` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
After you're done with that and have restarted your web server, go to:
http://[YOUR_SERVER]/index.php
You can now connect using admin/admin. You should see screens like these ones:
10-04-2018
11:38 AM
4 Kudos
Overview
Time to continue my series on news author personality detection! This tutorial will look at data science and Zeppelin notebooks, with the idea of predicting the popularity of an article based on the personality of its author. This tutorial does assume that you completed part 1 and part 2 of the series. After completing this part, your architecture will look like this:
This tutorial will be divided into the following sections:
Section 1: Update Nifi to send article data to Phoenix
Section 2: Read Phoenix data with Spark in Zeppelin Note
Section 3: Train Linear Regression Model
Section 4: Make and store article popularity predictions
Note: This exercise is a very simplified way to apply machine learning. I spent very little time selecting the appropriate algorithm, accumulating enough data or even challenging whether or not the personality of an author is a viable predictor for popularity. This is a full time job, it's called being a data scientist! But hopefully this will enable the real data scientists to please stand up and build useful stuff using the Hortonworks stack.
Section 1: Update Nifi to send article data to Phoenix
This section's goal is to first enhance our previous Nifi flow to upsert data into HBase using Phoenix, as well as add a new flow collecting data from the New York Times "most popular" API. You can find all these flows under my github
here. I used Nifi registry to push directly to github; if you want to learn how to do that, check out this article: How to configure git for Nifi Registry in HDF 3.2.
Step 1: Create the appropriate tables using Phoenix
Connect to your server and run phoenix-sqlline (with the appropriate user):
$ ssh [YOUR_MACHINE]
$ sudo su hbase
$ phoenix-sqlline
Create three tables:
One to store author personality evaluation per article
One to store article popularity
One to store popularity prediction based on author personality
Here is the SQL create code:
CREATE TABLE article_evaluation (
web_url VARCHAR NOT NULL PRIMARY KEY,
snippet VARCHAR,
byline VARCHAR,
pub_date TIMESTAMP,
headline VARCHAR,
document_type VARCHAR,
news_desk VARCHAR,
filename VARCHAR,
last_updated VARCHAR,
extraversion DOUBLE,
emotional_stability DOUBLE,
agreeableness DOUBLE,
conscientiousness DOUBLE,
openness_to_experience DOUBLE
);
CREATE TABLE article_stats (
web_url VARCHAR NOT NULL PRIMARY KEY,
views TINYINT,
last_updated VARCHAR
);
CREATE TABLE article_popularity_prediction (
web_url VARCHAR NOT NULL PRIMARY KEY,
extraversion DOUBLE,
emotional_stability DOUBLE,
agreeableness DOUBLE,
conscientiousness DOUBLE,
openness_to_experience DOUBLE,
prediction DOUBLE
);
Step 2: Update your existing flow to upsert to Phoenix
Our goal is to add the following processors to our previous flow (established in
part 1 of this series):
Update Attribute: Escape characters
Create a processor and configure it to create/update 3 attributes:
filename: filename used to store the article text into a hdfs file (optional and not part of this tutorial, but the field is needed for the upsert)
headline: article headline
snippet: article snippet
Here is the code for each attribute:
filename
${web_url:substring(${web_url:lastIndexOf('/'):plus(1)},${web_url:length()})}
headline
${headline:replaceAll('\'',''):replaceAll('\"',''):replaceAll(';','')}
snippet
${snippet:replaceAll('\'',''):replaceAll('\"',''):replaceAll(';','')}
Replace Text: Create Upsert query
This one is straightforward, as depicted below:
Your replacement value should be:
upsert into article_evaluation values('${web_url}','${snippet}','${byline}','${pub_date}','${headline}','${document_type}','${news_desk}','${filename}','${now()}',${mairessepersonalityrecognition.extraversion},${mairessepersonalityrecognition.emotional_stability},${mairessepersonalityrecognition.agreeableness},${mairessepersonalityrecognition.conscientiousness},${mairessepersonalityrecognition.openness_to_experience})
PutSQL: Run Query
This processor is straight forward too, but will require to first create a DBCPConnectionPool controller service:
Assuming the Nifi node has a Phoenix client, here are the parameters you will need to set up:
Database Connection URL: jdbc:phoenix:localhost:2181:/hbase-unsecure
Database Driver Class Name: org.apache.phoenix.jdbc.PhoenixDriver
Database Driver Location(s): file:///usr/hdp/current/phoenix-client/phoenix-client.jar
Finally, configure a PutSQL as depicted below. You will note that I turned off the support for fragmented transactions. This is not important here, but it will be in the next flow, where we split the output of the API.
Step 3: Create a new flow for article popularity
The flow itself is pretty self explanatory, as shown below:
Invoke HTTP: Call Most Popular NYT API
Remote URL should be (to get an API key, see
part 1):
http://api.nytimes.com/svc/mostpopular/v2/mostviewed/all-sections/1.json?api-key=[YOUR_API_KEY]
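You can test the endpoint with curl before wiring it into Nifi (replace the API key):
# returns a JSON document whose "results" array holds the 10 most viewed articles
curl "http://api.nytimes.com/svc/mostpopular/v2/mostviewed/all-sections/1.json?api-key=[YOUR_API_KEY]"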
SplitJson: Split Response into 10 flow files
JsonPath Expression should be:
$.results[*]
EvaluateJsonPath: Extract Relevant Attributes
Extract url from JSON:
$.url
Extract views from JSON:
$.views
ReplaceText: Create Phoenix Query
Replacement Value:
upsert into article_stats values('${url}',${views},'${now()}')
PutSQL: Run Phoenix Upsert SQL
This is the exact same processor as in your other flow :)
Section 2: Read Phoenix data with Spark in Zeppelin Note
Step 1: Download the latest phoenix Spark 2 Jar
Go to the maven repository and get the latest jar for phoenix and spark2 (see repository
here).
Once downloaded add it to the spark2 libraries, under the following folder: /usr/hdp/current/spark2-client/jars.
In this directory copy the phoenix client if it's not present yet from /usr/hdp/current/phoenix-client/.
$ cd /usr/hdp/current/spark2-client/jars
$ wget http://repo.hortonworks.com/content/repositories/releases/org/apache/phoenix/phoenix-spark2/4.7.0.2.6.5.3000-28/phoenix-spark2-4.7.0.2.6.5.3000-28.jar
$ cp /usr/hdp/current/phoenix-client/phoenix-5.0.0.3.0.1.0-187-client.jar .
$ ls -ltra phoenix*
-rw-r--r--. 1 root root 222910655 Oct 2 17:35 phoenix-5.0.0.3.0.1.0-187-client.jar
-rwxr-xr-x. 1 root root 83908 Oct 2 17:35 phoenix-spark2-4.7.0.2.6.5.3000-28.jar
Restart spark from Ambari.
Step 2: Create a new note to retrieve data from Phoenix
Open Zeppelin and create a new note (I named mine Linear Regression).
Using pyspark, retrieve data from the article_evaluation and article_stats tables into a dataframe, and join them based on the web_url:
%pyspark
from pyspark.sql.functions import lit
article_evaluation_table = sqlContext.read .format("org.apache.phoenix.spark") .option("table", "article_evaluation") .option("zkUrl", "localhost:2181") .load()
article_stats_table = sqlContext.read .format("org.apache.phoenix.spark") .option("table", "article_stats") .option("zkUrl", "localhost:2181") .load()
aet = article_evaluation_table.alias('aet')
ast = article_stats_table.alias('ast')
inner_join = aet.join(ast, aet.WEB_URL == ast.WEB_URL)
article_df = inner_join.select('EXTRAVERSION','OPENNESS_TO_EXPERIENCE','CONSCIENTIOUSNESS','AGREEABLENESS','EMOTIONAL_STABILITY','VIEWS')
no_views_df = article_evaluation_table.select('WEB_URL','EXTRAVERSION','OPENNESS_TO_EXPERIENCE','CONSCIENTIOUSNESS','AGREEABLENESS','EMOTIONAL_STABILITY').withColumn('VIEWS',lit(0))
Section 3: Train Linear Regression Model
In the same note, create a paragraph that creates a vectorized dataframe for your model to use:
%pyspark
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['EXTRAVERSION', 'OPENNESS_TO_EXPERIENCE', 'CONSCIENTIOUSNESS', 'AGREEABLENESS', 'EMOTIONAL_STABILITY'], outputCol = 'features')
varticle_df = vectorAssembler.transform(article_df)
varticle_df = varticle_df.select(['features', 'VIEWS'])
vno_views_df = vectorAssembler.transform(no_views_df)
train_df = varticle_df
Use the dataframe created to train a linear regression model:
%pyspark
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='VIEWS', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
Check the summary of your model:
%pyspark
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
train_df.describe().show()
Section 4: Make and store article popularity predictions
Run your model on all articles in the article_evaluation table:
%pyspark
predictions = lr_model.transform(vno_views_df)
df_to_write = predictions.select("WEB_URL","EXTRAVERSION","OPENNESS_TO_EXPERIENCE","CONSCIENTIOUSNESS","AGREEABLENESS","EMOTIONAL_STABILITY","prediction")
Upload these results to the article_popularity_prediction table using Phoenix:
%pyspark
df_to_write.write .format("org.apache.phoenix.spark") .mode("overwrite") .option("table", "article_popularity_prediction") .option("zkUrl", "localhost:2181") .save()
Finally, if you want to check your predictions versus the actual view, create a jdbc(phoenix) paragraph, as follows:
%jdbc(phoenix)
select article_popularity_prediction.web_url as URL,
article_popularity_prediction.prediction as predicted_views,
article_stats.views
from article_popularity_prediction, article_stats
where article_popularity_prediction.web_url = article_stats.web_url
As you can see, the model is far from perfect. I would also argue that creating a PMML from the model would have been a better way to go for our architecture (calling the PMML from Nifi as data flows in), instead of loading results offline like we do here. Again, this is a full time job 🙂 See you in the next article, where we will create a dashboard to leverage the data science and data engineering we just created!
Note: for reference, I included my Zeppelin note here: linear-regression-zeppelin-note.json
09-24-2018
06:54 PM
4 Kudos
Overview
As you may or may not know, Nifi-Registry 0.2.0 (included in HDF 3.2) allows the storage of versioned flows in a git repository.
@Andrew Lim wrote a great article on how to configure this storage for nifi-registry as you can see here: https://community.hortonworks.com/articles/210286/storing-apache-nifi-versioned-flows-in-a-git-repos.html
I thought I'd give a few pointers on how to do it in HDF 3.2, following these four simple steps:
Step 1: Setup your git repository
Step 2: Clone repository on HDF server
Step 3: Configure nifi-registry via Ambari
Step 4: Restart nifi-registry
Step 1: Setup your git repository
This step is essentially exactly what was highlighted by Andrew.
Create a new repository
Create a token
Go to GitHub’s “Developer settings” and create a new “Personal access token”, as follows:
Step 2: Clone repository on HDF server
Connect to the server running nifi-registry on your hdf cluster.
Make sure that git is installed; if it isn't, run the following and follow the instructions:
$ sudo yum install git
Go to the nifi_registry_internal_dir configured for your registry (/var/lib/nifi-registry by default) and clone your repo as the nifiregistry user:
$ sudo su nifiregistry
$ cd /var/lib/nifi-registry
$ mkdir git
$ cd git
$ git clone [address_of_your_repo]
Step 3: Configure nifi-registry via Ambari
The configuration for git is already in your providers.xml, commented out. You will have to swap it in for the default one that uses a local folder.
In your nifi-registry configs tab, go to Advanced nifi-registry-providers-env and change it as follows:
The key changes should look like this:
<!--
<flowPersistenceProvider>
<class>org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider</class>
<property name="Flow Storage Directory">{{nifi_registry_internal_dir}}/flow_storage</property>
</flowPersistenceProvider>
-->
<flowPersistenceProvider>
<class>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</class>
<property name="Flow Storage Directory">{{nifi_registry_internal_dir}}/git/perso-recognition-nifi-flows</property>
<property name="Remote To Push">origin</property>
<property name="Remote Access User">paulvid</property>
<property name="Remote Access Password">[your_key]</property>
</flowPersistenceProvider>
Step 4: Restart nifi-registry
Using Ambari, restart nifi-registry:
Existing buckets will be automatically uploaded when you commit changes, and your repository will look like this:
09-05-2018
12:53 PM
3 Kudos
Overview
Following my series on news author personality detection, let's dive into the streaming analytics side of things, and continue building our data science and engineering platform. This tutorial does assume that you completed
part 1 of the series. After completing this part, your architecture will look like this:
This tutorial will be divided into the following sections:
Section 1: Create a custom UDF for SAM
Section 2: Create a SAM application to sink to Druid
Section 3: Monitor your streaming application
Section 4: Create a simple superset visualization
Note: Creating a custom UDF is a purely academic exercise, as is the use of SAM for sinking into Druid, considering I could have connected Kafka to Druid directly. However, one of the goals of this series is to learn as much as possible about the Hortonworks stack. Therefore, custom UDFs and SAM are on the menu.
Section 1: Create a custom UDF for SAM
Step 1: Download the structure for your custom UDF
Using the example given by the SAM documentation, fork/clone this repository: link.
Using your favorite IDE, open the cloned folder and navigate to the sam-custom-udf folder:
Using your favorite IDE, open the cloned folder and navigate to the sam-custom-udf folder:
Step 2: Create a new class
Under [YOUR_PATH]/sam-custom-extensions/sam-custom-udf/src/main/java/hortonworks/hdf/sam/custom/udf/math, add a new class called NormalizeByline (I realize this is not a math function, but again, this is an exercise). This class removes the "By " at the beginning of a byline and capitalizes it. It should contain the following code:
package hortonworks.hdf.sam.custom.udf.math;
import com.hortonworks.streamline.streams.rule.UDF;
import org.apache.commons.lang.WordUtils;
public class NormalizeByline implements UDF<String, String> {
public String evaluate(String value) {
String result = WordUtils.capitalize(value.replaceAll("By ", ""));
return result;
}
}
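For a quick sanity check of the logic above:
String byline = new NormalizeByline().evaluate("By paul vidal"); // returns "Paul Vidal"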
Step 3: Compile your code
Go to [YOUR_PATH]/sam-custom-extensions/sam-custom-udf/ and run the following command:
$ mvn install
You should see something like this:
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for hortonworks.hdf.sam.custom:sam-custom-udf:jar:3.1.0.0-564
[WARNING] 'version' contains an expression but should be a constant. @ hortonworks.hdf.sam.custom:sam-custom-extensions:${sam.extensions.version}, /Users/pvidal/Documents/sam-custom/sam-custom-extensions/pom.xml, line 6, column 13
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-compiler-plugin is missing. @ hortonworks.hdf.sam.custom:sam-custom-extensions:${sam.extensions.version}, /Users/pvidal/Documents/sam-custom/sam-custom-extensions/pom.xml, line 80, column 12
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ hortonworks.hdf.sam.custom:sam-custom-extensions:${sam.extensions.version}, /Users/pvidal/Documents/sam-custom/sam-custom-extensions/pom.xml, line 94, column 21
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-shade-plugin is missing. @ hortonworks.hdf.sam.custom:sam-custom-extensions:${sam.extensions.version}, /Users/pvidal/Documents/sam-custom/sam-custom-extensions/pom.xml, line 89, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -------------< hortonworks.hdf.sam.custom:sam-custom-udf >--------------
[INFO] Building sam-custom-extensions-udfs 3.1.0.0-564
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ sam-custom-udf ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ sam-custom-udf ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 4 source files to /Users/pvidal/Documents/sam-custom/sam-custom-extensions/sam-custom-udf/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ sam-custom-udf ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/pvidal/Documents/sam-custom/sam-custom-extensions/sam-custom-udf/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ sam-custom-udf ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ sam-custom-udf ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ sam-custom-udf ---
[INFO] Building jar: /Users/pvidal/Documents/sam-custom/sam-custom-extensions/sam-custom-udf/target/sam-custom-udf-3.1.0.0-564.jar
[INFO]
[INFO] --- maven-install-plugin:2.4:install (default-install) @ sam-custom-udf ---
[INFO] Installing /Users/pvidal/Documents/sam-custom/sam-custom-extensions/sam-custom-udf/target/sam-custom-udf-3.1.0.0-564.jar to /Users/pvidal/.m2/repository/hortonworks/hdf/sam/custom/sam-custom-udf/3.1.0.0-564/sam-custom-udf-3.1.0.0-564.jar
[INFO] Installing /Users/pvidal/Documents/sam-custom/sam-custom-extensions/sam-custom-udf/pom.xml to /Users/pvidal/.m2/repository/hortonworks/hdf/sam/custom/sam-custom-udf/3.1.0.0-564/sam-custom-udf-3.1.0.0-564.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.981 s
[INFO] Finished at: 2018-09-04T16:46:04-04:00
[INFO] ------------------------------------------------------------------------
Once compiled, a new jar (sam-custom-udf-3.1.0.0-564.jar) should be created under [YOUR_PATH]/sam-custom-extensions/sam-custom-udf/target
Step 4: Add UDF to SAM
In your SAM application, go to Configuration > Application Resources > UDF.
Click on the + sign on the right side of the screen, and enter the following parameters:
Don't forget to upload your recently created jar (sam-custom-udf-3.1.0.0-564.jar). Once done, click OK.
Section 2: Create a SAM application to sink to Druid
Step 1: Configure Service Pool & Environment
In your SAM application, go to Configuration > Service Pool.
Use the Auto Add feature by specifying your Ambari API endpoint: http://ambari_host:port/api/v1/clusters/CLUSTER_NAME
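If you want to sanity-check that endpoint before pointing SAM at it, a quick curl against the Ambari REST API should return the cluster definition (this assumes default admin credentials and port 8080; adjust the user, password, and port to your setup):
$ curl -u admin:admin "http://ambari_host:8080/api/v1/clusters/CLUSTER_NAME"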
Once Auto Add is complete, you should see something like this:
Then, go to Configuration > Environments and create a new environment selecting Kafka, Druid and Storm, as follows:
Step 2: Create application
Go to My Applications. Use the + sign in the top right of the screen to create a new application using the environment we just configured, as follows:
Step 3: Add Kafka Source
In the edit screen of your new application, add a Kafka source:
Most of the parameters are auto-detected from your environment (including the Kafka topic and schema registry created in the previous tutorial), as detailed below for mine:
Cluster Name: pvidal_personality_recognition
Security Protocol: PLAINTEXT
Bootstrap Servers: pvidal-personality-recognition0.field.hortonworks.com:6667
Kafka Topic: personality-recognition
Reader Schema Branch: MASTER
Reader Schema Version: 1
Consumer Group Id: persorecognition_group (you can put whatever you want here)
For this exercise, no need to concern yourself with optional parameters.
Step 4: Add Projection
In the edit screen of your new application, add a projection to use your custom UDF, and link it to your Kafka source. The configuration is straightforward, as depicted below:
Step 5: Add Druid sink
In the edit screen of your new application, add a Druid sink, and link it to your projection. You will need to configure two screens for this sink: the required part for Druid dimensions, and the optional part for aggregations, to be used later on in Superset.
The required part should have the following configuration elements:
Cluster Name: pvidal_personality_recognition
Name of the indexing service: druid/overlord
Service discovery path: /druid/discovery
Datasource Name: personality-detection-datasource (you can put whatever you want here)
Zookeeper connection string: pvidal-personality-recognition0.field.hortonworks.com:2181,pvidal-personality-recognition1.field.hortonworks.com:2181,pvidal-personality-recognition2.field.hortonworks.com:2181
Dimensions: every field except the five personality traits, for which we will create aggregations
Timestamp field name: processingTime
Window Period: PT10M
Index Retry Period: PT10M
Segment Granularity: Minute
Query Granularity: Second
In the optional section of the configuration, leave every parameter at its default, but create, for each personality trait (agreeableness, extraversion, openness_to_experience, conscientiousness, emotional_stability), 3 aggregators:
Double Sum
Double Max
Double Min
The figure below details the configuration of the agreeableness Double Sum aggregator:
Section 3: Monitor your streaming application
After finalizing your application, launch it by clicking on the paper plane icon in SAM. This deploys the application. Once launched, the application can be monitored via the Storm UI. Go to the My Applications screen, click on your application, then click on the Storm UI icon on the top right of the screen:
The icon opens a new tab to the Storm UI, where you can monitor your app for errors, lag, etc.
I also recommend getting familiar with navigating indexing and segmentation from the Druid Overlord or Druid console:
Section 4: Create a simple superset visualization
Step 1: Refresh / Verify Datasource
In Superset, click on Refresh Druid Metadata:
You should see your datasource listed in the Druid datasources:
Click on the edit icon of your datasource and verify your druid columns. It should look something like this:
Verify your metrics as well. It should look something like this:
Step 2: Create visualization
Go back to the Druid datasources screen and click on your datasource. You can then create visualizations based on the data collected. Below is a simple example of personality traits per byline, using the min of each aggregate:
Superset of course gives you the opportunity to create much better visualizations, including time series, if you are not creatively impaired like I am. See you soon for part 3, where we will dive deeper into machine learning and optimize our platform.
... View more
08-23-2018
02:22 PM
2 Kudos
Overview
As mentioned in the article parent to this article, the objective of this part 1 is to ingest data from the New York Times API, run HTML scraping, personality detection and expose it to a Kafka topic, as depicted by the architecture below:
The end goal is thus to implement a NiFi flow like the following:
To implement it, this article is divided into 4 sections:
Section 1: Create a NYT API consumer
Section 2: Create a custom BoilerPipe Extractor Processor
Section 3: Create a custom Mairesse Personality Processor
Section 4: Create and configure a Kafka topic
Section 1: Create a NYT API consumer
Disclaimer: I basically copy-pasted this from my previous article,
Determining the big 5 traits of Personality Psychology of news articles using NiFi, Hive & Zeppelin.
Step 1: Obtaining an API Key
This step is very straightforward. Go to https://developer.nytimes.com/signup and sign up for a key.
Step 2: Configuring InvokeHTTP
The InvokeHTTP processor is used here with all default parameters except for the URL. Here are some key configuration items and a screenshot of the processor configuration:
HTTP Method: GET
Remote URL: http://api.nytimes.com/svc/search/v2/articlesearch.json?fq=source:("The New York Times")&page=0&sort=newest&fl=web_url,snippet,headline,pub_date,document_type,news_desk,byline&api-key=[YOUR_KEY] (This URL selects articles from the New York Times as a source, and only selects some of the fields I am interested in: web_url,snippet,headline,pub_date,document_type,news_desk,byline).
Content-Type: ${mime.type}
Run Schedule: 5 mins (could be set to a little more; I'm not sure of the frequency at which new articles are published)
Step 3: Extracting results from Invoke HTTP response
The API call parameter page=0 returns results 0-9; for this exercise, I'm only interested in the latest article, so I set up an EvaluateJsonPath processor to take care of that, as you can see below:
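As a rough sketch, the EvaluateJsonPath properties I would use look like the following, assuming the standard layout of the NYT Article Search response (with sort=newest, response.docs[0] is the most recent article) and with Destination set to flowfile-attribute; adjust the paths if your response differs:
web_url: $.response.docs[0].web_url
snippet: $.response.docs[0].snippet
headline: $.response.docs[0].headline.main
pub_date: $.response.docs[0].pub_date
document_type: $.response.docs[0].document_type
news_desk: $.response.docs[0].news_desk
byline: $.response.docs[0].byline.original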
Section 2: Create a custom BoilerPipe Extractor Processor
The goal of this section is to create a simple processor that extracts article text based on a URL, using the BoilerPipe library.
Step 1: Install Maven
Having Maven installed is a prerequisite (so is having Java installed, a computer, and the knowledge and will to run this tutorial). There are many ways to install Maven, depending on your machine, but for macOS I recommend using the Homebrew package manager and simply running:
$ brew install maven
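You can verify the installation with:
$ mvn -version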
Step 2: Generate a NiFi processor archetype
Once installed, create a new folder wherever you'd like and generate a NiFi processor archetype as follows:
$ mkdir tutorial
$ cd tutorial/
$ mvn archetype:generate
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:3.0.1:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:3.0.1:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO]
[INFO] --- maven-archetype-plugin:3.0.1:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] No archetype defined. Using maven-archetype-quickstart (org.apache.maven.archetypes:maven-archetype-quickstart:1.0)
Choose archetype:
1: remote -> am.ik.archetype:maven-reactjs-blank-archetype (Blank Project for React.js)
2: remote -> am.ik.archetype:msgpack-rpc-jersey-blank-archetype (Blank Project for Spring Boot + Jersey)
3: remote -> am.ik.archetype:mvc-1.0-blank-archetype (MVC 1.0 Blank Project)
4: remote -> am.ik.archetype:spring-boot-blank-archetype (Blank Project for Spring Boot)
[...]
2214: remote -> xyz.luan.generator:xyz-generator (-)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): 1224: nifi
Choose archetype:
1: remote -> org.apache.nifi:nifi-processor-bundle-archetype (-)
2: remote -> org.apache.nifi:nifi-service-bundle-archetype (-)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : 1
Choose org.apache.nifi:nifi-processor-bundle-archetype version:
1: 0.0.2-incubating
2: 0.1.0-incubating
3: 0.2.0-incubating
4: 0.2.1
5: 0.3.0
6: 0.4.0
7: 0.4.1
8: 0.5.0
9: 0.5.1
10: 0.6.0
11: 0.6.1
12: 0.7.0
13: 0.7.1
14: 0.7.2
15: 0.7.3
16: 0.7.4
17: 1.0.0-BETA
18: 1.0.0
19: 1.0.1
20: 1.1.0
21: 1.1.1
22: 1.1.2
23: 1.2.0
24: 1.3.0
25: 1.4.0
26: 1.5.0
27: 1.6.0
28: 1.7.0
29: 1.7.1
Choose a number: 29: 29
Define value for property 'groupId': com.github.paulvid
Define value for property 'artifactId': boilerpipe-article-extractor
Define value for property 'version' 1.0-SNAPSHOT: : 1.7.0.3.2.0.0-520
Define value for property 'artifactBaseName': BoilerpipeArticleExtractor
Define value for property 'package' com.github.paulvid.processors.BoilerpipeArticleExtractor: :
[INFO] Using property: nifiVersion = 1.7.1
Confirm properties configuration:
groupId: com.github.paulvid
artifactId: boilerpipe-article-extractor
version: 1.7.0.3.2.0.0-520
artifactBaseName: BoilerpipeArticleExtractor
package: com.github.paulvid.processors.BoilerpipeArticleExtractor
nifiVersion: 1.7.1
Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: nifi-processor-bundle-archetype:1.7.1
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.github.paulvid
[INFO] Parameter: artifactId, Value: boilerpipe-article-extractor
[INFO] Parameter: version, Value: 1.7.0.3.2.0.0-520
[INFO] Parameter: package, Value: com.github.paulvid.processors.BoilerpipeArticleExtractor
[INFO] Parameter: packageInPathFormat, Value: com/github/paulvid/processors/BoilerpipeArticleExtractor
[INFO] Parameter: package, Value: com.github.paulvid.processors.BoilerpipeArticleExtractor
[INFO] Parameter: artifactBaseName, Value: BoilerpipeArticleExtractor
[INFO] Parameter: version, Value: 1.7.0.3.2.0.0-520
[INFO] Parameter: groupId, Value: com.github.paulvid
[INFO] Parameter: artifactId, Value: boilerpipe-article-extractor
[INFO] Parameter: nifiVersion, Value: 1.7.1
[INFO] Project created from Archetype in dir: /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:38 min
[INFO] Finished at: 2018-08-23T10:36:04-04:00
[INFO] ------------------------------------------------------------------------
A few important points about the configuration of the archetype:
Maven has a ton of archetypes; filter by typing "nifi" and then select 1: remote -> org.apache.nifi:nifi-processor-bundle-archetype (-)
Choose the version that works with your environment (1.7.1 and 1.7.0 would both work in my case)
Properties don't really matter for this tutorial but here is what I used:
Define value for property 'groupId': com.github.paulvid
Define value for property 'artifactId': boilerpipe-article-extractor
Define value for property 'version' 1.0-SNAPSHOT: : 1.7.0.3.2.0.0-520
Define value for property 'artifactBaseName': BoilerpipeArticleExtractor
Define value for property 'package' com.github.paulvid.processors.BoilerpipeArticleExtractor
Step 3: Configure your processor
Once the archetype is successfully built, use your favorite IDE and open the folder containing the code.
Adding the BoilerPipe dependency
Open the file [YOUR_PATH]/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/pom.xml
Add the following dependency:
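(The exact coordinates depend on where you resolve the library from: the build output later in this article bundles boilerpipe-1.2.3.jar, so the artifactId and version below are inferred from that jar name, while the groupId is only an assumption based on the library's Java package; verify both against your repository before building.)
<dependency>
    <groupId>de.l3s.boilerpipe</groupId>
    <artifactId>boilerpipe</artifactId>
    <version>1.2.3</version>
</dependency>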
Rename the processor
By default, the processor under [YOUR_PATH]/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/src/main/java/com/github/paulvid/processors/BoilerpipeArticleExtractor is called MyProcessor. Using your IDE, rename your Processor and the corresponding test class to BoilerpipeArticleExtractor
Add code to processor
Modify your BoilerpipeArticleExtractor.java to contain the following
package com.github.paulvid.processors.BoilerpipeArticleExtractor;
import de.l3s.boilerpipe.BoilerpipeProcessingException;
import de.l3s.boilerpipe.extractors.ArticleExtractor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.expression.ExpressionLanguageScope;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class BoilerpipeArticleExtractor extends AbstractProcessor {
public static final PropertyDescriptor URL_PROPERTY = new PropertyDescriptor
.Builder().name("URL")
.displayName("URL")
.description("URL of the Article to Extract")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the article has been successfully extracted")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if an error occurred during the article extraction")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(URL_PROPERTY);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
java.net.URL articleURL = new URL(String.valueOf(context.getProperty(URL_PROPERTY).evaluateAttributeExpressions(flowFile).getValue()));
String text = ArticleExtractor.INSTANCE.getText(articleURL);
flowFile = session.write(flowFile, out -> {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(text.getBytes(StandardCharsets.UTF_8));
}
});
session.transfer(flowFile, REL_SUCCESS);
} catch (MalformedURLException mue) {
getLogger().error("Failed to get URL {} for {} due to {}", new Object[]{context.getProperty(URL_PROPERTY).evaluateAttributeExpressions(flowFile).getValue(), flowFile, mue});
session.transfer(flowFile, REL_FAILURE);
return;
} catch (BoilerpipeProcessingException bpe) {
getLogger().error("Failed to extract article for {} due to {}", new Object[]{flowFile, bpe});
session.transfer(flowFile, REL_FAILURE);
return;
}
}
}
Create test case
Modify your BoilerpipeArticleExtractorTest.java to contain the following code:
package com.github.paulvid.processors.BoilerpipeArticleExtractor;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class BoilerpipeArticleExtractorTest {
private TestRunner testRunner;
@Before
public void init() {
testRunner = TestRunners.newTestRunner(BoilerpipeArticleExtractor.class);
}
@Test
public void testProcessor() {
testRunner.setValidateExpressionUsage(false);
testRunner.setProperty(BoilerpipeArticleExtractor.URL_PROPERTY,"https://www.nytimes.com/reuters/2018/08/17/business/17reuters-usa-tunnels-china.html");
testRunner.enqueue("Mock FlowFile");
testRunner.run();
testRunner.assertTransferCount(BoilerpipeArticleExtractor.REL_SUCCESS, 1);
testRunner.assertTransferCount(BoilerpipeArticleExtractor.REL_FAILURE, 0);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(BoilerpipeArticleExtractor.REL_SUCCESS);
flowFiles.get(0).assertContentEquals("Musk's Tunnel-Boring Firm Seeks U.S. Tariff Exemption for Chinese Parts\n" +
"By Reuters\n" +
"Aug. 17, 2018\n" +
"WASHINGTON — A company owned by Elon Musk that is trying to lower the cost of building high-speed transit tunnels has asked the Trump administration to exempt it from tariffs for some Chinese-made tunnel boring machine components, warning the tariffs could significantly delay a planned tunnel between New York and Washington.\n" +
"In a July 31 letter posted last week on a government website, the Boring Co asked the U.S. Trade Representative to exempt parts like cutterheads, screw conveyors and related machinery. Boring seeks \"limited parts from China in the near-term for use in a small number of tunnel boring machines.\" The letter added those parts are \"readily available only from China.\"\n" +
"Privately held Boring added that it is \"working to develop and manufacture our own tunnel boring machines\" and wants to \"restore the now-dormant American tunnel boring machine industry.\"\n" +
"The company said for planned tunnels, including a project between Washington and Baltimore, it will \"use machines that are majority-composed of U.S. content.\"\n" +
"The tariffs could cause \"severe economic harm\" to the company and U.S. interests and could result in a delay of one to two years in the construction of a proposed Washington-to-Baltimore tunnel that it plans to eventually extend to New York.\n" +
"Exempting the parts will not harm U.S. industry, the company said, and noted that tunneling is not one of 10 sectors identified in China's \"Made in 2025\" plan. The company said its business model is \"predicated upon substantially reducing the cost of tunneling.\"\n" +
"Musk, who is also chief executive of Tesla Inc, in June proposed building a $1 billion underground transit system in Chicago. The plan would send people from Chicago’s downtown Loop district to O’Hare International Airport at 150 miles (241 km) per hour.\n" +
"The Boring Co has been promoting its plans for tunnels that would allow high-speed travel between cities. The company initially plans to ferry passengers between Washington and Baltimore on autonomous electric vehicles carrying 8 to 16 passengers at 125-150 miles per hour, but would not use tracks or railway equipment.\n" +
"On Wednesday, the company proposed to build a 3.6-mile tunnel between Dodger Stadium in Los Angeles and the city's subway system.\n" +
"The U.S. Trade Representative's Office and Boring did not immediately respond to requests for comment.\n" +
"Boring said in an earlier letter it was converting from diesel-powered to electric-powered construction equipment and had innovated in \"concrete mixing, segment production, excavation and hauling practices.\"\n" +
"Other companies including General Motors Co have sought exemptions from new U.S. tariffs imposed on Chinese imports.\n" +
"(Reporting by David Shepardson; Editing by David Gregorio)\n" +
"Advertisement\n");
}
}
Compile your processor and create a nar
In your command line, go to [YOUR_PATH]/tutorial/boilerpipe-article-extractor/ and type the following:
$ mvn install
You should see an output like this:
$ mvn install
[INFO] Scanning for projects...
[INFO] Inspecting build with total of 3 modules...
[INFO] Installing Nexus Staging features:
[INFO] ... total of 3 executions of maven-deploy-plugin replaced with nexus-staging-maven-plugin
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO]
[INFO] boilerpipe-article-extractor [pom]
[INFO] nifi-BoilerpipeArticleExtractor-processors [jar]
[INFO] nifi-BoilerpipeArticleExtractor-nar [nar]
[INFO]
[INFO] ----------< com.github.paulvid:boilerpipe-article-extractor >-----------
[INFO] Building boilerpipe-article-extractor 1.7.0.3.2.0.0-520 [1/3]
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven-version) @ boilerpipe-article-extractor ---
[INFO]
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven) @ boilerpipe-article-extractor ---
[INFO]
[INFO] --- buildnumber-maven-plugin:1.4:create (default) @ boilerpipe-article-extractor ---
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ boilerpipe-article-extractor ---
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (groovy-tests) @ boilerpipe-article-extractor ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-site-plugin:3.7:attach-descriptor (attach-descriptor) @ boilerpipe-article-extractor ---
[INFO] No site descriptor found: nothing to attach.
[INFO]
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ boilerpipe-article-extractor ---
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/pom.xml to /Users/pvidal/.m2/repository/com/github/paulvid/boilerpipe-article-extractor/1.7.0.3.2.0.0-520/boilerpipe-article-extractor-1.7.0.3.2.0.0-520.pom
[INFO]
[INFO] ---< com.github.paulvid:nifi-BoilerpipeArticleExtractor-processors >----
[INFO] Building nifi-BoilerpipeArticleExtractor-processors 1.7.0.3.2.0.0-520 [2/3]
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven-version) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO]
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO]
[INFO] --- buildnumber-maven-plugin:1.4:create (default) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:compile (default-compile) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/target/classes
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/src/test/resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (default-testCompile) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/target/test-classes
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (groovy-tests) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Changes detected - recompiling the module!
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.20.1:test (default-test) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO]
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.github.paulvid.processors.BoilerpipeArticleExtractor.BoilerpipeArticleExtractorTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.907 s - in com.github.paulvid.processors.BoilerpipeArticleExtractor.BoilerpipeArticleExtractorTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO]
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Building jar: /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/target/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar
[INFO]
[INFO] --- maven-site-plugin:3.7:attach-descriptor (attach-descriptor) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Skipping because packaging 'jar' is not pom.
[INFO]
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/target/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar to /Users/pvidal/.m2/repository/com/github/paulvid/nifi-BoilerpipeArticleExtractor-processors/1.7.0.3.2.0.0-520/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/pom.xml to /Users/pvidal/.m2/repository/com/github/paulvid/nifi-BoilerpipeArticleExtractor-processors/1.7.0.3.2.0.0-520/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.pom
[INFO]
[INFO] -------< com.github.paulvid:nifi-BoilerpipeArticleExtractor-nar >-------
[INFO] Building nifi-BoilerpipeArticleExtractor-nar 1.7.0.3.2.0.0-520 [3/3]
[INFO] --------------------------------[ nar ]---------------------------------
[INFO]
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven-version) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO]
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO]
[INFO] --- buildnumber-maven-plugin:1.4:create (default) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/src/main/resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:compile (default-compile) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/src/test/resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (default-testCompile) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (groovy-tests) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.20.1:test (default-test) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO]
[INFO] --- nifi-nar-maven-plugin:1.2.0:nar (default-nar) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Copying nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar
[INFO] Copying nekohtml-1.9.21.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/nekohtml-1.9.21.jar
[INFO] Copying xercesImpl-2.11.0.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/xercesImpl-2.11.0.jar
[INFO] Copying xml-apis-1.4.01.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/xml-apis-1.4.01.jar
[INFO] Copying nifi-utils-1.7.1.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/nifi-utils-1.7.1.jar
[INFO] Copying jericho-html-3.3.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/jericho-html-3.3.jar
[INFO] Copying boilerpipe-1.2.3.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/boilerpipe-1.2.3.jar
[INFO] Building jar: /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.nar
[INFO]
[INFO] --- maven-site-plugin:3.7:attach-descriptor (attach-descriptor) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Skipping because packaging 'nar' is not pom.
[INFO]
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.nar to /Users/pvidal/.m2/repository/com/github/paulvid/nifi-BoilerpipeArticleExtractor-nar/1.7.0.3.2.0.0-520/nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.nar
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/pom.xml to /Users/pvidal/.m2/repository/com/github/paulvid/nifi-BoilerpipeArticleExtractor-nar/1.7.0.3.2.0.0-520/nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.pom
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] boilerpipe-article-extractor 1.7.0.3.2.0.0-520 ..... SUCCESS [ 1.487 s]
[INFO] nifi-BoilerpipeArticleExtractor-processors ......... SUCCESS [ 14.513 s]
[INFO] nifi-BoilerpipeArticleExtractor-nar 1.7.0.3.2.0.0-520 SUCCESS [ 0.281 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.712 s
[INFO] Finished at: 2018-08-23T11:18:13-04:00
[INFO] ------------------------------------------------------------------------
Step 4: Upload the generated nar
The file nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.nar should be generated under [YOUR_PATH]/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target
Upload it onto your server under the library folder of nifi (typically /usr/hdf/current/nifi/lib/)
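For example, something like the following (hypothetical host; adjust the paths and make sure the file ends up readable by the nifi user):
$ scp nifi-BoilerpipeArticleExtractor-nar/target/nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.nar root@your-nifi-host:/usr/hdf/current/nifi/lib/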
Restart NiFi and you should see the BoilerpipeArticleExtractor available
Step 5: Configure your processor
Configure the processor to use the web_url attribute as a property, as follows:
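Concretely, since the URL property of the processor supports flow file attributes (see the PropertyDescriptor in the code above), this boils down to setting:
URL: ${web_url}
so that each flow file's web_url attribute, extracted earlier from the NYT API response, drives the extraction.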
Section 3: Create a custom Mairesse processor
This section is a little advanced. I built a custom processor using this personality recognizer (http://farm2.user.srcf.net/research/personality/recognizer); the full source is available on my
github if you want to check it out. For this tutorial, we will only concentrate on uploading my compiled code and the necessary dependencies.
Step 1: Upload the Mairesse Processor
Download the file
nifi-MairessePersonalityRecognition-nar-1.5.0.3.1.2.0-7.nar from my github
Upload it onto your server under the library folder of nifi (typically /usr/hdf/current/nifi/lib/)
Restart NiFi and you should see the MairessePersonalityRecognition available
Step 2: Upload the library files
Upload the files under the folder
to-upload-to-nifi-lib-folder from my github to a location on your server (e.g. /home/nifi/perso-recognition/lib/)
Step 3: Configure your processor
Configure the processor as follows:
Section 4: Create and configure a Kafka topic
Step 1: Create a schema in Schema Registry
Go to your schema registry and create a new schema called personality-recognition:
The schema you will enter should be as follows:
{
"type": "record",
"name": "personalityrecognition",
"fields": [
{
"name": "web_url",
"type": "string",
"default": null
},
{
"name": "snippet",
"type": "string",
"default": null
},
{
"name": "byline",
"type": "string",
"default": null
},
{
"name": "pub_date",
"type": "string",
"default": null
},
{
"name": "headline",
"type": "string",
"default": null
},
{
"name": "document_type",
"type": "string",
"default": null
},
{
"name": "news_desk",
"type": "string",
"default": null
},
{
"name": "timestamp",
"type": "string",
"default": null
},
{
"name": "extraversion",
"type": "string",
"default": null
},
{
"name": "emotional_stability",
"type": "string",
"default": null
},
{
"name": "agreeableness",
"type": "string",
"default": null
},
{
"name": "conscientiousness",
"type": "string",
"default": null
},
{
"name": "openness_to_experience",
"type": "string",
"default": null
}
]
}
Step 2: Update attribute to add the schema name
Using an UpdateAttribute processor, set the schema.name attribute to personality-recognition, and remove double quotes from the headline and snippet attributes:
Step 3: Create Json from attributes
Using a ReplaceText processor, configure the replacement value to create the appropriate JSON:
The replacement value you will enter should be as follows:
{
"web_url": "${web_url}",
"snippet": "${snippet}",
"byline": "${byline}",
"pub_date": "${pub_date}",
"headline": "${headline}",
"document_type": "${document_type}",
"news_desk": "${news_desk}",
"timestamp": "${now()}",
"extraversion": "${mairessepersonalityrecognition.extraversion}",
"emotional_stability": "${mairessepersonalityrecognition.emotional_stability}",
"agreeableness": "${mairessepersonalityrecognition.agreeableness}",
"conscientiousness": "${mairessepersonalityrecognition.conscientiousness}",
"openness_to_experience": "${mairessepersonalityrecognition.openness_to_experience}"
}
Step 4: Create Kafka topic
On your server running Kafka, find the shell scripts to create topics (typically under /usr/hdp/current/kafka-broker/bin).
Run the following command:
$ ./kafka-topics.sh --zookeeper localhost:2181 --topic personality-recognition --create --partitions 1 --replication-factor 1
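You can verify that the topic was created with:
$ ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic personality-recognition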
Step 5: Configure PublishKafkaRecord
Configure Hortonworks Schema Registry Controller
In your controller configuration add a new Hortonworks Schema Registry controller:
The Schema registry URL should look something like this:
http://pvidal-personality-recognition0.field.hortonworks.com:7788/api/v1/
Configure JsonPathReader Controller
In your controller configuration add a new JsonPathReader Controller to use the schema name and the Hortonworks Schema Registry controller we created, as follows:
Add one property per field of the JSON payload we created, each mapped to its JSON path (e.g. $.web_url)
Configure AvroRecordSetWriter Controller
In your controller configuration add a new AvroRecordSetWriter Controller to use the schema name and the Hortonworks Schema Registry controller we created, as follows:
Configure PublishKafkaRecord Processor
Add a new processor to your flow, configured to use the controllers we created, as follows:
The Kafka Broker URL should look something like this:
pvidal-personality-recognition0.field.hortonworks.com:6667
Step 6: Run flow and consume data via command line
On your server running Kafka, find the Kafka shell scripts (typically under /usr/hdp/current/kafka-broker/bin).
Run the following command:
$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic personality-recognition
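Depending on your Kafka version, the zookeeper-based console consumer may be deprecated; if the command above returns nothing, the bootstrap-server form is worth trying (the broker host and port are assumptions, adjust to your cluster, e.g. the 6667 port used by the PublishKafkaRecord processor):
$ ./kafka-console-consumer.sh --bootstrap-server localhost:6667 --topic personality-recognition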
Run your flow in NiFi and you should start seeing data flowing!
... View more
08-23-2018
02:08 PM
5 Kudos
Introduction
A few weeks ago, I published an article called Determining the big 5 traits of Personality Psychology of news articles using NiFi, Hive & Zeppelin. Since then, I have worked diligently to improve on this first iteration, with the objective of mocking up what is at the heart of every company today: an end-to-end platform that uses machine learning to not only generate insights, but also keeps improving and feeding consumer applications. While doing this work was a way for me to get into the nitty-gritty of the latest Hortonworks tools, I decided to share it with the world in the form of a series of tutorial articles, because I believe it is a great way to get familiar with the stack.
Architecture overview
Luckily, the Hortonworks platform has all the elements needed to create this end-to-end platform. The figure below gives an overview of the architecture for this series of articles. As you can see, the goal of this platform is to:
Ingest data from news articles (directly from the NYT API at first, then from other RSS feeds)
Using NiFi and SAM, read the metadata of the extracted articles, scrape their content, run personality recognition on their authors, then expose the result via Kafka for Druid consumption, push directly to HBase/Phoenix for offline analytics, and feed "micro" services for consumer applications
Generate real-time insights on this computed data via Druid and Superset
Enable analytics and model training on the data stored in HBase using Zeppelin notebooks and Spark, which then feed back into the personality recognition models
Enable custom applications to consume the data extracted and analyzed
Agenda
This series of articles is composed of 4 parts:
Part 1: Create NiFi Flow with Custom Processors
Part 2: Using SAM to consume Kafka, load into Druid and create Superset visualizations
Part 3: Using Spark and Zeppelin to run offline Analytics and train models
Part 4: Create NiFi Web Services and a consumer application
... View more