Member since 06-20-2019 - 8 Posts - 11 Kudos Received - 0 Solutions
03-25-2020 06:58 AM
Hi Andre,

For detailed disk performance checks you can find appropriate testing procedures here: https://docs.cloudera.com/documentation/other/reference-architecture/PDF/cloudera_ref_arch_stg_dev_accept_criteria.pdf

Further good benchmarks are the TPC-DS tests for Impala: https://github.com/cloudera/impala-tpcds-kit and, for Hive, TPC-DS and TPC-H: https://github.com/hortonworks/hive-testbench

I also recommend checking network performance via Cloudera Manager with the Inspect Cluster Network Performance function.

Hope this helps,
Regards
Friedel
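For a quick first impression before running the full acceptance tests, a simple sequential write/read test with dd is often enough. This is a minimal sketch; the target mount point /data01, the file size and the block size are assumptions you should adapt to your data disks:

# Sequential write test (~4 GB) against an assumed data disk mount point
dd if=/dev/zero of=/data01/ddtest.tmp bs=1M count=4096 oflag=direct
# Sequential read test of the same file, bypassing the page cache
dd if=/data01/ddtest.tmp of=/dev/null bs=1M iflag=direct
# Clean up the test file
rm -f /data01/ddtest.tmp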
01-16-2019 12:07 PM - 1 Kudo
Abstract

TensorFlow Serving with Apache Hadoop 3.1 and YARN resource management: YARN manages the startup, control and teardown of the TensorFlow Serving Docker container in a Hadoop cluster. Client applications communicate with the ML application via RESTful API calls, e.g. to make predictions.

Infrastructure Setup

Allocate a CentOS 7.4 virtual machine and install:
- Docker CE: install and start Docker
- Required packages: Amber, curl, wget, unzip, git
- HDP Sandbox: download and install as described in the article Docker on Yarn - Sandbox

wget https://github.com/SamHjelmfelt/Amber/archive/master.zip
unzip master.zip
cd Amber-master
./amber.sh createFromPrebuiltSample samples/yarnquickstart/yarnquickstart-sample.ini

Output snippet:
...
Verifying ambari-agent process status...
Ambari Agent successfully started
Agent PID at: /run/ambari-agent/ambari-agent.pid
Agent out at: /var/log/ambari-agent/ambari-agent.out
Agent log at: /var/log/ambari-agent/ambari-agent.log
Waiting for agent heartbeat...
Starting all services. Visit http://localhost:8080 to view the status
Check that the installation was successful by logging in to Ambari in a web browser at http://hostname:8080 (login: admin/admin) and going to YARN -> Resource Manager UI. Alternatively, check the Docker state on the CLI:

# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3da7cecdb948 samhjelmfelt/amber_yarnquickstart:3.0.1.0-187 "/usr/sbin/init" 36 minutes ago Up 36 minutes 0.0.0.0:8042->8042/tcp, 0.0.0.0:8080->8080/tcp, 0.0.0.0:8088->8088/tcp resourcemanager.yarnquickstart
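You can also query the Ambari REST API directly to confirm that Ambari is up. A quick sketch; admin/admin are the default sandbox credentials used above:

# List the clusters Ambari knows about (an empty or one-element list means the API is reachable)
curl -s -u admin:admin http://localhost:8080/api/v1/clusters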
Download and update the tensorflow/serving Image

Next we need to pull the tensorflow/serving image from the Docker repository (see: Tensorflow Serving Docker) and download the TensorFlow model, e.g. from GitHub:

mkdir -p /tmp/tfserving
cd /tmp/tfserving
git clone https://github.com/tensorflow/serving
Output:
Cloning into 'serving'...
remote: Enumerating objects: 134, done.
remote: Counting objects: 100% (134/134), done.
remote: Compressing objects: 100% (71/71), done.
remote: Total 14717 (delta 93), reused 93 (delta 63), pack-reused 14583
Receiving objects: 100% (14717/14717), 4.20 MiB | 163.00 KiB/s, done.
Resolving deltas: 100% (10777/10777), done.

Deploy the model saved_model_half_plus_two_cpu with the TensorFlow Serving REST API. This model is a very simple function that divides the input value by two and then adds two, e.g. input 1.0 gives output 2.5 (math: (1.0/2)+2).

docker run -p 8501:8501 --mount type=bind,source=/tmp/tfserving/serving/tensorflow_serving/servables/tensorflow/testdata/saved_model_half_plus_two_cpu,target=/models/half_plus_two -e MODEL_NAME=half_plus_two -t tensorflow/serving
Now we need to add a valid user and group to the running TensorFlow container. We provision jobs in YARN as the ambari-qa (uid: 1002) unix user, who belongs to the unix group hadoop (gid: 1001). The uid and gid are required because YARN starts the Docker container with these numbers. First identify the container (here we use the ID):

docker ps

Output:

CONTAINER ID IMAGE COMMAND
1bbcb1a800a8 tensorflow/serving "/usr/bin/tf_serving..."
2d01ca988f2f samhjelmfelt/amber.. "/usr/sbin/init"

Next add the system user, group and initial model to the TensorFlow container, then create a new image with docker commit:

docker exec -it 1bbcb1a800a8 sh -c "groupadd hadoop -g 1001"
docker exec -it 1bbcb1a800a8 sh -c "useradd ambari-qa -u 1002 -g 1001"
cd tmp/tfserving/serving/tensorflow_serving/servables/tensorflow/testdata/
docker cp saved_model_half_plus_two_cpu 1bbcb1a800a8:/models/half_plus_two
docker commit 1bbcb1a800a8 tensorflow/yarn

Note that we have changed the name of the image to tensorflow/yarn.

Create YARN Application definition

Create a YARN service definition by copying the definition below into a file named e.g. tf.json:

{
"name": "tensorflow-serving",
"version": "1.0.0",
"description": "Tensorflow example",
"components" :
[
{
"name": "tensorflow",
"number_of_containers": 1,
"artifact": {
"id": "tensorflow/serving",
"type": "DOCKER"
},
"launch_command": "",
"resource": {
"cpus": 1,
"memory": "512"
},
"configuration": {
"env": {
"YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE": "true"
}
}
}
]
}
Now you can deploy the Tensorflow Serving application with the YARN API:

curl -X POST -H "Content-Type: application/json" http://frothkoe10.field.hortonworks.com:8088/app/v1/services?user.name=ambari-qa -d @tf.json
output {"uri":"/v1/services/tensorflow-serving","diagnostics":"Application ID: application_1547627739370_0003","state":"ACCEPTED"} If you successfully provisioned the job to YARN the return state is "ACCEPTED". It will need a few seconds to allocate the resources and start the docker container. See the application Tensorflow Serving is running with the YARN UI http://nodename:8088/ui2/#/yarn-apps/apps (check column Application Name): More details of the YARN application you get with the CLI command and REST API curl http://localhost:8088/app/v1/services/tensorflow-serving?user.name=ambari-qa | python -m json.tool
Output:

{
"components": [
{
"artifact": {
"id": "tensorflow/yarn",
"type": "DOCKER"
},
"configuration": {
"env": {
"MODEL_BASE_PATH": "/models",
"MODEL_NAME": "half_plus_two",
"YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE": "true"
},
"files": [],
"properties": {}
},
"containers": [
{
"bare_host": "resourcemanager.yarnquickstart",
"component_instance_name": "tensorflow-0",
"hostname": "tensorflow-0.tensorflow-serving.ambari-qa.EXAMPLE.COM",
"id": "container_e09_1548104697693_0003_01_000002",
"ip": "172.17.0.3",
"launch_time": 1548165579806,
"state": "READY"
}
],
"dependencies": [],
"launch_command": "/usr/bin/tf_serving_entrypoint.sh",
"name": "tensorflow",
"number_of_containers": 1,
"quicklinks": [],
"resource": {
"additional": {},
"cpus": 1,
"memory": "256"
},
"restart_policy": "ALWAYS",
"run_privileged_container": false,
"state": "STABLE"
}
],
"configuration": {
"env": {},
"files": [],
"properties": {}
},
"description": "Tensorflow example",
"id": "application_1548104697693_0003",
"kerberos_principal": {},
"lifetime": -1,
"name": "tensorflow-serving",
"quicklinks": {},
"state": "STABLE",
"version": "1.0.0"
}
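A small helper sketch before setting up the proxy: the nginx configuration below needs the container IP (the "ip" field in the output above). You can extract it from the REST response; the python one-liner is just one option, any JSON tool works:

curl -s "http://localhost:8088/app/v1/services/tensorflow-serving?user.name=ambari-qa" \
  | python -c 'import json,sys; print(json.load(sys.stdin)["components"][0]["containers"][0]["ip"])'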
Add HTTP Proxy

Because the YARN containers do not expose ports to the external network, we add an HTTP proxy service, i.e. nginx.

mkdir proxy
cd proxy
mkdir conf.d
To configure the proxy service we need two configuration files.

nginx.conf

This file goes in the proxy directory:

user nginx;
worker_processes 1;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
sendfile on;
#tcp_nopush on;
client_max_body_size 200m;
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
proxy_buffering on;
proxy_buffer_size 128k;
proxy_buffers 100 128k;
keepalive_timeout 65;
#gzip on;
include /etc/nginx/conf.d/*.conf;
}
conf.d/http.conf

In this file you may have to update the IP address of the previously started container (see the "ip": "172.17.0.3" field in the YARN REST API output above):

server {
listen 8500;
location / {
proxy_pass http://172.17.0.3;
}
}
server {
listen 8501;
location / {
proxy_pass http://172.17.0.3;
}
}
Now we can run the nginx Docker container with this configuration (here we use the hortonworks/sandbox-proxy image). The -v options mount the two configuration files above into the container:

docker run --name yarn.proxy --network=amber -v /home/centos/Amber-master/proxy/nginx.conf:/etc/nginx/nginx.conf -v /home/centos/Amber-master/proxy/conf.d:/etc/nginx/conf.d -p 8500:8500 -p 8501:8501 -d hortonworks/sandbox-proxy:1.0
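Optionally you can confirm that the proxy and the TensorFlow container are attached to the same Docker network. A quick check, assuming the network is named amber as in the docker run command above:

# Lists the names of all containers attached to the "amber" network
docker network inspect -f '{{range .Containers}}{{.Name}} {{end}}' amber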
The docker run command above downloads the image and proxies incoming HTTP requests on ports 8500/8501 to the tensorflow/yarn container.

Information: yarn.nodemanager.runtime.linux.docker.default-container-network = amber

This is the Docker network where the TensorFlow containers run, and it is important to understand that the HTTP proxy is in the same Docker network as the TensorFlow containers. You can view or change this in the Ambari YARN advanced configuration.

Check that Tensorflow Model is deployed and accessible

Let's check that it works end-to-end and run the command from a remote client:

curl http://hostname_or_ip:8501/v1/models/half_plus_two
Output:

{
"model_version_status": [
{
"version": "123",
"state": "AVAILABLE",
"status": {
"error_code": "OK",
"error_message": ""
}
}
]
}
The state should be "AVAILABLE".

Use the Tensorflow Model

You can now start using the TensorFlow application, e.g. with the Chrome Postman UI or with a command line interface:

curl -d '{"instances": [1.0, 2.0, 5.0]}' \
  -X POST http://hostname:8501/v1/models/half_plus_two:predict
{ "predictions": [2.5, 3.0, 4.5] }
Destroy the YARN Application

curl -X DELETE http://localhost:8088/app/v1/services/tensorflow-serving?user.name=ambari-qa
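To confirm the service is gone, you can query it again with the same endpoint as above. A quick check; the exact error payload after deletion may vary:

curl http://localhost:8088/app/v1/services/tensorflow-serving?user.name=ambari-qa | python -m json.tool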
01-10-2019 12:00 PM - 3 Kudos
Abstract

Machine learning and supervised learning at scale is a common task that can be addressed with Apache Hive by leveraging Hivemall features and user-defined functions (UDFs) for HiveQL, which are strongly optimized for machine learning (ML) and data science. For example, logistic regression models, one of the basic tenets of ML, can also be easily built with Hivemall. The following example explains the basic usage of Hivemall with a supervised learning example of a simple regressor.

Setup the environment -> HDP 3.0.1 Sandbox - Hivemall 0.5.2, see Machine Learning with Apache Hive SQL using Hivemall

Explore the Training Data

Database: foodmart - Tables: sales_fact_dec_1998 and customer

Create Table customer_training as
select s1.customer_id, c1.country, c1.gender, c1.member_card,c1.num_cars_owned, count(s1.product_id) as purchase_cnt
from sales_fact_dec_1998 s1
join customer c1
ON (s1.customer_id = c1.customer_id)
group by s1.customer_id, c1.country, c1.gender, c1.member_card, c1.num_cars_owned;
customer_id  country  gender  member_card  purchase_cnt
14           USA      F       Bronze       3
34           USA      F       Silver       14
39           USA      F       Bronze       6
48           Mexico   M       Bronze       8
55           USA      M       Normal       7
57           Mexico   M       Golden       8
74           USA      M       Bronze       6
76           USA      F       Normal       10
80           USA      F       Bronze       7
84           Canada   M       Bronze       1
87           Canada   F       Bronze       7
93           USA      M       Golden       8
98           Canada   F       Bronze       2
Customer Feature Representation

Create the feature vector for the customer from the relevant attributes, i.e. country, gender, member_card and num_cars_owned. Hivemall supports quantitative (numeric) features like num_cars_owned as well as categorical (char/string) features like gender or country.

create table customer_features (customer_id int, features array<string>, purchase_cnt int);
insert overwrite table customer_features
select
customer_id,
hivemall.array_concat(
hivemall.quantitative_features( array("num_cars_owned"),num_cars_owned ) ,
hivemall.categorical_features( array("country", "gender","member_card"), country, gender, member_card )
) as features,
sum(purchase_cnt)
from
customer_training
group by customer_id,
hivemall.array_concat(
hivemall.quantitative_features( array("num_cars_owned"),num_cars_owned ) ,
hivemall.categorical_features( array("country", "gender","member_card"), country, gender, member_card )
)
;
The customer_features table should look like the following:

customer_id  features                                                                 purchase_cnt
6            ["num_cars_owned:4.0","country#USA","gender#F","member_card#Bronze"]     4
10           ["num_cars_owned:5.0","country#USA","gender#M","member_card#Golden"]     5
12           ["num_cars_owned:2.0","country#USA","gender#F","member_card#Bronze"]     2
14           ["num_cars_owned:3.0","country#USA","gender#F","member_card#Bronze"]     3
24           ["num_cars_owned:3.0","country#Mexico","gender#F","member_card#Bronze"]  3
34           ["num_cars_owned:14.0","country#USA","gender#F","member_card#Silver"]    14
36           ["num_cars_owned:6.0","country#USA","gender#M","member_card#Normal"]     6
39           ["num_cars_owned:6.0","country#USA","gender#F","member_card#Bronze"]     6
42           ["num_cars_owned:7.0","country#Mexico","gender#F","member_card#Golden"]  7
48           ["num_cars_owned:8.0","country#Mexico","gender#M","member_card#Bronze"]  8
52           ["num_cars_owned:5.0","country#Mexico","gender#F","member_card#Silver"]  5
55           ["num_cars_owned:7.0","country#USA","gender#M","member_card#Normal"]     7
Training

Now that we have the customers with a defined set of features, the next step is to create the regressor. This requires us to specify an appropriate -loss_function. As it is set to squared for train_regressor, this query builds a simple linear regressor with the squared loss.

create table if not exists customer_regressor as
select
hivemall.train_regressor(
features, -- feature vector
purchase_cnt, -- target value
'-loss_function squared -optimizer AdaGrad -regularization l2' -- hyper-parameters
) as (feature, weight)
from
customer_features
;

Table: customer_regressor

feature             weight
country#Canada      0.036762696
country#Mexico      -0.07290672
country#USA         0.009568159
gender#F            -0.035828743
gender#M            -0.052134056
member_card#Bronze  -0.015939986
member_card#Golden  0.0020267756
member_card#Normal  -0.04439145
member_card#Silver  -0.059466142
num_cars_owned      1.0083996
Prediction

Now we can compute the prediction for the number of purchases of a new customer.

create table if not exists customer_new as
select 1 as id, array("num_cars_owned:3.0","country#USA","gender#F","member_card#Silver") as features
;
with features_exploded as (
select
id,
hivemall.extract_feature( fv ) as feature,
hivemall.extract_weight( fv ) as value
from customer_new t1 LATERAL VIEW explode(features) t2 as fv
)
select
t1.id,
sum(p1.weight * t1.value) as predicted_num_purchases
from
features_exploded t1
LEFT OUTER JOIN customer_regressor p1 ON (t1.feature = p1.feature)
group by
t1.id
;

PREDICTED_NUM_PURCHASES
2.9394720904529095
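As a plausibility check, the prediction is simply the weighted sum of the new customer's features using the weights from the customer_regressor table above. A quick sketch with bc; the tiny difference to the value above comes from the rounding of the displayed weights:

# num_cars_owned*weight + country#USA + gender#F + member_card#Silver
echo "3.0*1.0083996 + 0.009568159 - 0.035828743 - 0.059466142" | bc -l
# prints ~2.9394721, matching PREDICTED_NUM_PURCHASES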
01-07-2019 07:27 PM - 2 Kudos
Abstract
With native SQL commands and the publicly available Hivemall components we are able to build an advanced analytics and machine learning process for a recommendation system.

Setup the Environment

We will set up this environment with the latest version of the Hortonworks Sandbox and the Hivemall repo as shown below, preparing our infrastructure for some relatively complicated machine learning concepts with a relatively simple environment.

Step 1 - Download and install Hortonworks Sandbox HDP 3.0.1
Step 2 - Download the Hivemall artifacts from the Hivemall Repo and copy them into your HDFS directory (so all nodes in the cluster have access to the same files). Log in as user hive with the su - hive command.

wget https://search.maven.org/remotecontent?filepath=org/apache/hivemall/hivemall-all/0.5.2-incubating/hivemall-all-0.5.2-incubating.jar -O hivemall-all-0.5.2-incubating.jar
hdfs dfs -mkdir /apps/hive/warehouse/lib
hdfs dfs -put hivemall-all-0.5.2-incubating.jar /apps/hive/warehouse/lib
Step 3A - Download the DDL script from GitHub:

wget https://raw.githubusercontent.com/apache/incubator-hivemall/v0.5.2/resources/ddl/define-all-as-permanent.hive -O define-all-as-permanent.hive

Step 3B - Edit the first lines in the script by uncommenting the lines shown below and modifying the HDFS path:

vi define-all-as-permanent.hive

Uncomment (remove --) and modify/replace the following lines:

CREATE DATABASE IF NOT EXISTS hivemall;
USE hivemall;
set hivevar:hivemall_jar=hdfs:///apps/hive/warehouse/lib/hivemall-all-0.5.2-incubating.jar;
Step 4 - Run the Hive script define-all-as-permanent.hive to create the Hivemall functions permanently:

beeline -u "jdbc:hive2://sandbox-hdp.hortonworks.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n hive --force=true -i define-all-as-permanent.hive

Check that the Hivemall functions are available and look up the version (don't worry if not all of the functions compiled, the ones necessary for this task should be there):

select hivemall.hivemall_version();
+-------------------+
|0.5.2-incubating |
+-------------------+

Congratulations, now you are ready to do the interesting part.

Building the Collaborative Filtering Recommendation System

This demo recommendation system is based on two hypotheses:
- co-occurrence: customers repeatedly buying products that they have also bought previously
- similarity: products bought together with other products (think Amazon's "these products are often bought with")

The algorithms used are item-based collaborative filtering and naive similarity computation in O(n^2) to compute all item-item pair similarities. In order to accelerate the procedure, Hivemall has an efficient scheme for computing Jaccard and/or cosine similarity. We need to understand the available data and table structure in the foodmart database; the two fact tables sales_fact_1997 and sales_fact_1998 (for A/B testing) provide the necessary data for our recommendation system:

- customers (customer_id)
- products (product_id)
- purchased quantity (count)
- purchase date (time_id)

Table: sales_fact_1997
product_id  time_id  customer_id  promotion_id  store_id  store_sales  store_cost  unit_sales
1318        483      3            1141          15        2.7900       0.8928      3.0000
970         697      3            0             15        9.3300       3.0789      3.0000
393         661      3            0             15        3.3600       1.4112      3.0000
885         483      3            1141          15        2.7600       1.0764      2.0000
1534        483      3            1141          15        5.5600       1.7792      2.0000
286         483      3            1141          15        4.2600       1.5336      3.0000
736         661      3            0             15        6.3900       3.0033      3.0000
1379        697      3            0             15        5.7600       2.8800      3.0000
130         483      3            1141          15        4.4600       1.3826      2.0000
1252        483      3            1141          15        6.8400       2.0520      3.0000
In the sandbox Hive database the volume of these two tables is small, only about 86K rows of synthetically generated data, but good enough for demo purposes. Change to the demo database foodmart:

use foodmart;

First we need to construct the customer_purchased table from the sales_fact_1997 table with the following columns: customer_purchased (customer_id, product_id, purchased_at, purchase_count). Aggregate the total count of purchased products for every customer for that year.

CREATE TABLE customer_purchased as
select
customer_id,
product_id,
max(time_id) as purchased_at,
count(1) as purchase_count
from
sales_fact_1997
group by
customer_id,
product_id
;

The elapsed runtime: 12 sec

Here is a snippet of what the customer_purchased table is expected to look like:

TABLE customer_purchased

customer_id  product_id  purchased_at  purchase_count
157          1           720           1
456          1           534           1
916          1           463           1
1312         1           489           1
2270         1           698           1
3441         1           590           1

Compute top-k recently purchased items for each user

Next we create the table with the top-k recently purchased items for each user like so:

create table recently_purchased_products ( rank int, purchased_at int, customer_id int, product_id int);

Below is a brief explanation of the SQL commands used to populate this table:

-- where [optional filtering]
-- purchased_at >= xxx -- divide training/test data by time
-- hivemall.each_top_k 5 --> get top-5 recently purchased items for each user
-- Note: CLUSTER BY is mandatory when using each_top_k

INSERT OVERWRITE TABLE recently_purchased_products
SELECT
hivemall.each_top_k( 5, customer_id, purchased_at, customer_id, product_id
) as (rank, purchased_at, customer_id, product_id)
FROM (
SELECT
purchased_at,customer_id, product_id
FROM
customer_purchased
CLUSTER BY
customer_id
) t;

The elapsed runtime: 12 sec

Here is a snippet of what the recently_purchased_products table is expected to look like:
customer_id  rank  product_id  purchased_at
3            1     1379        697
3            1     204         697
3            1     1359        697
3            1     970         697
3            2     393         661
5            1     384         370
6            1     958         570
6            1     935         570
6            1     568         570
6            1     53          570
6            1     1417        570
10           1     1278        704
10           1     568         704
10           2     60          424
10           2     443         424
10           2     454         424
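Optionally, a quick sanity check (a sketch reusing the beeline connection from the setup steps): since each_top_k was called with k=5, no customer should have more than 5 rows in recently_purchased_products.

beeline -u "jdbc:hive2://sandbox-hdp.hortonworks.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n hive -e "
USE foodmart;
-- maximum number of top-k rows kept per customer; should not exceed 5
SELECT max(cnt) AS max_rows_per_customer
FROM (SELECT customer_id, count(*) AS cnt
      FROM recently_purchased_products
      GROUP BY customer_id) t;"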
Cooccurrence-based Recommendation

In order to generate a list of recommended items, you can use either the cooccurrence count or the similarity as a relevance score. First, we look into the cooccurrence-based method. From the previously created table customer_purchased we create the table cooccurrence (product_id, other "product_id", count): group the "other" products that the same customer has purchased together, basically pairs of product_ids with a count.

CREATE TABLE cooccurrence AS
SELECT
u1.product_id,
u2.product_id as other,
count(1) as cnt
FROM
customer_purchased u1
JOIN customer_purchased u2 ON (u1.customer_id = u2.customer_id)
WHERE
u1.product_id != u2.product_id
AND u2.purchased_at >= u1.purchased_at
group by
u1.product_id, u2.product_id
having
cnt >= 2 -- count(1) >= 2
;
The condition cnt >= 2 (count(1) >= 2) is optional but recommended when the dataset is large.

The elapsed runtime: 17 sec

A snippet of the data in the table cooccurrence:

TABLE cooccurrence

product_id  other (product_id)  cnt
1           30                  3
1           42                  3
1           68                  3
1           74                  2
1           90                  2

CREATE TABLE product_recommendation (customer_id int, rec_product array<int>);

From the above tables recently_purchased_products and cooccurrence we now select at most the top five products, ranked by the count field, as recommended products.

WITH topk as (
select
hivemall.each_top_k(
5, customer_id, cnt,
customer_id, other
) as (rank, cnt, customer_id, rec_item)
from (
select
t1.customer_id, t2.other, max(t2.cnt) as cnt
from
recently_purchased_products t1
JOIN cooccurrence t2 ON (t1.product_id = t2.product_id)
where
t1.product_id != t2.other
AND NOT EXISTS (
SELECT a.product_id FROM customer_purchased a
WHERE a.customer_id = t1.customer_id AND a.product_id = t2.other
AND a.purchase_count <= 1
)
group by
t1.customer_id, t2.other
CLUSTER BY
customer_id
) t1
)
INSERT OVERWRITE TABLE product_recommendation
select
customer_id,
map_values(hivemall.to_ordered_map(rank, rec_item)) as rec_items
from
topk
group by
customer_id
;

The elapsed runtime: 44 sec

TABLE product_recommendation

Now we have the recommended products for every customer, as shown in the following snippet:

customer_id  rec_product
3            [559,603]
5            [710,1326]
6            [202,623]
10           [1443]
14           [1333,1399]
17           [1420,1169,420]
19           [948,447,1368]
20           [1314,1374]
21           [535,1295]
23           [344,631]
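As a quick spot check (a sketch reusing the beeline connection from the setup steps), you can compare what a single customer recently bought with what the cooccurrence-based approach recommends:

beeline -u "jdbc:hive2://sandbox-hdp.hortonworks.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n hive -e "
USE foodmart;
-- recently purchased items vs. recommended items for customer 3
SELECT * FROM recently_purchased_products WHERE customer_id = 3;
SELECT * FROM product_recommendation WHERE customer_id = 3;"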
Similarity-based Recommendation

Alternatively, it is possible to employ a similarity-based recommendation, but for that we must go through a few extra steps. First we create a set of features for every product and apply the function hivemall.cosine_similarity to these product feature pairs. The products purchased together from the table cooccurrence are concatenated into a column feature_vector.

CREATE TABLE product_features (product_id int, feature_vector ARRAY<string>);
INSERT OVERWRITE TABLE product_features
SELECT
product_id,
collect_list(hivemall.feature(other, cnt) ) as feature_vector
FROM
cooccurrence
GROUP BY
product_id;

Optional tip: adjust the scaling with `ln(cnt+1)` to avoid large values in the feature vector.

Elapsed runtime: 15 sec

TABLE product_features

product_id  feature_vector (product_id:count, ...)
1           ["42:2","68:2","74:2","130:2","181:2","185:2" ... more
2           ["6:2","12:2","15:2","36:2","58:2","62:2","81:2" ... more
3           ["16:2","21:3","43:2","53:2","62:2","70:2","76:3" ... more

Basically, the next step is to compute the cosine similarity of the feature vectors for each pair of products. The computation requires a significant amount of time (approx. 11 minutes).

CREATE TABLE product_similarity (product_id int, other int, similarity double);
WITH similarity as (
select
t1.product_id,
t2.product_id as other,
hivemall.cosine_similarity(t1.feature_vector, t2.feature_vector) as similarity
FROM
product_features t1
CROSS JOIN product_features t2
WHERE
t1.product_id != t2.product_id
),
topk as (
SELECT
hivemall.each_top_k(3, product_id, similarity,product_id, other)
as (rank, similarity, product_id, other)
FROM (select * from similarity where similarity > 0 CLUSTER BY product_id) t)
INSERT OVERWRITE TABLE product_similarity
SELECT product_id, other, similarity
FROM topk;

Elapsed runtime: 674.603 sec

TABLE product_similarity

product_id  other  similarity
1           201    0.19589653611183167
1           567    0.1647367626428604
1           644    0.1627947986125946
2           505    0.30411943793296814
2           455    0.29358285665512085
2           1360   0.2841376066207886
3           962    0.2816478908061981
3           1428   0.2689010500907898
3           629    0.2549935281276703
4           609    0.25717243552207947
4           620    0.24852953851222992
4           597    0.24722814559936523
5           548    0.2888334095478058
5           115    0.26622357964515686
5           481    0.26322928071022034

The final step is to create the product_recommendation table based on the similarity calculated by hivemall.cosine_similarity.

WITH topk as (
select
hivemall.each_top_k(
5, customer_id, similarity,
customer_id, other
) as (rank, similarity, customer_id, rec_item)
from (
select
t1.customer_id, t2.other, max(t2.similarity) as similarity
from
recently_purchased_products t1
JOIN product_similarity t2 ON (t1.product_id = t2.product_id)
where
t1.product_id != t2.other -- do not include items that user already purchased
AND NOT EXISTS (
SELECT a.product_id FROM customer_purchased a
WHERE a.customer_id = t1.customer_id AND a.product_id = t2.other
AND a.purchase_count <= 1 -- optional
)
group by
t1.customer_id, t2.other
CLUSTER BY
customer_id -- top-k grouping by userid
) t1
)
INSERT OVERWRITE TABLE product_recommendation
select
customer_id,
map_values(hivemall.to_ordered_map(rank, rec_item)) as rec_items
from
topk
group by
customer_id
;
Elapsed runtime: 20 sec
customer_id  product_recommendation.rec_product
3            [376,1392,526,660,1336]
5            [544,883,1443]
6            [984,1501,66,358,1368]
10           [121,1546,984,1103,603]
14           [794,428,404,418,1416]
17           [334,111,24,1169,337]
19           [1202,447,1295,979,1152]
20           [968,43,432,952,1428]
21           [895,1295,1467]
23           [344,11,1423,1176,408]
12-21-2018 01:57 PM - 3 Kudos
With stochastic streaming algorithms it is possible to analyze massive amounts of data in a highly efficient and performant way. The main characteristics of sketches are very small size (sub-linear space consumption), single-pass processing, mergeability, and approximate results within proven error bounds. If an approximate result is acceptable, data sketches can be orders of magnitude faster than classic indexing or aggregation techniques. (I don't go into the mathematical discussion, more details here: DataSketches)

The following example shows how to use Data Sketches within Hive (also possible with Druid):

- Download and install Hortonworks Sandbox (it comes with the demo database foodmart)
- Download the Java artifact sketches-hive-0.11.0-with-shaded-core.jar (provided by Yahoo) from the jar repo and copy the jar file into the HDFS /apps/hive/warehouse/lib directory

#Login as hive user hive@sandbox-hdp
wget https://search.maven.org/remotecontent?filepath=com/yahoo/datasketches/sketches-hive/0.11.0/sketches-hive-0.11.0-with-shaded-core.jar -O sketches-hive-0.11.0-with-shaded-core.jar
hdfs dfs -mkdir /apps/hive/warehouse/lib
hdfs dfs -put sketches-hive-0.11.0-with-shaded-core.jar /apps/hive/warehouse/lib

Create the functions (here permanently) to create and manage the sketches with the beeline CLI or Data Analytics Studio (DAS):

beeline -u "jdbc:hive2://sandbox-hdp.hortonworks.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n hive --force=true

set hivevar:hivesketch_jar=hdfs:///apps/hive/warehouse/lib/sketches-hive-0.11.0-with-shaded-core.jar;
create database sketches;
use sketches;
create function qtl_data2sketch as 'com.yahoo.sketches.hive.quantiles.DataToDoublesSketchUDAF' USING JAR '${hivesketch_jar}' ;
create function qtl_getQuantile as 'com.yahoo.sketches.hive.quantiles.GetQuantileFromDoublesSketchUDF' USING JAR '${hivesketch_jar}';

Create a data sketch for product and price with the function qtl_data2sketch. The table product_sketch has two columns: product_id, which contains the products sold, and a second binary field for the data sketch itself.

use foodmart;
create table product_sketch (product_id double, qtl_sketch binary);
insert into product_sketch select cast(product_id as double), sketches.qtl_data2sketch(cast(store_sales as double)) from sales_fact_1997 group by product_id;

The table product_sketch stores for every product the price sketch of the year (1997). The data sketch of a product can now be queried, e.g. to get the 25% quantile of the data:

select product_id, sketches.qtl_getQuantile(qtl_sketch, 0.25) from product_sketch limit 3;

Product  25% Quantile
1        8.55
2        1.48
3        2.49

Use it in a more advanced example, e.g. to find sales where the product price was below the 25% quantile:

select sales.product_id, sum(store_sales * unit_sales ) sum_sales, sum( unit_sales) units, store_sales, sketches.qtl_getQuantile(qtl_sketch, 0.25) 25pct
from sales_fact_1997 sales
join product_sketch
on (product_sketch.product_id = sales.product_id
and store_sales < sketches.qtl_getQuantile(qtl_sketch, 0.25))
group by sales.product_id , store_sales, sketches.qtl_getQuantile(qtl_sketch, 0.25)
order by sum_sales desc
limit 10;

sales.product_id  sum_sales  units  store_sales  25pct
11                252.72     36     7.02         10.53
390               239.36     32     7.48         11.22
362               222.32     28     7.94         11.91

The table shows the top 3 products and their revenue where the price is below the 25% quantile of the price span (per product).
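The same sketch can answer other quantile questions without touching the raw data again. A quick sketch (assuming the beeline connection used above) that returns the approximate median price per product with fraction 0.5:

beeline -u "jdbc:hive2://sandbox-hdp.hortonworks.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n hive -e "
use foodmart;
-- approximate median (50% quantile) of the 1997 price sketch per product
select product_id, sketches.qtl_getQuantile(qtl_sketch, 0.5) as median_price
from product_sketch limit 3;"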
10-17-2018 02:35 PM - 1 Kudo
Thanks, very helpful.
04-20-2018 09:41 AM
Using DPS services, e.g. DLM, for replication between HDP clusters over wire encryption leverages the Knox gateway functionality. For that, two services must be configured in Knox and one Ranger policy created. This is the quick setup of the required Ambari services in Knox and Ranger.

Add two new services (AMBARI and AMBARIUI) to the Knox advanced topology configuration:

<service>
<role>AMBARI</role>
<url>http://knox-gw:8080</url>
</service>
<service>
<role>AMBARIUI</role>
<url>http://knox-gw:8080</url>
</service>

In Ranger, define a policy to allow access to the Ambari UI. Then restart all marked services in the cluster. Test that it works by logging in to Ambari through the Knox proxy (default port: 8443). Now you can join the cluster in DPS.
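A quick way to test the proxying from the command line is a sketch like the following; the topology name default and the /ambari context path are assumptions that depend on your Knox topology, and -k skips certificate validation for self-signed gateway certificates:

# Ambari API through the Knox gateway; adjust host, topology and context path to your setup
curl -k -u admin:admin https://knox-gw:8443/gateway/default/ambari/api/v1/clusters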
01-26-2018 02:33 PM - 1 Kudo
Prior to Ranger version 0.6 it was a common scenario that security admins created per-user policies, e.g. to grant a user access to project databases (Hive) or special HDFS directories. Initially, a policy was created for each user, so the admins ended up managing thousands of policies. HDFS policies especially often had the user name in the path, e.g. /user/demo/USERNAME/ *. Since Ranger 0.6 introduces the user variable, the security admin now only has to create one policy with a path that looks like this: /user/demo/{USER}/ * and assign the appropriate user permissions for the directory. The user can immediately access and use the new HDFS directory with enforced user permissions.

dummy_1$ hdfs dfs -copyFromLocal tst.x1 /user/demo/dummy_1
dummy_1$ hdfs dfs -ls /user/demo/dummy_1
Found 1 items
-rw-r--r-- 3 dummy_1 hdfs 19001 2018-03-23 15:36 /user/demo/dummy_1/text.txt

With the use of user variables, security administrators can now create more dynamic policies, dramatically reducing the number of policies in the environment.