Created on 01-07-2019 07:27 PM
With native SQL commands and the publicly available Hivemall components, we are able to build an advanced analytics and machine learning process for a recommendation system.
We will set up this environment with the latest version of the Hortonworks Sandbox and the Hivemall repository, as shown below, to prepare our infrastructure for some relatively complicated machine learning concepts in a relatively simple environment.
Step 1 - Download and install the Hortonworks Sandbox HDP 3.0.1
Step 2 - Download the Hivemall artefacts from the Hivemall repository and copy them into your HDFS directory (so that all nodes in the cluster have access to the same JAR)
Log in as user hive with the su - hive command, then run:
wget "https://search.maven.org/remotecontent?filepath=org/apache/hivemall/hivemall-all/0.5.2-incubating/hi..." -O hivemall-all-0.5.2-incubating.jar
hdfs dfs -mkdir -p /apps/hive/warehouse/lib
hdfs dfs -put hivemall-all-0.5.2-incubating.jar /apps/hive/warehouse/lib
Step 3A - Download the DDL script from GitHub:
wget https://raw.githubusercontent.com/apache/incubator-hivemall/v0.5.2/resources/ddl/define-all-as-perma... -O define-all-as-permanent.hive
Step 3B - Edit the first lines in the script by uncommenting the lines shown below and modifying the HDFS path:
Open the script with vi define-all-as-permanent.hive, then uncomment (remove the leading --) and modify the following lines:
CREATE DATABASE IF NOT EXISTS hivemall;
USE hivemall;
set hivevar:hivemall_jar=hdfs:///apps/hive/warehouse/lib/hivemall-all-0.5.2-incubating.jar;
Step 4 - Run the Hive script define-all-as-permanent.hive to create the Hivemall functions permanently:
beeline -u "jdbc:hive2://sandbox-hdp.hortonworks.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n hive --force=true -i define-all-as-permanent.hive
Check that the Hivemall functions are available and look up the version (don't worry if not all of the functions were created; the ones needed for this task should be there):
select hivemall.hivemall_version();
+-------------------+
| 0.5.2-incubating  |
+-------------------+
Congratulations, you are now ready for the interesting part.
This demo recommendation system is based on two building blocks: item-based collaborative filtering, and a naive O(n^2) similarity computation over all item-item pairs. To accelerate this procedure, Hivemall provides an efficient scheme for computing Jaccard and/or cosine similarity.
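To make the cost of the naive approach concrete, here is a minimal Python sketch (not Hivemall code) that compares every pair of items using Jaccard similarity over the sets of customers who bought them. With n items it performs n(n-1)/2 comparisons, which is what makes the naive method expensive. The function name naive_item_similarity and the toy data are hypothetical.

```python
from itertools import combinations


def naive_item_similarity(item_users):
    """Naive O(n^2) item-item Jaccard similarity (illustrative sketch).

    item_users maps a product_id to the set of customer_ids who bought it.
    Every unordered pair of items is compared exactly once, so n items
    cost n * (n - 1) / 2 set comparisons.
    """
    sims = {}
    for a, b in combinations(sorted(item_users), 2):
        users_a, users_b = item_users[a], item_users[b]
        union = len(users_a | users_b)
        # Jaccard = |intersection| / |union|; guard against two empty sets
        sims[(a, b)] = len(users_a & users_b) / union if union else 0.0
    return sims
```

For example, with the hypothetical input {1: {3, 5, 6}, 2: {3, 6}, 3: {10}}, the pair (1, 2) scores 2/3 because the two products share two of their three distinct buyers, while both pairs involving product 3 score 0.0.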
We need to understand the data and table structure in the foodmart database. The two fact tables sales_fact_1997 and sales_fact_1998 (the latter for A/B testing) provide the data for our recommendation system. A sample of their rows:
product_id | time_id | customer_id | promotion_id | store_id | store_sales | store_cost | unit_sales |
1318 | 483 | 3 | 1141 | 15 | 2.7900 | 0.8928 | 3.0000 |
970 | 697 | 3 | 0 | 15 | 9.3300 | 3.0789 | 3.0000 |
393 | 661 | 3 | 0 | 15 | 3.3600 | 1.4112 | 3.0000 |
885 | 483 | 3 | 1141 | 15 | 2.7600 | 1.0764 | 2.0000 |
1534 | 483 | 3 | 1141 | 15 | 5.5600 | 1.7792 | 2.0000 |
286 | 483 | 3 | 1141 | 15 | 4.2600 | 1.5336 | 3.0000 |
736 | 661 | 3 | 0 | 15 | 6.3900 | 3.0033 | 3.0000 |
1379 | 697 | 3 | 0 | 15 | 5.7600 | 2.8800 | 3.0000 |
130 | 483 | 3 | 1141 | 15 | 4.4600 | 1.3826 | 2.0000 |
1252 | 483 | 3 | 1141 | 15 | 6.8400 | 2.0520 | 3.0000 |
In the sandbox Hive database these two tables are small (only 86K rows of synthetically generated data), but good enough for demo purposes.
Change to the demo database foodmart.
use foodmart;
First we construct the customer_purchased table from the sales_fact_1997 table.
For every customer and product, it stores the time of the most recent purchase that year (max(time_id)) and the number of purchases (count(1)).
CREATE TABLE customer_purchased AS
SELECT
  customer_id,
  product_id,
  max(time_id) AS purchased_at,
  count(1) AS purchase_count
FROM sales_fact_1997
GROUP BY customer_id, product_id;
Elapsed runtime: 12 sec
Here is a snippet of what the customer_purchased table is expected to look like:
customer_id | product_id | purchased_at | purchase_count |
157 | 1 | 720 | 1 |
456 | 1 | 534 | 1 |
916 | 1 | 463 | 1 |
1312 | 1 | 489 | 1 |
2270 | 1 | 698 | 1 |
3441 | 1 | 590 | 1 |
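The GROUP BY performed by the CREATE TABLE ... AS SELECT above can be sketched in plain Python. This is only an illustration of the aggregation logic; the (customer_id, product_id, time_id) tuple layout of the input rows and the function name are assumptions.

```python
from collections import defaultdict


def customer_purchased(sales_rows):
    """Mimic the customer_purchased aggregation (illustrative sketch).

    sales_rows: iterable of (customer_id, product_id, time_id) tuples,
    standing in for rows of sales_fact_1997.
    Returns {(customer_id, product_id): (purchased_at, purchase_count)},
    where purchased_at = max(time_id) and purchase_count = count(1).
    """
    acc = defaultdict(lambda: [None, 0])
    for customer_id, product_id, time_id in sales_rows:
        entry = acc[(customer_id, product_id)]
        # keep the latest purchase time seen for this customer/product pair
        entry[0] = time_id if entry[0] is None else max(entry[0], time_id)
        entry[1] += 1  # count(1): one per sales row
    return {key: (latest, count) for key, (latest, count) in acc.items()}
```

With the hypothetical rows [(3, 1318, 483), (3, 970, 697), (3, 1318, 661)], the result is {(3, 1318): (661, 2), (3, 970): (697, 1)}.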
Next we create the table with the top-k recently purchased items for each user like so:
create table recently_purchased_products ( rank int, purchased_at int, customer_id int, product_id int);
A brief explanation of the SQL used to populate this table:
-- WHERE [optional filtering], e.g. purchased_at >= xxx, to divide training/test data by time
-- hivemall.each_top_k(5, ...) returns the top-5 most recently purchased items for each user
-- Note: CLUSTER BY is mandatory when using each_top_k
INSERT OVERWRITE TABLE recently_purchased_products
SELECT
  hivemall.each_top_k(
    5, customer_id, purchased_at,
    customer_id, product_id
  ) AS (rank, purchased_at, customer_id, product_id)
FROM (
  SELECT purchased_at, customer_id, product_id
  FROM customer_purchased
  CLUSTER BY customer_id
) t;
Elapsed runtime: 12 sec
Here is a snippet of what the recently_purchased_products table is expected to look like:
customer_id | rank | product_id | purchased_at |
3 | 1 | 1379 | 697 |
3 | 1 | 204 | 697 |
3 | 1 | 1359 | 697 |
3 | 1 | 970 | 697 |
3 | 2 | 393 | 661 |
5 | 1 | 384 | 370 |
6 | 1 | 958 | 570 |
6 | 1 | 935 | 570 |
6 | 1 | 568 | 570 |
6 | 1 | 53 | 570 |
6 | 1 | 1417 | 570 |
10 | 1 | 1278 | 704 |
10 | 1 | 568 | 704 |
10 | 2 | 60 | 424 |
10 | 2 | 443 | 424 |
10 | 2 | 454 | 424 |
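The per-customer top-k selection can be sketched in Python as below. It loosely mirrors hivemall.each_top_k(k, group, key, ...) but is not Hivemall's implementation: the sketch groups rows explicitly with a dictionary, whereas in Hive the CLUSTER BY clause is what brings each customer's rows together. Rows with equal keys share a rank here (dense ranking), which appears consistent with the sample output above; Hivemall's exact tie handling is an assumption.

```python
from collections import defaultdict


def each_top_k(k, rows):
    """Per-group top-k, loosely mirroring hivemall.each_top_k (sketch).

    rows: iterable of (group, key, payload) tuples.
    For every group, rows are sorted by key in descending order and the
    first k are kept. Equal keys share a rank (assumed dense ranking).
    Returns (rank, key, group, payload) tuples, like
    AS (rank, purchased_at, customer_id, product_id).
    """
    groups = defaultdict(list)
    for group, key, payload in rows:
        groups[group].append((key, payload))
    result = []
    for group in sorted(groups):
        # sorted() is stable, so ties keep their input order
        top = sorted(groups[group], key=lambda kp: kp[0], reverse=True)[:k]
        rank, prev_key = 0, None
        for key, payload in top:
            if key != prev_key:
                rank += 1
                prev_key = key
            result.append((rank, key, group, payload))
    return result
```

For customer 3 with purchases at times 697, 697, 661 and 483, each_top_k(3, ...) keeps the two products bought at 697 with rank 1 and the one bought at 661 with rank 2.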
To generate a list of recommended items, you can use either the co-occurrence count or a similarity as the relevance score. First, we look into the co-occurrence-based method.
From the previously created table customer_purchased we create the cooccurrence table.
For every pair of products bought by the same customer, it counts how often the "other" product was purchased at the same time as, or after, the first one.
CREATE TABLE cooccurrence AS
SELECT
  u1.product_id,
  u2.product_id AS other,
  count(1) AS cnt
FROM customer_purchased u1
JOIN customer_purchased u2 ON (u1.customer_id = u2.customer_id)
WHERE
  u1.product_id != u2.product_id
  AND u2.purchased_at >= u1.purchased_at
GROUP BY u1.product_id, u2.product_id
HAVING cnt >= 2 -- count(1) >= 2
;
The HAVING cnt >= 2 condition is optional, but recommended when the dataset is large.
Elapsed runtime: 17 sec
A snippet of the data in the cooccurrence table:
product_id | other | cnt |
1 | 30 | 3 |
1 | 42 | 3 |
1 | 68 | 3 |
1 | 74 | 2 |
1 | 90 | 2 |
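The self-join above can be sketched in Python for illustration; the dictionary layout is a hypothetical stand-in for the customer_purchased table. Because customer_purchased holds one row per (customer_id, product_id), cnt ends up being the number of customers who bought both products, with the other product's latest purchase no earlier than the first product's.

```python
from collections import Counter, defaultdict


def cooccurrence(purchases, min_count=2):
    """Count co-purchased product pairs like the cooccurrence query (sketch).

    purchases: {(customer_id, product_id): purchased_at}.
    For every customer, each ordered pair (p, other) with p != other and
    purchased_at[other] >= purchased_at[p] adds 1 to cnt[(p, other)].
    Pairs with cnt < min_count are dropped (the HAVING cnt >= 2 clause).
    """
    by_customer = defaultdict(dict)
    for (customer_id, product_id), purchased_at in purchases.items():
        by_customer[customer_id][product_id] = purchased_at
    cnt = Counter()
    for products in by_customer.values():
        # the self-join: every ordered pair of this customer's products
        for p, t_p in products.items():
            for other, t_other in products.items():
                if p != other and t_other >= t_p:
                    cnt[(p, other)] += 1
    return {pair: c for pair, c in cnt.items() if c >= min_count}
```

With three hypothetical customers, two of whom bought product 30 after product 1 and one who bought them in the opposite order, the pair (1, 30) reaches cnt = 2 and survives the default threshold, while (30, 1) has cnt = 1 and is filtered out.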
CREATE TABLE product_recommendation (customer_id int, rec_product array<int>);
From the tables recently_purchased_products and cooccurrence, we now select at most five products per customer, ranked by the cnt field, as recommended products. Products that the customer has already purchased only once are excluded.
WITH topk AS (
  SELECT
    hivemall.each_top_k(
      5, customer_id, cnt,
      customer_id, other
    ) AS (rank, cnt, customer_id, rec_item)
  FROM (
    SELECT
      t1.customer_id,
      t2.other,
      max(t2.cnt) AS cnt
    FROM recently_purchased_products t1
    JOIN cooccurrence t2 ON (t1.product_id = t2.product_id)
    WHERE
      t1.product_id != t2.other
      AND NOT EXISTS (
        SELECT a.product_id
        FROM customer_purchased a
        WHERE
          a.customer_id = t1.customer_id
          AND a.product_id = t2.other
          AND a.purchase_count <= 1
      )
    GROUP BY t1.customer_id, t2.other
    CLUSTER BY customer_id
  ) t1
)
INSERT OVERWRITE TABLE product_recommendation
SELECT
  customer_id,
  map_values(hivemall.to_ordered_map(rank, rec_item)) AS rec_items
FROM topk
GROUP BY customer_id;
Elapsed runtime: 44 sec
Now we have the recommended products for every customer, as shown in the following snippet:
customer_id | product_recommendation.rec_product |
3 | [559,603] |
5 | [710,1326] |
6 | [202,623] |
10 | [1443] |
14 | [1333,1399] |
17 | [1420,1169,420] |
19 | [948,447,1368] |
20 | [1314,1374] |
21 | [535,1295] |
23 | [344,631] |
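The recommendation query can be sketched in Python to show its logic step by step; the dictionary inputs are hypothetical stand-ins for the three tables, and this is an illustration rather than how Hive executes the query. Each candidate product keeps its best score over the customer's recent purchases (max(t2.cnt)), candidates the customer bought with purchase_count <= 1 are skipped (the NOT EXISTS clause), and the k best remain in descending score order.

```python
from collections import defaultdict


def recommend(recent, scores, purchase_count, k=5):
    """Sketch of the product_recommendation query.

    recent: {customer_id: [product_id, ...]}  (recently_purchased_products)
    scores: {(product_id, other): score}      (cooccurrence cnt)
    purchase_count: {(customer_id, product_id): count}  (customer_purchased)
    Returns {customer_id: [other, ...]} with at most k items per customer.
    """
    by_product = defaultdict(list)
    for (product_id, other), score in scores.items():
        by_product[product_id].append((other, score))
    result = {}
    for customer_id, products in recent.items():
        best = {}
        for product_id in products:
            for other, score in by_product.get(product_id, []):
                if other == product_id:  # t1.product_id != t2.other
                    continue
                count = purchase_count.get((customer_id, other))
                if count is not None and count <= 1:  # NOT EXISTS clause
                    continue
                best[other] = max(score, best.get(other, score))
        ranked = sorted(best.items(), key=lambda kv: kv[1], reverse=True)[:k]
        if ranked:  # customers without candidates drop out, as in the JOIN
            result[customer_id] = [other for other, _ in ranked]
    return result
```

Passing similarity values instead of co-occurrence counts as scores gives the shape of the similarity-based query further down.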
Alternatively, we can use a similarity-based recommendation, but that requires a few extra steps. First we create a feature vector for every product, then apply the function hivemall.cosine_similarity to pairs of these product feature vectors.
The co-purchased products from the cooccurrence table are collected, as other:cnt features, into the feature_vector column.
CREATE TABLE product_features (product_id int, feature_vector ARRAY<string>);
INSERT OVERWRITE TABLE product_features SELECT product_id, collect_list(hivemall.feature(other, cnt) ) as feature_vector FROM cooccurrence GROUP BY product_id;
Optional tip: scale the counts, e.g. with `ln(cnt+1)`, to avoid large values in the feature vector.
Elapsed runtime: 15 sec
product_id | feature_vector (other:cnt, ...) |
1 | ["42:2","68:2","74:2","130:2","181:2","185:2" ... more |
2 | ["6:2","12:2","15:2","36:2","58:2","62:2","81:2" ... more |
3 | ["16:2","21:3","43:2","53:2","62:2","70:2","76:3" ... more |
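Building the feature vectors, including the optional ln(cnt+1) scaling tip, can be sketched in Python as follows. The "index:value" string layout is assumed to match what hivemall.feature produces, based on sample values such as "42:2"; the function name is hypothetical.

```python
import math
from collections import defaultdict


def product_features(cooc, log_scale=False):
    """Build one "other:cnt" feature list per product (illustrative sketch).

    cooc: iterable of (product_id, other, cnt) rows from cooccurrence.
    With log_scale=True the value becomes ln(cnt + 1), which damps large
    counts as suggested in the optional tip.
    """
    features = defaultdict(list)
    for product_id, other, cnt in cooc:
        value = math.log(cnt + 1) if log_scale else cnt
        # :g prints 2 as "2" and ln(3) as "1.09861" (6 significant digits)
        features[product_id].append(f"{other}:{value:g}")
    return dict(features)
```

For the rows (1, 42, 2) and (1, 68, 3), product 1 gets ["42:2", "68:3"]; with log_scale=True the first feature becomes "42:1.09861".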
The next step is to compute the cosine similarity between the feature vectors of every pair of products and keep the 3 most similar products for each product. Because this is a cross join over all products, the computation takes a significant amount of time (approx. 11 minutes).
CREATE TABLE product_similarity (product_id int, other int, similarity double);
WITH similarity AS (
  SELECT
    t1.product_id,
    t2.product_id AS other,
    hivemall.cosine_similarity(t1.feature_vector, t2.feature_vector) AS similarity
  FROM product_features t1
  CROSS JOIN product_features t2
  WHERE t1.product_id != t2.product_id
),
topk AS (
  SELECT
    hivemall.each_top_k(
      3, product_id, similarity,
      product_id, other
    ) AS (rank, similarity, product_id, other)
  FROM (
    SELECT * FROM similarity
    WHERE similarity > 0
    CLUSTER BY product_id
  ) t
)
INSERT OVERWRITE TABLE product_similarity
SELECT product_id, other, similarity
FROM topk;
Elapsed runtime: 674.603 sec
product_id | other | similarity |
1 | 201 | 0.19589653611183167 |
1 | 567 | 0.1647367626428604 |
1 | 644 | 0.1627947986125946 |
2 | 505 | 0.30411943793296814 |
2 | 455 | 0.29358285665512085 |
2 | 1360 | 0.2841376066207886 |
3 | 962 | 0.2816478908061981 |
3 | 1428 | 0.2689010500907898 |
3 | 629 | 0.2549935281276703 |
4 | 609 | 0.25717243552207947 |
4 | 620 | 0.24852953851222992 |
4 | 597 | 0.24722814559936523 |
5 | 548 | 0.2888334095478058 |
5 | 115 | 0.26622357964515686 |
5 | 481 | 0.26322928071022034 |
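The similarity step can be sketched in Python to show why it is expensive: every ordered pair of products is compared, i.e. n(n-1) cosine computations for n products. The sketch assumes hivemall.cosine_similarity treats each "name:value" string as one entry of a sparse vector; the parsing and helper names are hypothetical, not Hivemall's implementation.

```python
import math
from itertools import permutations


def parse(features):
    """Turn ["42:2", "68:3"] into {"42": 2.0, "68": 3.0}."""
    vec = {}
    for feature in features:
        name, value = feature.rsplit(":", 1)
        vec[name] = float(value)
    return vec


def cosine(a, b):
    """Cosine similarity of two sparse vectors stored as dicts."""
    dot = sum(v * b[name] for name, v in a.items() if name in b)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def top_similar(product_features, k=3):
    """Cross join all products, keep similarity > 0, top-k per product."""
    vectors = {pid: parse(fv) for pid, fv in product_features.items()}
    candidates = {}
    for p, q in permutations(vectors, 2):  # every ordered pair with p != q
        sim = cosine(vectors[p], vectors[q])
        if sim > 0:  # mirrors WHERE similarity > 0
            candidates.setdefault(p, []).append((sim, q))
    return {
        p: [(q, sim) for sim, q in sorted(pairs, key=lambda sq: sq[0], reverse=True)[:k]]
        for p, pairs in candidates.items()
    }
```

For the hypothetical vectors {1: ["42:2", "68:2"], 2: ["42:2"], 3: ["99:1"]}, products 1 and 2 have similarity 1/sqrt(2), about 0.707, while product 3 shares no feature with the others and does not appear in the result.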
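The final step is to create the product recommendations based on the similarity calculated by hivemall.cosine_similarity. Note that INSERT OVERWRITE replaces the co-occurrence-based recommendations previously stored in product_recommendation.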
WITH topk AS (
  SELECT
    hivemall.each_top_k(
      5, customer_id, similarity,
      customer_id, other
    ) AS (rank, similarity, customer_id, rec_item)
  FROM (
    SELECT
      t1.customer_id,
      t2.other,
      max(t2.similarity) AS similarity
    FROM recently_purchased_products t1
    JOIN product_similarity t2 ON (t1.product_id = t2.product_id)
    WHERE
      t1.product_id != t2.other
      -- do not include items that the user already purchased
      AND NOT EXISTS (
        SELECT a.product_id
        FROM customer_purchased a
        WHERE
          a.customer_id = t1.customer_id
          AND a.product_id = t2.other
          AND a.purchase_count <= 1 -- optional
      )
    GROUP BY t1.customer_id, t2.other
    CLUSTER BY customer_id -- top-k grouping by customer_id
  ) t1
)
INSERT OVERWRITE TABLE product_recommendation
SELECT
  customer_id,
  map_values(hivemall.to_ordered_map(rank, rec_item)) AS rec_items
FROM topk
GROUP BY customer_id;
Elapsed runtime: 20 sec
customer_id | product_recommendation.rec_product |
3 | [376,1392,526,660,1336] |
5 | [544,883,1443] |
6 | [984,1501,66,358,1368] |
10 | [121,1546,984,1103,603] |
14 | [794,428,404,418,1416] |
17 | [334,111,24,1169,337] |
19 | [1202,447,1295,979,1152] |
20 | [968,43,432,952,1428] |
21 | [895,1295,1467] |
23 | [344,11,1423,1176,408] |
Created on 01-11-2019 07:44 PM
Very nice! I did two articles a few years ago, but things have advanced greatly.
https://community.hortonworks.com/articles/67983/apache-hive-with-apache-hivemall.html