## INITIALIZING execution date and model version
date_calc = "2018-01-12"    # scoring (business) date written into the output table
date_model = "2017-09-20"   # version suffix of the trained model to load

## READ TABLE WITH FEATURES FROM HIVE
preds = spark.sql("select * from sandbox.CHURN_PRP_D_PREDS_S")
sqlContext.registerDataFrameAsTable(preds, "preds")

## PREPARE DATAFRAME FOR MODELING
cols = preds.columns[:]        # working copy of the full column list
label = "status"               # label column (used at training time, kept for reference)
features = preds.columns[2:]   # assumes the first two columns are id and label
train = preds.select(cols)

# Spark ML expects numeric inputs, so cast every column to double
for col_name in cols:
    train = train.withColumn(col_name, train[col_name].cast("double"))

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

# Collect the feature columns into a single vector column
assembler = VectorAssembler(inputCols=features, outputCol="features")
train = assembler.transform(train)

# Standardize the feature vector. Note: the loaded model reads whatever column is
# named in its own featuresCol parameter; if that is "features", the scaled column
# below is carried along but never used for scoring.
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
model = standardScaler.fit(train)
train = model.transform(train)

## LOAD MODEL
from pyspark.ml.classification import LogisticRegressionModel

model_dir = "/user/evgeniy.pahnuk/CHURN_PRP_D"   # renamed from `dir`, which shadows the builtin
modelname = "CHURN_PRP_D_R_LogReg_" + date_model
path = model_dir + "/" + modelname
model_score = LogisticRegressionModel.load(path)

## SCORE: apply the model, extract score and raw_prediction
train = model_score.transform(train)

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def extract_fields(df):
    # Both `probability` and `rawPrediction` are vectors; element [1] belongs to the
    # positive (churn) class: its probability and its raw margin, respectively.
    firstelement = udf(lambda v: float(v[1]), FloatType())
    df = df.withColumn('score', firstelement(df['probability']))
    df = df.withColumn('raw_prediction', firstelement(df['rawPrediction']))
    return df

train = extract_fields(train)

## ADDING execution date/time, dropping columns
from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import broadcast

def drop_fields(df, column_list):
    for i in column_list:
        df = df.drop(i)
    return df

# Drop the intermediate vector/prediction columns; only flat score fields go to Hive
prediction_cols_list = ['features', 'features_scaled', 'rawPrediction', 'probability', 'prediction']
train = drop_fields(train, prediction_cols_list)

def add_exec_fields(df):
    # One-row frame holding the scoring date; broadcast it so the cross join is cheap
    date_df = spark.sql("select to_date('{}') as score_date".format(date_calc))
    df = df.withColumn("upd_datetime", current_timestamp())
    df = df.crossJoin(broadcast(date_df))
    return df

train = add_exec_fields(train)
sqlContext.registerDataFrameAsTable(train, "train")

## INSERT RESULTS INTO HIVE
spark.sql("create table sandbox.CHURN_PRP_D_SCORE_S_TEST stored as orc as select * from train")
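
## APPENDIX -- TRAINING COUNTERPART (hedged sketch, not part of the original script).
## The LOAD MODEL step assumes a LogisticRegressionModel was saved under `path` by an
## earlier run. Something along these lines would produce it; the helper name, the
## default hyperparameters, and `labeled` (a frame with a numeric `status` column,
## assembled with the same VectorAssembler as above) are assumptions, not the
## author's actual training code.
def train_and_save_model(labeled, save_path):
    from pyspark.ml.classification import LogisticRegression
    lr = LogisticRegression(featuresCol="features", labelCol="status")
    lr_model = lr.fit(labeled)
    lr_model.write().overwrite().save(save_path)   # same layout the scoring run loads from
    return lr_model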
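
## APPENDIX -- SANITY CHECK (illustrative addition, not part of the original script).
## Read the freshly created table back and eyeball the score distribution before
## pointing downstream jobs at it. Table and column names follow the script above.
scored = spark.sql("select score, raw_prediction, score_date from sandbox.CHURN_PRP_D_SCORE_S_TEST")
scored.describe("score", "raw_prediction").show()   # count / mean / stddev / min / max
scored.groupBy("score_date").count().show()         # expect a single score_date per run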