04-21-2017
12:18 PM
17 Kudos
With the release of HDP 2.6 comes the Hive ACID MERGE feature. This is a long-sought-after feature for anyone who wants to batch-update table content in an ACID manner. This tutorial walks you through, step by step, how to use the new feature with a very simple dataset. The example was done on an HDP 2.6.0.3 installation.

First, ACID support in Hive needs to be turned on in the Ambari UI. Then we will create two tables, one as the target of the merge and one as the source. Please note that the target table must be bucketed, have transactions enabled, and be stored in ORC format.

CREATE DATABASE merge_data;
CREATE TABLE merge_data.transactions(
ID int,
TranValue string,
last_update_user string)
PARTITIONED BY (tran_date string)
CLUSTERED BY (ID) into 5 buckets
STORED AS ORC TBLPROPERTIES ('transactional'='true');
CREATE TABLE merge_data.merge_source(
ID int,
TranValue string,
tran_date string)
STORED AS ORC;
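The Ambari switch for ACID maps to a handful of Hive properties. If you prefer to set them at the session level instead, settings along these lines are typical (a sketch only; verify the exact properties and values against the documentation for your HDP version):

```sql
-- Typical Hive ACID-related settings (verify against your HDP version)
SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.txn.manager.DbTxnManager;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.enforce.bucketing=true;
```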
Then we will populate the target and source tables with some data.

INSERT INTO merge_data.transactions PARTITION (tran_date) VALUES
(1, 'value_01', 'creation', '20170410'),
(2, 'value_02', 'creation', '20170410'),
(3, 'value_03', 'creation', '20170410'),
(4, 'value_04', 'creation', '20170410'),
(5, 'value_05', 'creation', '20170413'),
(6, 'value_06', 'creation', '20170413'),
(7, 'value_07', 'creation', '20170413'),
(8, 'value_08', 'creation', '20170413'),
(9, 'value_09', 'creation', '20170413'),
(10, 'value_10','creation', '20170413');
INSERT INTO merge_data.merge_source VALUES
(1, 'value_01', '20170410'),
(4, NULL, '20170410'),
(7, 'value_77777', '20170413'),
(8, NULL, '20170413'),
(8, 'value_08', '20170415'),
(11, 'value_11', '20170415');
When we examine the two tables, we expect that after the merge, row 1 should be left untouched, row 4 should be deleted (we imply a business rule here: a NULL value indicates deletion), row 7 should be updated, and row 11 should be inserted as a new record. The more interesting case is row 8, which involves moving a row from one partition to another. MERGE currently does not support changing a partition value on the fly; this has to happen as a delete in the old partition and an insert in the new partition. In a real-world use case, you need to construct the source table based on this criterion.

Then we will create the merge statement as follows. Please note that not all three WHEN clauses of the merge statement need to be present; it is fine to have only two or even one of them. We label the data with different last_update_user values. For more details on Hive MERGE, please refer to the Hive documentation.

MERGE INTO merge_data.transactions AS T
USING merge_data.merge_source AS S
ON T.ID = S.ID and T.tran_date = S.tran_date
WHEN MATCHED AND (T.TranValue != S.TranValue AND S.TranValue IS NOT NULL) THEN UPDATE SET TranValue = S.TranValue, last_update_user = 'merge_update'
WHEN MATCHED AND S.TranValue IS NULL THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (S.ID, S.TranValue, 'merge_insert', S.tran_date);
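Before running the statement, it can help to trace the expected outcome. The following is a minimal Python sketch (not Hive code) of the three WHEN clauses applied to a subset of our sample rows; the dict-based "table" is purely illustrative:

```python
def merge(target, source):
    # target: {(id, tran_date): (tran_value, last_update_user)}
    # source: list of (id, tran_value, tran_date) rows
    result = dict(target)
    for sid, sval, sdate in source:
        key = (sid, sdate)
        if key in result:
            tval, _ = result[key]
            if sval is None:            # WHEN MATCHED AND S.TranValue IS NULL -> DELETE
                del result[key]
            elif tval != sval:          # WHEN MATCHED AND values differ -> UPDATE
                result[key] = (sval, 'merge_update')
        else:                           # WHEN NOT MATCHED -> INSERT
            result[key] = (sval, 'merge_insert')
    return result

target = {
    (1, '20170410'): ('value_01', 'creation'),
    (4, '20170410'): ('value_04', 'creation'),
    (7, '20170413'): ('value_07', 'creation'),
    (8, '20170413'): ('value_08', 'creation'),
}
source = [
    (1, 'value_01', '20170410'),
    (4, None, '20170410'),
    (7, 'value_77777', '20170413'),
    (8, None, '20170413'),
    (8, 'value_08', '20170415'),
    (11, 'value_11', '20170415'),
]
merged = merge(target, source)
```

Note how row 8 is "moved" only because the source supplies two rows: a NULL-valued row in the old partition (delete) and a regular row in the new partition (insert).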
As part of the update clause, the SET statement must not contain the target table decorator "T.", otherwise you will get a SQL compile error. Once the merge finishes, re-examining the data shows it was merged just as expected: row 1 wasn't changed, row 4 was deleted, row 7 was updated, and row 11 was inserted. And row 8, as we can see, was moved to a new partition.

SELECT * FROM merge_data.transactions ORDER BY ID;
+----+-----------------------+------------------------------+-----------------------+
| id | transactions.tranvalue| transactions.last_update_user| transactions.tran_date|
+----+-----------------------+------------------------------+-----------------------+
| 1 | value_01 | creation | 20170410 |
| 2 | value_02 | creation | 20170410 |
| 3 | value_03 | creation | 20170410 |
| 5 | value_05 | creation | 20170413 |
| 6 | value_06 | creation | 20170413 |
| 7 | value_77777 | merge_update | 20170413 |
| 8 | value_08 | merge_insert | 20170415 |
| 9 | value_09 | creation | 20170413 |
| 10 | value_10 | creation | 20170413 |
| 11 | value_11 | merge_insert | 20170415 |
+----+-----------------------+------------------------------+-----------------------+
This simple example shows how to use ACID MERGE with HDP 2.6 or later. Of course, real-world use cases will be much more complicated than this oversimplified example, but they all follow the same principles. And the beauty of it is that you no longer need to build a separate ETL process to accomplish this.
10-03-2016
05:14 PM
8 Kudos
Most user access control is managed at the group level. One of the examples mentioned in the Ranger release is controlling access so that each doctor can only view his or her own patients. However, most examples available to date use a static filter condition that does not change per user. In this tutorial, we demonstrate how to create a row-level filter that applies to a group but takes a different effect for each user, by leveraging a UDF.

The use case is as follows: a patients table contains patient information as well as the id of the doctor treating the patient. (This is a simplified example; in the real world the relationship may be many-to-many, but it can be handled with the same approach.) We also have a doctor reference table where the doctors' login credentials can be found. We will use the login credential to identify the doctor and only show the patients associated with that doctor.

To set up the environment, first download the HDP 2.5 Sandbox from the Hortonworks website and follow the instructions to set it up. Then log in to Ambari at 127.0.0.1:8080 using admin/admin and set up the data with the following script in the Hive View.

create table patients (ssn string, name string, doctor_id int) stored as orc;
create table doctorRef (id int, username string) stored as orc;
insert into doctorRef values (1, 'raj_ops'), (2, 'holger_gov');
insert into patients values
('111-11-1111', 'John Doo', 1),
('222-22-2222', 'Amy Long', 1),
('333-33-3333', 'Muni White', 2),
('444-44-4444', 'Kerry Chang', 2),
('555-55-5555', 'Holy Ng', 1);

Then we will set up the dynamic row-level filter inside Ranger. Log in to Ranger at 127.0.0.1:6080 using admin/admin, go to Access Manager -> Resource Based Policies, and under Hive, click Sandbox_Hive. Then click Row Level Filter and add a new policy like the following:

Policy Type: row level filter
Policy Name: dynamic row level filter
Hive database: default
Hive table: patients
Audit logging: yes
Description:
Select Group: public
Select User:
Access Type: select
Row level filter: doctor_id in (select id from doctorRef where username = current_user())
Then click save.
Now we can test the row-level filter policy by logging in to Ambari at 127.0.0.1:8080 as raj_ops/raj_ops; only the patients with doctor_id 1 show up. By using the UDF current_user(), which identifies the user in the current Hive session, we can apply a row-level filter at the group level that has a different effect for each user. This is particularly useful in environments where a large number of users is managed through group policies and users move between groups frequently: no access control needs to be defined at the user level; instead, a policy can be defined at the group level and, through the dynamic row-level filter, applied to each user dynamically.
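To make the mechanics concrete, here is a small Python sketch (not Hive or Ranger code) of what the single group-level filter expression effectively does for each logged-in user, using the sample data above:

```python
# Illustrative only: simulates
#   doctor_id in (select id from doctorRef where username = current_user())
doctor_ref = {'raj_ops': 1, 'holger_gov': 2}

patients = [
    ('111-11-1111', 'John Doo', 1),
    ('222-22-2222', 'Amy Long', 1),
    ('333-33-3333', 'Muni White', 2),
    ('444-44-4444', 'Kerry Chang', 2),
    ('555-55-5555', 'Holy Ng', 1),
]

def visible_patients(current_user):
    # One policy, different result set per user: the subquery resolves
    # current_user() to a doctor id, then filters the patients rows.
    doctor_id = doctor_ref.get(current_user)
    return [p for p in patients if p[2] == doctor_id]
```

The same policy definition yields three rows for raj_ops and two for holger_gov, which is exactly the "group-level policy, user-level effect" behavior described above.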
08-31-2016
01:50 AM
8 Kudos
With the release of Spark 2.0, the machine learning library of Spark has moved from mllib to ml. One of the biggest changes in the new ml library is the introduction of the so-called machine learning pipeline. It provides a high-level abstraction of the machine learning flow and greatly simplifies the creation of machine learning processes. In this tutorial, we will walk through the steps to create a machine learning pipeline and also explain what is under the hood. (Note: the code in this article is based on the technical-preview ml library from Spark 1.6; with minor changes, it should run on Spark 2.0 as well.)

We will demonstrate how to create a pipeline in Spark to predict airline flight delay. The dataset contains airline flight information from 2007 and 2008. We use the 2007 data as the training dataset and the 2008 data as the testing dataset. We will predict flight delay using two classification models, logistic regression and decision tree. The tutorial walks step by step through using the new ml library transformers for data munging and then chaining them together with a machine learning algorithm into a pipeline that completes the whole process in one step. The tutorial is not meant to be a data science guideline on how to process data and choose a machine learning model; it is only meant to show how to build a Spark ml pipeline in Scala. The data munging decisions as well as the model choices are not optimal from a data science perspective.

Environment
We will use the Hortonworks HDP 2.4 Sandbox for this tutorial. Please prepare the Sandbox following its instructions and go to the Zeppelin UI at http://127.0.0.1:9995. All the code examples are executed inside a Zeppelin notebook.

Get the dataset

First, create a new Zeppelin notebook and name it whatever you like. Next, use the code below to download the dataset from the internet and upload it to HDFS.

%sh
wget http://stat-computing.org/dataexpo/2009/2007.csv.bz2 -O /tmp/flights_2007.csv.bz2
wget http://stat-computing.org/dataexpo/2009/2008.csv.bz2 -O /tmp/flights_2008.csv.bz2
hdfs dfs -mkdir /tmp/airflightsdelays
hdfs dfs -put /tmp/flights_2007.csv.bz2 /tmp/flights_2008.csv.bz2 /tmp/airflightsdelays/
Load data to DataFrame

Next, we will load the data from HDFS into Spark DataFrames so it can be processed by the ml pipeline. The new ml pipeline only processes data inside DataFrames, not RDDs as the old mllib did. Two DataFrames are created, one for the training data and one for the testing data. A helper function converts the military-format time into an integer, the number of minutes from midnight, so we can use it as a numeric feature. Details about the dataset can be found here.

%spark
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}
//calculate minutes from midnight, input is in military time format
def getMinuteOfDay(depTime: String) : Int = (depTime.toInt / 100) * 60 + (depTime.toInt % 100)
//define schema for raw data
case class Flight(Month: String, DayofMonth: String, DayOfWeek: String, DepTime: Int, CRSDepTime: Int, ArrTime: Int, CRSArrTime: Int, UniqueCarrier: String, ActualElapsedTime: Int, CRSElapsedTime: Int, AirTime: Int, ArrDelay: Double, DepDelay: Int, Origin: String, Distance: Int)
val flight2007 = sc.textFile("/tmp/airflightsdelays/flights_2007.csv.bz2")
val header = flight2007.first
val trainingData = flight2007
.filter(x => x != header)
.map(x => x.split(","))
.filter(x => x(21) == "0")
.filter(x => x(17) == "ORD")
.filter(x => x(14) != "NA")
.map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt))
.toDF
trainingData.cache
val flight2008 = sc.textFile("/tmp/airflightsdelays/flights_2008.csv.bz2")
val testingData = flight2008
.filter(x => x != header)
.map(x => x.split(","))
.filter(x => x(21) == "0")
.filter(x => x(17) == "ORD")
.filter(x => x(14) != "NA")
.map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt))
.toDF
testingData.cache
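To see what the time helper computes, here is the same minute-of-day conversion sketched in Python (a hypothetical port of the Scala function above, for illustration only):

```python
def get_minute_of_day(dep_time: str) -> int:
    # Military time "HHMM" -> minutes after midnight, e.g. "1430" -> 14*60 + 30
    t = int(dep_time)
    return (t // 100) * 60 + (t % 100)
```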
Create feature transformers

The building blocks of an ml pipeline are transformers and estimators. A Transformer transforms one DataFrame into another DataFrame. An Estimator fits on a DataFrame and produces a model, which is itself a transformer. Most transformers live under the org.apache.spark.ml.feature package, and the code below shows two of them, StringIndexer and VectorAssembler.

StringIndexer converts string values into categorical indices, which can be used by the machine learning algorithms in the ml library. In our dataset, we will transform month, day of month, day of week, carrier, and origin airport code with this transformer. Notice that we provide the input column name and the output column name as parameters when initializing the StringIndexer.

VectorAssembler constructs a vector from raw feature columns, since most ml machine learning algorithms take features in the form of a vector.

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
//transformer to convert strings to category values
val monthIndexer = new StringIndexer().setInputCol("Month").setOutputCol("MonthCat")
val dayofMonthIndexer = new StringIndexer().setInputCol("DayofMonth").setOutputCol("DayofMonthCat")
val dayOfWeekIndexer = new StringIndexer().setInputCol("DayOfWeek").setOutputCol("DayOfWeekCat")
val uniqueCarrierIndexer = new StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrierCat")
val originIndexer = new StringIndexer().setInputCol("Origin").setOutputCol("OriginCat")
//assemble raw feature
val assembler = new VectorAssembler()
.setInputCols(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "OriginCat", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "ActualElapsedTime", "CRSElapsedTime", "AirTime","DepDelay", "Distance"))
.setOutputCol("rawFeatures")
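To make the StringIndexer behavior concrete, here is a rough Python sketch of its default fitting logic, where distinct values are indexed in descending order of frequency. This is an illustration of the idea, not Spark's actual implementation (in particular, tie-breaking here is alphabetical by assumption):

```python
from collections import Counter

def fit_string_indexer(values):
    # Most frequent value gets index 0.0, next gets 1.0, and so on.
    freq = Counter(values)
    ordered = sorted(freq, key=lambda v: (-freq[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}

carriers = ['AA', 'UA', 'AA', 'DL', 'AA', 'UA']
index = fit_string_indexer(carriers)
encoded = [index[v] for v in carriers]
```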
Create machine learning pipeline

In this section, we create a few more transformers and define the parameters for each of them so they operate on the input DataFrame and produce the desired output DataFrame. VectorSlicer takes a vector as its input column and creates a new vector containing only a subset of the original attributes. Binarizer creates a binary value, 0 or 1, from a double-valued input column using a threshold. StandardScaler scales a vector into a new vector whose values are on a similar scale. We then initialize an estimator, the logistic regression classifier, and chain everything together in a machine learning pipeline. When the pipeline is used to fit training data, the transformers and estimator apply to the DataFrame in the sequence defined by the pipeline. The pipeline model produced by fitting the training data contains almost exactly the same steps as the pipeline estimator; the testing data goes through the same data munging process, except that in the last step the original estimator is replaced with the fitted model, which is a transformer.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StandardScaler
//vector slicer
val slicer = new VectorSlicer().setInputCol("rawFeatures").setOutputCol("slicedfeatures").setNames(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "DepTime", "ArrTime", "ActualElapsedTime", "AirTime", "DepDelay", "Distance"))
//scale the features
val scaler = new StandardScaler().setInputCol("slicedfeatures").setOutputCol("features").setWithStd(true).setWithMean(true)
//labels for binary classifier
val binarizerClassifier = new Binarizer().setInputCol("ArrDelay").setOutputCol("binaryLabel").setThreshold(15.0)
//logistic regression
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("binaryLabel").setFeaturesCol("features")
// Chain indexers and tree in a Pipeline
val lrPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, slicer, scaler, binarizerClassifier, lr))
// Train model.
val lrModel = lrPipeline.fit(trainingData)
// Make predictions.
val lrPredictions = lrModel.transform(testingData)
// Select example rows to display.
lrPredictions.select("prediction", "binaryLabel", "features").show(20)

The code below uses another estimator, the decision tree classifier. The pipeline is made up of a different chain of transformers, but the concept is the same as in the previous pipeline; you can see how easy it is to create a different pipeline from existing transformers. PCA is a dimension reduction transformer. VectorIndexer converts values inside a vector that could be categorical indices into indices. Bucketizer converts continuous values into categorical indices based on provided thresholds.

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.feature.PCA
//index categorical features inside the raw feature vector
val indexer = new VectorIndexer().setInputCol("rawFeatures").setOutputCol("rawFeaturesIndexed").setMaxCategories(10)
//PCA
val pca = new PCA().setInputCol("rawFeaturesIndexed").setOutputCol("features").setK(10)
//label for multi class classifier
val bucketizer = new Bucketizer().setInputCol("ArrDelay").setOutputCol("multiClassLabel").setSplits(Array(Double.NegativeInfinity, 0.0, 15.0, Double.PositiveInfinity))
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier().setLabelCol("multiClassLabel").setFeaturesCol("features")
// Chain all into a Pipeline
val dtPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, indexer, pca, bucketizer, dt))
// Train model.
val dtModel = dtPipeline.fit(trainingData)
// Make predictions.
val dtPredictions = dtModel.transform(testingData)
// Select example rows to display.
dtPredictions.select("prediction", "multiClassLabel", "features").show(20)
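The pipeline mechanics used in both examples, where estimators fit() into transformers and the chain itself fits into a model that replays the same sequence, can be sketched in miniature in Python. This is a toy illustration of the concept, not Spark's API:

```python
class Shift:
    """Estimator: fit() learns the minimum and returns a transformer (a function)."""
    def fit(self, data):
        lo = min(data)
        return lambda xs: [x - lo for x in xs]

class Scaler:
    """Estimator: fit() learns the maximum and returns a transformer."""
    def fit(self, data):
        m = max(data)
        return lambda xs: [x / m for x in xs]

class Pipeline:
    """fit() runs each estimator in turn, feeding transformed data forward,
    and returns a fitted model that applies the same sequence of transforms."""
    def __init__(self, stages):
        self.stages = stages
    def fit(self, data):
        models = []
        for stage in self.stages:
            model = stage.fit(data)
            data = model(data)       # next stage fits on transformed data
            models.append(model)
        def pipeline_model(xs):      # the fitted "PipelineModel"
            for model in models:
                xs = model(xs)
            return xs
        return pipeline_model

model = Pipeline([Shift(), Scaler()]).fit([2.0, 4.0, 6.0])
```

Applying `model` to new data replays shift-then-scale with the parameters learned at fit time, which is exactly the guarantee the Spark pipeline gives the testing data.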
Conclusion

The Spark machine learning pipeline is a very efficient way of creating a machine learning flow. It eliminates the need to write a lot of boilerplate code during the data munging process, and it guarantees that the training data and testing data go through exactly the same processing without any additional effort. The same pipeline concept has been implemented in many other popular machine learning libraries, and it is good to finally see it available in Spark.

How to run this tutorial with Spark 2.0

Since the tutorial was first published, Spark 2.x has gained a lot of popularity, and many users have asked how to run this tutorial with it. With the HDP 2.5 Sandbox, Spark 2.0 is included as a tech preview. Because the version of Zeppelin in HDP 2.5 is not yet compatible with Spark 2.0, the only way to run the sample code is to use spark-shell in the console. First, switch the version of Spark and launch spark-shell:

export SPARK_MAJOR_VERSION=2
spark-shell
Once inside spark-shell, confirm Spark 2.x is the version being used; with the HDP 2.5 Sandbox you should see the following:

scala> sc.version
res5: String = 2.0.0.2.5.0.0-1245
The only code that needs to be modified for the tutorial to work in Spark 2.0 is one import line. org.apache.spark.mllib is the package for Spark 1.x; just change it to org.apache.spark.ml and all the code will work in Spark 2.0:

import org.apache.spark.ml.linalg.Vectors

For reference, here is the code that works in the Spark 2.0 spark-shell:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}
def getMinuteOfDay(depTime: String) : Int = (depTime.toInt / 100).toInt * 60 + (depTime.toInt % 100)
case class Flight(Month: String, DayofMonth: String, DayOfWeek: String, DepTime: Int, CRSDepTime: Int, ArrTime: Int, CRSArrTime: Int, UniqueCarrier: String, ActualElapsedTime: Int, CRSElapsedTime: Int, AirTime: Int, ArrDelay: Double, DepDelay: Int, Origin: String, Distance: Int)
val flight2007 = sc.textFile("/tmp/airflightsdelays/flights_2007.csv.bz2")
val header = flight2007.first
val trainingData = flight2007.filter(x => x != header).map(x => x.split(",")).filter(x => x(21) == "0").filter(x => x(17) == "ORD").filter(x => x(14) != "NA").map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt)).toDF
trainingData.cache
val flight2008 = sc.textFile("/tmp/airflightsdelays/flights_2008.csv.bz2")
val testingData = flight2008.filter(x => x != header).map(x => x.split(",")).filter(x => x(21) == "0").filter(x => x(17) == "ORD").filter(x => x(14) != "NA").map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt)).toDF
testingData.cache
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
val monthIndexer = new StringIndexer().setInputCol("Month").setOutputCol("MonthCat")
val dayofMonthIndexer = new StringIndexer().setInputCol("DayofMonth").setOutputCol("DayofMonthCat")
val dayOfWeekIndexer = new StringIndexer().setInputCol("DayOfWeek").setOutputCol("DayOfWeekCat")
val uniqueCarrierIndexer = new StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrierCat")
val originIndexer = new StringIndexer().setInputCol("Origin").setOutputCol("OriginCat")
val assembler = new VectorAssembler().setInputCols(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "OriginCat", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "ActualElapsedTime", "CRSElapsedTime", "AirTime","DepDelay", "Distance")).setOutputCol("rawFeatures")
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StandardScaler
val slicer = new VectorSlicer().setInputCol("rawFeatures").setOutputCol("slicedfeatures").setNames(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "DepTime", "ArrTime", "ActualElapsedTime", "AirTime", "DepDelay", "Distance"))
val scaler = new StandardScaler().setInputCol("slicedfeatures").setOutputCol("features").setWithStd(true).setWithMean(true)
val binarizerClassifier = new Binarizer().setInputCol("ArrDelay").setOutputCol("binaryLabel").setThreshold(15.0)
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("binaryLabel").setFeaturesCol("features")
val lrPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, slicer, scaler, binarizerClassifier, lr))
val lrModel = lrPipeline.fit(trainingData)
val lrPredictions = lrModel.transform(testingData)
lrPredictions.select("prediction", "binaryLabel", "features").show(20)
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.feature.PCA
val indexer = new VectorIndexer().setInputCol("rawFeatures").setOutputCol("rawFeaturesIndexed").setMaxCategories(10)
val pca = new PCA().setInputCol("rawFeaturesIndexed").setOutputCol("features").setK(10)
val bucketizer = new Bucketizer().setInputCol("ArrDelay").setOutputCol("multiClassLabel").setSplits(Array(Double.NegativeInfinity, 0.0, 15.0, Double.PositiveInfinity))
val dt = new DecisionTreeClassifier().setLabelCol("multiClassLabel").setFeaturesCol("features")
val dtPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, indexer, pca, bucketizer, dt))
val dtModel = dtPipeline.fit(trainingData)
val dtPredictions = dtModel.transform(testingData)
dtPredictions.select("prediction", "multiClassLabel", "features").show(20)