Created on 10-04-2018 11:38 AM - edited 09-16-2022 01:44 AM
Time to continue my series on news author personality detection! This tutorial will look at data science, Zeppelin notebooks with the idea of predicting the popularity of an article based on the personality of an author. This tutorial does assume that you completed part 1 and part 2 of the series. After completing this part, your architecture will look like this:
This tutorial will be divided in sections:
Note: This exercise is a very simplified way to apply machine learning. I spent very little time selecting the appropriate algorithm, accumulating enough data or even challenging whether or not the personality of an author is a viable predictor for personality. This is a full time job, it's called being a data scientist! But hopefully this will enable the real data scientist to please stand up and build useful stuff using the Hortonworks stack.
This section's goal is to first enhance our previous Nifi flow to upsert data into HBase using Phoenix, as well as add a new flow collecting data from the New York Times "most popular" API. You can find all these flows under my github here. I used Nifi registry to push directly to github; if you want to learn how to do that, check out this article: How to configure git for Nifi Registry in HDF 3.2.
Connect to your server and run phoenix-sqlline (with the appropriate user):
$ ssh [YOUR_MACHINE] $ sudo su hbase $ phoenix-sqlline
Create three tables:
Here is the SQL create code:
CREATE TABLE article_evaluation ( web_url VARCHAR NOT NULL PRIMARY KEY, snippet VARCHAR, byline VARCHAR, pub_date TIMESTAMP, headline VARCHAR, document_type VARCHAR, news_desk VARCHAR, filename VARCHAR, last_updated VARCHAR, extraversion DOUBLE, emotional_stability DOUBLE, agreeableness DOUBLE, conscientiousness DOUBLE, openness_to_experience DOUBLE ); CREATE TABLE article_stats ( web_url VARCHAR NOT NULL PRIMARY KEY, views TINYINT, last_updated VARCHAR ); CREATE TABLE article_popularity_prediction ( web_url VARCHAR NOT NULL PRIMARY KEY, extraversion DOUBLE, emotional_stability DOUBLE, agreeableness DOUBLE, conscientiousness DOUBLE, openness_to_experience DOUBLE, prediction DOUBLE );
Our goal is to add the following processors to our previous flow (established in part 1 of this series):
Create a processor and configure it to create/update 3 attributes:
Here is the code for each attribute:
This one is straight forward, as depicted below:
Your replacement value should be:
upsert into article_evaluation values('${web_url}','${snippet}','${byline}','${pub_date}','${headline}','${document_type}','${news_desk}','${filename}','${now()}',${mairessepersonalityrecognition.extraversion},${mairessepersonalityrecognition.emotional_stability},${mairessepersonalityrecognition.agreeableness},${mairessepersonalityrecognition.conscientiousness},${mairessepersonalityrecognition.openness_to_experience})
This processor is straight forward too, but will require to first create a DBCPConnectionPool controller service:
Assuming the nifi node has a phoenix client, here is the parameters you will need to setup:
Database Connection URL
Database Driver Class Name
Database Driver Location(s)
Finally, configure a PutSQL as depicted below. You will note that I turned off the support for fragmented transactions. This is not important here but will in the next flow where we split the output of the API.
The flow itself is pretty self explanatory, as shown below:
Remote URL should be (to get an API key, see part 1):[YOUR_API_KEY]
JsonPath Expression should be:
Extract url from JSON:
Extract views from JSON:
Replacement Value:
upsert into article_stats values('${url}',${views},'${now()}')
This is the exact same processor as in your other flow :)
Go to the maven repository and get the latest jar for phoenix and spark2 (see repository here).
Once downloaded add it to the spark2 libraries, under the following folder: /usr/hdp/current/spark2-client/jars.
In this directory copy the phoenix client if it's not present yet from /usr/hdp/current/phoenix-client/.
$ cd /usr/hdp/current/spark2-client/jars $ wget $ cp /usr/hdp/current/phoenix-client/phoenix- . $ ls -ltra phoenix* -rw-r--r--. 1 root root 222910655 Oct 2 17:35 phoenix- -rwxr-xr-x. 1 root root 83908 Oct 2 17:35 phoenix-spark2-
Restart spark from Ambari.
Open Zeppelin and create a new note (I named mine Linear Regression). Using pyspark, retrieve data from the article_evaluation and article_stats tables into a dataframe, and join them based on the web_url:
%pyspark from pyspark.sql.functions import lit article_evaluation_table = .format("org.apache.phoenix.spark") .option("table", "article_evaluation") .option("zkUrl", "localhost:2181") .load() article_stats_table = .format("org.apache.phoenix.spark") .option("table", "article_stats") .option("zkUrl", "localhost:2181") .load() aet = article_evaluation_table.alias('aet') ast = article_stats_table.alias('ast') inner_join = aet.join(ast, aet.WEB_URL == ast.WEB_URL) article_df ='EXTRAVERSION','OPENNESS_TO_EXPERIENCE','CONSCIENTIOUSNESS','AGREEABLENESS','EMOTIONAL_STABILITY','VIEWS') no_views_df ='WEB_URL','EXTRAVERSION','OPENNESS_TO_EXPERIENCE','CONSCIENTIOUSNESS','AGREEABLENESS','EMOTIONAL_STABILITY').withColumn('VIEWS',lit(0))
In the same note, create a paragraph that creates a vectorized dataframe for your model to use:
%pyspark from import VectorAssembler vectorAssembler = VectorAssembler(inputCols = ['EXTRAVERSION', 'OPENNESS_TO_EXPERIENCE', 'CONSCIENTIOUSNESS', 'AGREEABLENESS', 'EMOTIONAL_STABILITY'], outputCol = 'features') varticle_df = vectorAssembler.transform(article_df) varticle_df =['features', 'VIEWS']) vno_views_df = vectorAssembler.transform(no_views_df) train_df = varticle_df
Use the dataframe created to train a linear regression model:
%pyspark from import LinearRegression lr = LinearRegression(featuresCol = 'features', labelCol='VIEWS', maxIter=10, regParam=0.3, elasticNetParam=0.8) lr_model = print("Coefficients: " + str(lr_model.coefficients)) print("Intercept: " + str(lr_model.intercept))
Check the summary of your model:
%pyspark trainingSummary = lr_model.summary print("RMSE: %f" % trainingSummary.rootMeanSquaredError) print("r2: %f" % trainingSummary.r2) train_df.describe().show()
Run Your model on all articles in the article evaluation table:
%pyspark predictions = lr_model.transform(vno_views_df) df_to_write ="WEB_URL","EXTRAVERSION","OPENNESS_TO_EXPERIENCE","CONSCIENTIOUSNESS","AGREEABLENESS","EMOTIONAL_STABILITY","prediction")
Upload these results to the article_popularity_prediction table using phoenix
%pyspark df_to_write.write .format("org.apache.phoenix.spark") .mode("overwrite") .option("table", "article_popularity_prediction") .option("zkUrl", "localhost:2181") .save()
Finally, if you want to check your predictions versus the actual view, create a jdbc(phoenix) paragraph, as follows:
%jdbc(phoenix) select article_popularity_prediction.web_url as URL, article_popularity_prediction. prediction as predicted_views, article_stats.views from article_popularity_prediction, article_stats where article_popularity_prediction.web_url = article_stats.web_url
As you can see the model is far from perfect. I would also argue that creating a PMML from the model would have been a better way to go for our architecture (calling the PMML from Nifi as data flows), instead of loading results offline like we do here. Again, this is a full time job 🙂 See you next article, where we will create a dashboard to leverage this data science and data engineering we just created!
Note: for reference, I included my Zeppelin note here: linear-regression-zeppelin-note.json