Member since: 07-18-2016
Posts: 94
Kudos Received: 94
Solutions: 20
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 1430 | 08-11-2017 06:04 PM
 | 1178 | 08-02-2017 11:22 PM
 | 3033 | 07-10-2017 03:36 PM
 | 12944 | 03-17-2017 01:27 AM
 | 11399 | 02-24-2017 05:35 PM
04-12-2018
03:28 PM
Repo Info: GitHub Repo URL: https://github.com/zaratsian/Spark/blob/master/pysparkling_water_loan_prediction.py (GitHub account: zaratsian, repo: Spark, file: pysparkling_water_loan_prediction.py)
Tags: ambari-extensions, h2o, pyspark, solutions, Spark
11-08-2017
04:51 PM
5 Kudos
H2O is an open source machine learning and deep learning platform for data scientists. Sparkling Water allows users to combine the fast, scalable machine learning algorithms of H2O with the capabilities of Spark. In this tutorial, I will walk you through the steps required to set up H2O Sparkling Water (specifically PySparkling Water) along with Zeppelin in order to execute your machine learning scripts.

Here are a few points to note before I get started:

1.) There is a known issue when running Sparkling Water within Zeppelin, documented in this Jira (AttributeError: 'Logger' object has no attribute 'isatty'). To bypass this issue, I use Zeppelin combined with Livy Server to execute the Sparkling Water jobs. If you are not familiar with Apache Livy, it is a service that enables easy interaction with a Spark cluster over a REST interface.
2.) Testing was performed within the following environment: Hortonworks HDP 2.6.2, CentOS Linux release 7.2.1511 (Core), Python 2.7.5, Spark 2.1.1, Zeppelin 0.7.2.

Now, let's walk through the steps:

Step 1: Download Sparkling Water from here, or log in to your Spark client node and run:

wget http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.1/16/sparkling-water-2.1.16.zip

Step 2: Unzip and move the PySparkling Water dependency to HDFS:

# Unzip Sparkling Water
unzip sparkling-water-2.1.16.zip
# Move the .zip dependency to a location within HDFS (make sure that this location is accessible from Zeppelin/Livy)
hadoop fs -put sparkling-water-2.1.16/py/build/dist/h2o_pysparkling_2.1-2.1.16.zip /tmp/.

Step 3: Ensure that the required Python libraries are installed on each datanode:

pip install tabulate
pip install future
pip install requests
pip install six
pip install numpy
pip install colorama
pip install --upgrade requests

Step 4: Edit the HDFS configs by adding these two parameters to the custom core-site:

hadoop.proxyuser.livy.groups=*
hadoop.proxyuser.livy.hosts=*

Step 5: Add a new HDFS directory for "admin" (or whichever user(s) will be issuing Sparkling Water code):

hadoop fs -mkdir /user/admin
hadoop fs -chown admin:hdfs /user/admin
Step 6: Within Zeppelin, edit the Livy interpreter and add a new parameter called livy.spark.submit.pyFiles (the value of this parameter should be the HDFS path to the PySparkling Water .zip file).

Step 7: Within Zeppelin, import the libraries, initialize the H2OContext, then run your PySparkling Water scripts:

%livy2.pyspark
from pysparkling import *
hc = H2OContext.getOrCreate(spark)
import h2o
from h2o.estimators.gbm import H2OGradientBoostingEstimator
from h2o.estimators.deeplearning import H2ODeepLearningEstimator
from pyspark.sql.types import *
from pyspark.sql.functions import *
loans = h2o.import_file(path="hdfs://dzaratsian0.field.hortonworks.com:8020/tmp/loan_200k.csv", header=0)
loans.head()
df_loans = hc.as_spark_frame(loans)
df_loans.show(10)

References: Hortonworks HDP 2.6.2, H2O Downloads
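As a quick follow-on to Step 7 (not part of the original setup), here is a minimal sketch that trains one of the imported estimators on the H2O frame. The column names "bad_loan" and "loan_amount" are placeholders; substitute the columns from your own loan dataset:

# Train a GBM on the H2O frame (column names below are placeholders)
loans["bad_loan"] = loans["bad_loan"].asfactor()            # treat the label as categorical
train, test = loans.split_frame(ratios=[0.8], seed=1234)    # 80/20 train/test split

gbm = H2OGradientBoostingEstimator(ntrees=50, max_depth=5, seed=1234)
gbm.train(x=["loan_amount"], y="bad_loan", training_frame=train, validation_frame=test)

print(gbm.auc(valid=True))   # validation AUC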
Tags: Data Science & Advanced Analytics, FAQ, h2o, How-To/Tutorial, livy, Spark, zeppelin
08-23-2017
01:25 PM
Hi @Lukas Müller You need to import the required function:

from pyspark.sql.functions import struct

You could also import everything from the module:

from pyspark.sql.functions import *
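As a quick illustration of what struct does (the columns here are made up), it packs several columns into a single column that can then be passed to a UDF:

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# Combine the two columns into one struct column
df.withColumn("combined", struct(df["id"], df["label"])).show()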
08-22-2017
01:47 PM
@Lukas Müller
This should work for you:

from pyspark.sql.types import *
from pyspark.sql.functions import udf
# Create your UDF object (which accepts your python function called "my_udf")
udf_object = udf(my_udf, ArrayType(StringType()))
# Apply the UDF to your Dataframe (called "df")
new_df = df.withColumn("new_column", udf_object(struct([df[x] for x in df.columns])))

If you want to make this better, replace "ArrayType(StringType())" with an explicit schema such as:

schema = ArrayType(StructType([
StructField("mychar", StringType(), False),
StructField("myint", IntegerType(), False)
]))

A complete end-to-end sketch is included below. Hope this helps!
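For completeness, here is a minimal end-to-end sketch of the same pattern (my_udf and the columns here are hypothetical placeholders, not from your code):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "text"])

# Placeholder python function: receives an entire Row and returns a list of strings
def my_udf(row):
    return [str(row.id), row.text.upper()]

udf_object = udf(my_udf, ArrayType(StringType()))

# Pass all columns to the UDF as a single struct column
new_df = df.withColumn("new_column", udf_object(struct([df[x] for x in df.columns])))
new_df.show(truncate=False)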
08-16-2017
05:11 PM
@Félicien Catherin
Did you modify the zeppelin-env.sh at all? That command you are running should work fine, so I'm wondering if something was modified within your configs as part of the install. I am running HDP 2.6 and was able to run a simple test (as you did above). It worked as expected. I've included the commands that I ran as well as the parameter settings within my Zeppelin interpreter. Hopefully this is helpful as a comparison for you.
08-11-2017
06:28 PM
@Lukas Müller Great, happy you got it working!
08-11-2017
06:04 PM
1 Kudo
Hi @Lukas Müller, does this work for you?

./bin/spark-submit --jars /path/to/file1.jar,/path/to/file2.jar --packages com.databricks:spark-csv_2.x:x.x.x pyspark_code.py

What version of Spark are you using? If you are using Spark 2.0+, you should not need to specify the spark-csv package at all, since CSV support is built in (just as an FYI). Please let me know if this helps.
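For reference, on Spark 2.0+ the built-in CSV reader covers the same use case without any extra packages (the path below is a placeholder):

# Spark 2.0+: CSV support is built in, no spark-csv package required
df = spark.read.option("header", "true").option("inferSchema", "true").csv("/path/to/data.csv")
df.show(10)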
08-04-2017
01:09 PM
Thanks @Hugo Felix, I'll refer to your other posts to tackle the additional issues. I wonder if there's an issue in the way you are storing the Twitter data within Hive. Here's an older post, but it details the serde and Hive queries necessary to read Twitter data on an older version of HDP. You may find this helpful: https://hortonworks.com/blog/howto-use-hive-to-sqlize-your-own-tweets-part-two-loading-hive-sql-queries/
08-03-2017
12:28 AM
@Hugo Felix It could solve the problem, because the serdes are built-in and updated along with Hive updates. It works fine for me in recent versions of HDP, which is why I wanted to mention it. I saw that you opened another question specifically for the Hive serde issue. Your original question, "How to make hive queries include scala and python functions", is answered as part of my posts, so when you get a chance, could you please accept the best answer? There are a lot of responses in this thread, so that may help someone else out. I do have one other thought for debugging your JSON serde error: it could be that the way the JSON was stored within Hive is incorrect. If that is the case, then when you try to execute a python UDF against that Hive record, it isn't able to find the right structure. If you execute a "select *" against your Hive table, how does the output look?
08-02-2017
11:45 PM
@Gordon Banker Open up the Zeppelin UI, then click on "Interpreter" within the dropdown menu in the upper right-hand corner. From there, you can scroll down to the Spark interpreter (or do a search for "python") and you will see a field called "zeppelin.pyspark.python". You can change this value to point to your alternative python location (i.e. change python to something like /path/to/new/bin/python). Let me know if that helps.
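One quick way to verify the change after restarting the interpreter is to check which Python executable the %pyspark paragraph is actually using (a simple sanity check, not from the original question):

%pyspark
import sys
print(sys.executable)   # should point to your new /path/to/new/bin/python
print(sys.version)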
08-02-2017
11:22 PM
@chandramouli muthukumaran You'll want to install sklearn (pip install -U scikit-learn) and spark-sklearn on all datanodes of the cluster, as well as other relevant python packages such as numpy, scipy, etc. I'd also recommend using YARN as the resource manager, so you are on the right path there. Hope this helps!
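As a rough sketch of what spark-sklearn usage looks like once everything is installed (based on the spark-sklearn project; verify the exact API against the version you install):

from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from spark_sklearn import GridSearchCV   # distributes the parameter search across the cluster

digits = datasets.load_digits()
param_grid = {"max_depth": [3, None], "n_estimators": [10, 50]}

# sc is the SparkContext; each parameter combination is trained as a Spark task
gs = GridSearchCV(sc, RandomForestClassifier(), param_grid)
gs.fit(digits.data, digits.target)
print(gs.best_params_)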
08-01-2017
02:42 AM
@Hugo Felix Thanks for the update! Hortonworks is currently on HDP 2.6, so if you have the option, it sounds like it would be beneficial to upgrade. Also as a quick reference, the most recent Hortonworks Sandbox can be downloaded from here: https://hortonworks.com/downloads/#sandbox Here's a link to the documentation as well: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/index.html
07-24-2017
12:21 PM
@Hugo Felix Nice! Glad to see that you got it working. If you upgrade to Spark 2.x, then you should not need to add the serde (just something to keep in mind). If you're all set, can you please mark the thread as accepted? Thanks!
07-20-2017
04:13 PM
1 Kudo
@Hugo Felix Here's a test that should help determine whether your syntax is off or your environment is misconfigured. First, create a test Hive table and populate it with data:

CREATE TABLE IF NOT EXISTS testtable
(id string, text string)
STORED AS ORC;
INSERT INTO TABLE testtable VALUES
('1111', 'The service was great, and the agent was very helpful'),
('2222', 'I enjoyed the event but the food was terrible'),
('3333', 'Unhappy with the organization of the event');
Then create a file called "my_py_udf.py" (as shown below). It can be placed anywhere, but in my example I placed it at /tmp/my_py_udf.py.

import sys
for line in sys.stdin:
    id, text = line.replace('\n',' ').split('\t')
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice", "helpful", "enjoyed"])
    negative = set(["hate", "bad", "stupid", "terrible", "unhappy"])
    words = text.split()
    word_count = len(words)
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    st = sum(positive_matches) + sum(negative_matches)
    if st > 0:
        print '\t'.join([text, 'positive', str(word_count)])
    elif st < 0:
        print '\t'.join([text, 'negative', str(word_count)])
    else:
        print '\t'.join([text, 'neutral', str(word_count)])
Then, from within Hive, execute the following commands:

ADD FILE /tmp/my_py_udf.py;
SELECT
TRANSFORM (id, text)
USING 'python my_py_udf.py'
AS (text, sentiment, word_count)
FROM testtable;
Your resulting output should look like this:

The service was great, and the agent was very helpful    positive    10
I enjoyed the event but the food was terrible neutral 9
Unhappy with the organization of the event neutral 7
07-17-2017
08:02 PM
@Hugo Felix Yeah, that was the next test I had in mind for you. Thanks for sharing the results. Can you share the environment you are working in? Are you using Spark 1.6.x or Spark 2.x? Also, what version of HDP are you using?
07-17-2017
04:21 PM
@Hugo Felix What happens if you replace

created_at, user.screen_name, text = line.split('\t')

with

created_at, screen_name, text = line.split('\t')

I do not believe Python will be able to handle user.screen_name as an assignment target in the context you are writing it. In my example, the function accepts a tab-delimited line; when you perform the split('\t'), it parses the line out into X number of variables. The names of the assigned variables (such as created_at, screen_name, text) are arbitrary (you could name them x, y, z if you wanted, but you would then have to make sure the rest of the python script used the x, y, z variable names). A tiny standalone illustration is below. Give that a try and let me know if it helps. Thanks.
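For instance (the line content here is made up):

# The names on the left of the assignment are arbitrary identifiers;
# they simply receive the fields produced by split('\t') in order.
line = "Mon Jul 17 2017\tsome_user\tsome tweet text"
created_at, screen_name, text = line.split('\t')
print(screen_name)   # -> some_user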
07-12-2017
08:01 PM
2 Kudos
@Hugo Felix It might be an out of memory issue, but it could also be a variable/column mismatch. Can you share your python function and the Hive query so that I can review?
07-10-2017
03:36 PM
3 Kudos
@Hugo Felix One option is to implement these functions as a Hive UDF (written in Python). For example, your new Python function (my_py_udf.py) would look something like this:

import sys
for line in sys.stdin:
    createdAt, screenName, text = line.replace('\n',' ').split('\t')
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
    negative = set(["hate", "bad", "stupid"])
    words = text.split()
    word_count = len(words)
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    st = sum(positive_matches) + sum(negative_matches)
    if st > 0:
        print '\t'.join([text, 'positive', str(word_count)])
    elif st < 0:
        print '\t'.join([text, 'negative', str(word_count)])
    else:
        print '\t'.join([text, 'neutral', str(word_count)])
NOTE: This function combines both of your previous functions into one (since you can calculate word count and sentiment in a single pass). To call this UDF within Hive, run Hive code similar to this:

ADD FILE /home/hive/my_py_udf.py;
SELECT
TRANSFORM (createdAt, screenName, text)
USING 'python my_py_udf.py'
AS text,
sentiment,
word_count
FROM tweets;
Hope this helps!
05-11-2017
01:04 PM
@Amit Panda You can reference these docs for more information regarding Zeppelin security with Active Directory: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.0/bk_zeppelin-component-guide/content/config-secure-prod-ad.html. Have you seen these docs? You do not want to hardcode the username/password in the shiro config file. Rather, you will want to add the authentication settings and optionally use a self-signed certificate. Let me know if you'd like more info - hope this helps!
03-28-2017
01:39 AM
7 Kudos
In this article, I'll show how to analyze a real-time data stream using Spark Structured Streaming. I wanted to provide a quick Structured Streaming example that shows an end-to-end flow from source (Twitter), through Kafka, and then data processing using Spark. To accomplish this, I used Apache NiFi (part of Hortonworks HDF) to capture the Twitter data and send it to Apache Kafka.
From here, Spark was used to consume each Twitter payload (as JSON), parse, and analyze the data in real-time.
Before I jump into the technical details, it's good to understand some of the business value of this process. There are many practical applications based on this technology stack as well as the code that I describe below. Here are a few of the business applications that can be implemented using this technology:

Security: Real-time log analysis allows organizations to extract IP addresses, ports, services and events. This data can then be aggregated based on a time window, analyzed, and monitored for abnormal activity. Spark Structured Streaming also supports real-time joins with static data, further enriching the logs by incorporating external data such as location, detailed user information, and historical data.

Sensors & IoT: When working with sensors, out-of-order data is a challenge. The order of readings is critical for identifying patterns and behavior within an environment. One of the goals of Spark Structured Streaming is to maintain order using "watermarking", which enables the engine to automatically track the current event time within the data and attempt to clean up old state accordingly.

Web Analytics: Structured Streaming can be used to route, process, and aggregate clickstream data from your website. This analysis can feed external systems in order to proactively send notifications to users, launch a web form, or trigger an action in a 3rd-party system.

Call Center: Identify trends related to call volume, response times, emerging topics, at-risk customers, and cross-sell opportunities. Spark is capable of processing both structured and unstructured call records to address these business needs.

Social Media: Analyze social feeds in real-time to detect influencers, trending topics, abnormal volume, or other indicators. All of this can be monitored and aggregated within a defined time window. The example below goes into more detail on this specific use case.
Here are the technical details associated with this application:

Step 1: Connect to Twitter and stream the data to Kafka

A simple NiFi flow was used to capture the Twitter data using the GetTwitter NiFi processor, which consumes data from the Twitter Streaming API. Using PutKafka, I was able to push the JSON payload to a Kafka topic called "dztopic1". Below is a screenshot that shows this NiFi flow.

Step 2: Use Spark to read the Kafka Stream

Prerequisite: Before you launch Spark, make sure that you have included the required artifact/dependency as described here: spark-sql-kafka-0-10_2.11. If you want to add this via the PySpark command line, you can run something like this:

./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

Spark Structured Streaming subscribes to our Kafka topic using the code shown below:

# Consume Kafka topic
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dztopic1") \
    .load()
# Cast the JSON payload as a String
events = events.selectExpr("CAST(value AS STRING)")
Step 3: Define Python UDFs

I created two Python functions. The first, parse_json, parses the Twitter JSON payload and extracts each field of interest. The second, convert_twitter_date, converts the Twitter created_at timestamp into a PySpark timestamp, which is used for windowing. I like using Python UDFs, but note that there are other ways to parse JSON and convert the timestamp field.

Function: parse_json

import json   # required by parse_json

def parse_json(df):
    twitterid = str(json.loads(df[0])['id'])
    created_at = str(json.loads(df[0])['created_at'])
    tweet = str(json.loads(df[0])['text'])
    screen_name = str(json.loads(df[0])['user']['screen_name'])
    return [twitterid, created_at, tweet, screen_name]
Function: convert_twitter_date

import datetime   # required by convert_twitter_date

def convert_twitter_date(timestamp_str):
    output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ',''), '%a %b %d %H:%M:%S %Y')
    return output_ts
Step 4: Parse JSON within Spark

Once we have the JSON string, I used the two Python UDFs to parse each payload, convert the timestamp, and output the relevant dataframe columns (created_at, screen_name, tweet, and created_at_ts).

json_schema = StructType([
StructField("twitterid", StringType(), True),
StructField("created_at", StringType(), True),
StructField("tweet", StringType(), True),
StructField("screen_name", StringType(), True)
])
udf_parse_json = udf(parse_json , json_schema)
udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())
jsonoutput = events.withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns]))) \
    .where(col("parsed_field").isNotNull()) \
    .withColumn("created_at", col("parsed_field.created_at")) \
    .withColumn("screen_name", col("parsed_field.screen_name")) \
    .withColumn("tweet", col("parsed_field.tweet")) \
    .withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at")))
Step 5: Spark Windowing

I used Spark's window operation to capture trending screen_names, aggregating within 1-minute windows with a slide duration of 15 seconds.

# http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.window
# pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration=None, startTime=None)
windowedCounts = jsonoutput.groupBy(
window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),
jsonoutput.screen_name
).count()
Step 6: Start Spark Structured Streaming

Start the Spark Structured Streaming queries. In this case, I am launching two queries: one that contains the results from our time window (query_window) and another (query_json) that contains the parsed JSON records. Both tables are stored in memory. Typically, you would use a different Spark sink, such as writing the results back to Kafka or persisting to HDFS, but for this example (and for debugging) I am writing the results to two in-memory tables.

query_window = windowedCounts.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("myTable_window") \
    .start()
query_json = jsonoutput.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("myTable_json") \
    .start()
Step 7: Interactively Query the Structured Streaming Tables

Each in-memory table can be interactively queried for its current state using standard SparkSQL syntax, as shown below.

Query myTable_json: Output 15 records of the in-memory table

spark.sql("select created_at, screen_name, tweet from myTable_json limit 15").show(15,False)
Query myTable_json: Output the top 15 Twitter authors

spark.sql("select screen_name, count(*) as count from myTable_json group by screen_name order by count desc limit 15").show(15,False)
Query myTable_window: Output the window aggregations

spark.sql("select * from myTable_window limit 15").show(15,False)
References:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
https://hortonworks.com/apache/spark/

Environment: Apache Spark 2.1.0 (PySpark with Python 2.7.5), Apache NiFi 1.1.0, Apache Kafka 2.10-0.8.2.1
Tags: Data Science & Advanced Analytics, How-To/Tutorial, Kafka, NiFi, pyspark, Spark, spark-streaming
03-17-2017
01:27 AM
2 Kudos
Hi @Sean Byrne I also had a similar question, but it's common within distributed systems to see many "part" file outputs. This is because you will typically have many partitions, across multiple nodes, writing to the same output directory (so interference is reduced). However, you can run a Spark job against this directory in order to create one single CSV file. Here's the code:

# Use PySpark to read in all "part" files
allfiles = spark.read.option("header","false").csv("/destination_path/part-*.csv")
# Output as CSV file
allfiles.coalesce(1).write.format("csv").option("header", "false").save("/destination_path/single_csv_file/")
Another option would be to use format("memory"), and then you could execute periodic in-memory queries against the Spark stream. These queries could save the in-memory table to a single CSV (or other format); a rough sketch of that is below. If I come across any way to output to a single CSV directly from Structured Streaming, I will be sure to post it. Hope this is helpful.
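Here is a minimal sketch of that second option (the in-memory table name myTable_json is just a placeholder):

# Periodically snapshot the in-memory streaming table to a single CSV file
snapshot = spark.sql("select * from myTable_json")
snapshot.coalesce(1) \
    .write.mode("overwrite") \
    .format("csv").option("header", "true") \
    .save("/destination_path/single_csv_file/")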
03-08-2017
05:46 PM
2 Kudos
@X Long I do not believe it was removed in Spark 2.1.0. Here's the documentation for Broadcast Variables (for Scala, Java, and Python): http://spark.apache.org/docs/2.1.0/programming-guide.html#broadcast-variables

You may also need to set the spark.sql.autoBroadcastJoinThreshold parameter if you are running into errors. This parameter sets the max size (in bytes) for a table that will be broadcast to all worker nodes when performing a join. A tiny broadcast-variable example is below. If you are running into an error, can you please post that as well. Thanks!
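For reference, creating and reading a broadcast variable in PySpark looks like this:

# Broadcast a small lookup structure to every executor once
lookup = sc.broadcast({"NC": "North Carolina", "NY": "New York"})

rdd = sc.parallelize(["NC", "NY", "NC"])
print(rdd.map(lambda abbrev: lookup.value[abbrev]).collect())
# ['North Carolina', 'New York', 'North Carolina']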
03-08-2017
03:24 PM
1 Kudo
@Jayadeep Jayaraman Yes, Pandas will execute on the nodes within your cluster, but you need to make sure that pandas (and any other libraries) are installed on the nodes. To accomplish this, Hortonworks and Anaconda have partnered to create Cluster Management Packs: https://www.continuum.io/blog/developer-blog/self-service-open-data-science-custom-anaconda-management-packs-hortonworks-hdp This is the preferred way to manage and ship Python (and R) packages within your HDP cluster. If you want to create a Python virtual environment and ship it to the nodes of your cluster, then here's a good article: https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html You need to zip up the relevant Python environment (or packages) and ship it when starting your Spark session, such as:

./bin/spark-submit --master yarn-cluster --archives your_python_env_or_packages.zip pyspark_project.py
03-06-2017
01:19 PM
@Dinesh Das Where is the table name coming from? In your case, is the user able to specify this (either manually or through code) when the job is submitted? If so, you can have the user specify the table name (and any other parameters) as a command-line argument, such as:

./bin/spark-submit --class classname project.jar your_table_name

Then reference it within your Scala code to read in your argument(s):

val my_table = args(0)
02-27-2017
01:11 PM
2 Kudos
@Aditya Mamidala Here's a working example of foreachPartition that I've used as part of a project. This is part of a Spark Streaming process, where "event" is a DStream, and each stream is written to HBase via Phoenix (JDBC). I have a structure similar to what you tried in your code, where I first use foreachRDD and then foreachPartition.

event.map(x => x._2).foreachRDD { rdd =>
  rdd.foreachPartition { rddpartition =>
    val thinUrl = "jdbc:phoenix:phoenix.dev:2181:/hbase"
    val conn = DriverManager.getConnection(thinUrl)
    rddpartition.foreach { record =>
      conn.createStatement().execute("UPSERT INTO myTable VALUES (" + record._1 + ")")
    }
    conn.commit()
  }
}
The full project is located here.
02-24-2017
05:35 PM
1 Kudo
I like @Bernhard Walter's PySpark solution! Here's another way to do it using Scala:

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/tmp/test_1.csv","/tmp/test_2.csv","/tmp/test_3.csv")
df.show()
02-24-2017
05:27 PM
1 Kudo
@Maher Hattabi You should be able to directly read in multiple files as part of the sqlContext.read statement, as shown below:

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/tmp/test_1.csv","/tmp/test_2.csv","/tmp/test_3.csv")
df.show()
If you are using Spark 2.0 or newer, this is the preferred syntax (using the SparkSession, available as spark):

val df = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/tmp/test_1.csv","/tmp/test_2.csv","/tmp/test_3.csv")
df.show()
Please let me know if this helps.
02-21-2017
08:53 PM
2 Kudos
@Dinesh Chitlangia Based on your background, I'd recommend that you go with Scala. Spark is written in Scala, and it will give you full access to all the latest APIs, features, etc. You should be able to pick up Scala quickly and can also incorporate Java into your Scala code (if you'd like). In case you didn't know, you can also use the Spark Java API: http://spark.apache.org/docs/latest/api/java/index.html I personally like Python and would like to convert you, 🙂 but in this situation I really don't see any advantage for you to go that route given your history with Java. From a performance perspective, this is a great presentation to view: http://www.slideshare.net/databricks/2015-0616-spark-summit. Slide 16 shows performance comparisons; the takeaway is that you should use DataFrames (or preferably the new Datasets) instead of RDDs, regardless of which language you go with.
02-21-2017
05:48 PM
7 Kudos
What kind of text or unstructured data are you collecting within your company? Are you able to fully utilize this data to enhance predictive models, build reports/visualizations, and detect emerging trends found within the text?
Hadoop enables distributed, low-cost storage for this growing amount of unstructured data. In this post, I'll show one way to analyze unstructured data using Apache Spark. Spark is advantageous for text analytics because it provides a platform for scalable, distributed computing.
When it comes to text analytics, you have a few options for analyzing text. I like to categorize these techniques like this:
Text Mining (i.e. Text clustering, data-driven topics)
Categorization (i.e. Tagging unstructured data into categories and sub-categories; hierarchies; taxonomies)
Entity Extraction (i.e. Extracting patterns such as phrases, addresses, product codes, phone numbers, etc.)
Sentiment Analysis (i.e. Tagging positive, negative, or neutral with varying levels of sentiment)
Deep Linguistics (i.e Semantics. Understanding causality, purpose, time, etc.)
Which technique you use typically depends on the business use case and the question(s) you are trying to answer. It's also common to combine these techniques.
This post will focus on text mining in order to uncover data-driven text topics.
For this example, I chose to analyze customer reviews of their airline experience. The goal of this analysis is to use statistics to find data-driven topics across a collection of customer airlines reviews. Here's the process I took:
1. Load the Airline Data from HDFS
rawdata = spark.read.load("hdfs://sandbox.hortonworks.com:8020/tmp/airlines.csv", format="csv", header=True)
# Show rawdata (as DataFrame)
rawdata.show(10)
2. Pre-processing and Text Cleanup
I believe this is the most important step in any text analytics process. Here I am converting each customer review into a list of words, while removing all stopwords (a list of commonly used words that we want removed from our analysis). I have also removed any special characters and punctuation, and lowercased the words so that everything is uniform when I create my term frequency matrix. As part of the text pre-processing phase, you may also choose to incorporate stemming (which groups all forms of a word, such as grouping "take", "takes", "took", "taking"). Depending on your use case, you could also include part-of-speech tagging, which will identify nouns, verbs, adjectives, and more. These POS tags can be used for filtering and to identify advanced linguistic relationships.
import re
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import ArrayType, StringType

def cleanup_text(record):
    text = record[8]
    uid = record[9]
    words = text.split()
    # Default list of Stopwords
    stopwords_core = ['a', u'about', u'above', u'after', u'again', u'against', u'all', u'am', u'an', u'and', u'any', u'are', u'arent', u'as', u'at',
        u'be', u'because', u'been', u'before', u'being', u'below', u'between', u'both', u'but', u'by',
        u'can', 'cant', 'come', u'could', 'couldnt',
        u'd', u'did', u'didn', u'do', u'does', u'doesnt', u'doing', u'dont', u'down', u'during',
        u'each',
        u'few', 'finally', u'for', u'from', u'further',
        u'had', u'hadnt', u'has', u'hasnt', u'have', u'havent', u'having', u'he', u'her', u'here', u'hers', u'herself', u'him', u'himself', u'his', u'how',
        u'i', u'if', u'in', u'into', u'is', u'isnt', u'it', u'its', u'itself',
        u'just',
        u'll',
        u'm', u'me', u'might', u'more', u'most', u'must', u'my', u'myself',
        u'no', u'nor', u'not', u'now',
        u'o', u'of', u'off', u'on', u'once', u'only', u'or', u'other', u'our', u'ours', u'ourselves', u'out', u'over', u'own',
        u'r', u're',
        u's', 'said', u'same', u'she', u'should', u'shouldnt', u'so', u'some', u'such',
        u't', u'than', u'that', 'thats', u'the', u'their', u'theirs', u'them', u'themselves', u'then', u'there', u'these', u'they', u'this', u'those', u'through', u'to', u'too',
        u'under', u'until', u'up',
        u'very',
        u'was', u'wasnt', u'we', u'were', u'werent', u'what', u'when', u'where', u'which', u'while', u'who', u'whom', u'why', u'will', u'with', u'wont', u'would',
        u'y', u'you', u'your', u'yours', u'yourself', u'yourselves']
    # Custom List of Stopwords - Add your own here
    stopwords_custom = ['']
    stopwords = stopwords_core + stopwords_custom
    stopwords = [word.lower() for word in stopwords]
    text_out = [re.sub('[^a-zA-Z0-9]','',word) for word in words]                                   # Remove special characters
    text_out = [word.lower() for word in text_out if len(word)>2 and word.lower() not in stopwords] # Remove stopwords and words under X length
    return text_out
udf_cleantext = udf(cleanup_text , ArrayType(StringType()))
clean_text = rawdata.withColumn("words", udf_cleantext(struct([rawdata[x] for x in rawdata.columns])))
This will create an array of strings (words), which can be used within our Term-Frequency calculation.
3. Generate a TF-IDF (Term Frequency Inverse Document Frequency) Matrix
In this step, I calculate the TF-IDF. Our goal here is to put a weight on each word in order to assess its significance. This algorithm down-weights words that appear very frequently. For example, if the word "airline" appeared in every customer review, then it has little power in differentiating one review from another. Whereas the words "mechanical" and "failure" (as an example) may only be seen in a small subset of customer reviews, and therefore be more important in identifying a topic of interest.
from pyspark.ml.feature import HashingTF, CountVectorizer, IDF

# Term Frequency Vectorization - Option 1 (HashingTF):
#hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
#featurizedData = hashingTF.transform(clean_text)
# Term Frequency Vectorization - Option 2 (CountVectorizer) :
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize = 1000)
cvmodel = cv.fit(clean_text)
featurizedData = cvmodel.transform(clean_text)
vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData) # TFIDF
The TF-IDF algorithm produces a "feature" variable, which contains a vector set of term weights corresponding to each word within the associated customer review.
4. Use LDA to Cluster the TF-IDF Matrix
LDA (Latent Dirichlet Allocation) is a topic model that infers topics from a collection of unstructured data. The output of this algorithm is k number of topics, which correspond to the most significant and distinct topics across your set of customer reviews.
from pyspark.ml.clustering import LDA

# Generate 25 Data-Driven Topics:
# "em" = expectation-maximization
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")
ldamodel = lda.fit(rescaledData)
ldamodel.isDistributed()
ldamodel.vocabSize()
ldatopics = ldamodel.describeTopics()
# Show the top 25 Topics
ldatopics.show(25)
Based on my airline data, this code will produce the following output:
Keep in mind that these topics are data-driven (generated using statistical techniques) and do not require external datasets, dictionaries, or training data to produce these results. Because of the statistical nature of this process, some business understanding and inference needs to be applied to these topics in order to create a clear description for business users. I've relabeled a few of these as an example:
Topic 0: Seating Concerns (more legroom, exit aisle, extra room)
Topic 1: Vegas Trips (including upgrades)
Topic 7: Carryon Luggage, Overhead Space Concerns
Topic 12: Denver Delays, Mechanical Issues
So how can you improve these topics?
Spend time on the text pre-processing phase
Remove stopwords
Incorporate stemming
Consider using part-of-speech tags
Play with k, the number of topics within the LDA algorithm. A small number of topics may not give you the granularity you need. Alternatively, too many topics could introduce redundancy or very similar topics.
5. What can you do with these results?
Look for trends in the Data-Driven Topics: You may start to notice an increase or decrease in a specific topic. For example, if you notice a growing trend for one of your topics (i.e. mechanical/maintenance related issues out of JFK Airport), then you may need to investigate further as to why. Text analytics can also help to uncover root causes for these mechanical issues (if you can get ahold of technician notes or maintenance logs). Or what if you notice a data-driven topic that discusses rude flight attendants or airline cleanliness; this may indicate an issue with a flight crew. Below, I have included a screenshot showing the trends for Topic 12 (Denver, Mechanical/Maintenance issues). You can see spikes for the various airlines, which indicate when customers were complaining about mechanical or maintenance related issues:
Reporting & Visualization
As another quick example, below I am showing how you can use Apache Zeppelin to create a time series for each topic, along with a dropdown box so that an airline (or another attribute of interest) can be selected.
Enhance Predictive Models with Text
Lastly, and likely most important, these text topics can be used to enhance your predictive models. Since each topic can be its own variable, you are now able to use these within your predictive model. As an example, if you add these new variables to your model, you may find that departure location, airline, and a topic related to mechanical issues are the most predictive in whether or not a customer gives you a high or low rating.
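For example, here is a rough sketch (an illustration I'm adding here, assuming the pyspark.ml API used above) of turning the topic mixtures into model features:

from pyspark.ml.feature import VectorAssembler

# ldamodel.transform() adds a "topicDistribution" vector column:
# one weight per topic for each customer review
topics_df = ldamodel.transform(rescaledData)

# Combine the topic mixture with any other predictors you already have,
# then feed the assembled features to a classifier or regressor
assembler = VectorAssembler(inputCols=["topicDistribution"], outputCol="model_features")
model_input = assembler.transform(topics_df)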
Thanks for reading and please let me know if you have any feedback! I'm also working on additional examples to illustrate categorization, sentiment analytics, and how to use this data to enhance predictive models.
References:
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF
https://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.feature.CountVectorizer
https://github.com/zaratsian/PySpark/blob/master/text_analytics_datadriven_topics.json
Tags: analytics, Data Science & Advanced Analytics, How-To/Tutorial, pyspark, Spark, text-processing
02-14-2017
05:30 PM
@Hary Tadi You should make a web request to a URL similar to this:

https://maps.googleapis.com/maps/api/place/nearbysearch/json?location=35.7796,-78.6382&radius=500&type=restaurant&keyword=cruise&key=your_API_key_goes_here

Note: Make sure you insert your own Google Places API key. This uses the Google Places API (I'm targeting restaurants in the Raleigh, NC area). Here's what the JSON response will look like:

{
"html_attributions" : [],
"results" : [
{
"geometry" : {
"location" : {
"lat" : 35.7781669,
"lng" : -78.640547
},
"viewport" : {
"northeast" : {
"lat" : 35.7783907,
"lng" : -78.6405374
},
"southwest" : {
"lat" : 35.7780923,
"lng" : -78.64055020000001
}
}
},
"icon" : "https://maps.gstatic.com/mapfiles/place_api/icons/restaurant-71.png",
"id" : "0b6af460338b6684ebdfb2e9ab166228096a18e1",
"name" : "Death and Taxes",
"opening_hours" : {
"exceptional_date" : [],
"open_now" : false,
"weekday_text" : []
},
"photos" : [
{
"height" : 1519,
"html_attributions" : [
"\u003ca href=\"https://maps.google.com/maps/contrib/112671686149584704404/photos\"\u003eDeath and Taxes\u003c/a\u003e"
],
"photo_reference" : "CoQBdwAAAMCFWj4H8N2nWccPO9wp43h0ZY1MZPtMZbJH_LECDD_tZ2IzLHuFfux3wnd_cFjvgoe_OsUZFz60jcCXLUisaJ5NppdbNMMKGrKJA3fB3PnwaXWzYMc0Vu8w8vL756x3SPSpeHuwon0El6U4Qo913cj_Y7z2s6ZxjH-Ioa20Ojq4EhCw2ZnBG15V6qdIDS7Zb1P5GhS06Ty9AYU7rADFtglQ1EIqpsyZBg",
"width" : 1519
}
],
"place_id" : "ChIJEyImA25frIkRsIhJGErQbQc",
"rating" : 4.3,
"reference" : "CmRRAAAAzfKCDeADWguDAhYAQ9AvF4w2h8emOk71ejJBwaCDleUApKet0RYvvp_izUYNedZGPAQiJTTjtPNJp5l6WP_kfl3nHd0e9XfNnQ-XvdUlw4G_khQ4cjaQCs2GnlpOwj6AEhCJNKr1l3sfdq53PGQ5CTH5GhTDxjJLF1NQ7Xk8B2jVO-uZZn-U3A",
"scope" : "GOOGLE",
"types" : [ "restaurant", "food", "point_of_interest", "establishment" ],
"vicinity" : "105 West Hargett Street, Raleigh"
},
{
"geometry" : {
"location" : {
"lat" : 35.7780598,
"lng" : -78.63677629999999
},
"viewport" : {
"northeast" : {
"lat" : 35.7782437,
"lng" : -78.63676565
},
"southwest" : {
"lat" : 35.7779985,
"lng" : -78.63677985
}
}
},
"icon" : "https://maps.gstatic.com/mapfiles/place_api/icons/restaurant-71.png",
"id" : "50f2ded33ca34d9e0960e8a38c020ada2274b981",
"name" : "Caffe Luna",
"opening_hours" : {
"exceptional_date" : [],
"open_now" : true,
"weekday_text" : []
},
"photos" : [
{
"height" : 2988,
"html_attributions" : [
"\u003ca href=\"https://maps.google.com/maps/contrib/117733338662226365021/photos\"\u003eHannah Cho\u003c/a\u003e"
],
"photo_reference" : "CoQBdwAAAFqcPUfCtPeFyDjoujMX2rbs2BDKh50PMl5urofk1Bj5xRWGvJa9farsbLchr4kFGhuVf-m0CtJkGiJDuavBPPP_KRcms_ly532O2w-8ifwTbi3nZZSt9XPtMyfDittQe10MjoioDEbSNLULzBhPf7ENv5KjWyUTzJwA9teH_3U6EhDFlbSCjW4QGPWd0yLs0cl0GhQfKi5lUuiG_ALM50BR1lAdSUsAqA",
"width" : 5312
}
],
"place_id" : "ChIJi42jeW1frIkR_kp3cwgWp0g",
"price_level" : 2,
"rating" : 4.3,
"reference" : "CmRRAAAAbta6x-fEFyPpkq3E9l-UrlDuVmgSrafwL99qrt5Vz6fPhDI9HIRyfW1QHUoy9TQk_1_raVJhpE0SGU118UolB_0emPuTB-mx-DY4S7yaY7July-0og5BmbpLk0XK_YnAEhDCa6YrbvyYmpnzDG2PYpAKGhQTJ5ls-c7IJu6niyz7x9EMW5JsAA",
"scope" : "GOOGLE",
"types" : [ "restaurant", "food", "point_of_interest", "establishment" ],
"vicinity" : "136 East Hargett Street, Raleigh"
},
{
"geometry" : {
"location" : {
"lat" : 35.7783858,
"lng" : -78.6380794
},
"viewport" : {
"northeast" : {
"lat" : 35.77839010000001,
"lng" : -78.63800795
},
"southwest" : {
"lat" : 35.77837290000001,
"lng" : -78.63829375
}
}
},
"icon" : "https://maps.gstatic.com/mapfiles/place_api/icons/restaurant-71.png",
"id" : "42e66c9ac59d357fefb739dcf600ad6da6cfa674",
"name" : "Sitti",
"opening_hours" : {
"exceptional_date" : [],
"open_now" : true,
"weekday_text" : []
},
"photos" : [
{
"height" : 4008,
"html_attributions" : [
"\u003ca href=\"https://maps.google.com/maps/contrib/113871826378884549564/photos\"\u003eRCL Rekman\u003c/a\u003e"
],
"photo_reference" : "CoQBdwAAADneighCJvgiyqO42i_SiktzOehS5ONG-7vKuzMqcetSQDAYTOr1P0ky6A1itMs2NyGE7FSrx4dGv4nSqW0q5LCwMjlhLRassFZ8iZDAmzDiW3QDFNrcaQLZK9UzkgzxQbTD3WTpCYWNNPwmirSUQhJYXM_3wUHW-AgKH2UerhizEhAhdFeCNsYrSdiErAZ3llQFGhSXS6H0smm0FOLhSGpC_QKOYZ2keg",
"width" : 5344
}
],
"place_id" : "ChIJ19sFjm1frIkRfg9-b_kFExU",
"price_level" : 2,
"rating" : 4.4,
"reference" : "CmRRAAAALD24ysCM_17vFWcKgBEGHUf3vdC0wsnSgpDlNEiuvMg-ElyEXTuGdi_cwdxwLGWrsc2aMUgeSsWpeuuAC3ID-JB7QxpEiLilyeVHnXXLQsAXDCmoznzr_mQQsHI_bCmBEhDW-4Q7XecJsjfHcFg7dFU-GhQmGfy2blho8-skejER2E5nD8IzPQ",
"scope" : "GOOGLE",
"types" : [ "bar", "restaurant", "food", "point_of_interest", "establishment" ],
"vicinity" : "137 South Wilmington Street, Raleigh"
},

Once you have the results, you'll need to use a tool to parse the JSON. I suggest that you use Hortonworks DataFlow (specifically Apache NiFi) to make the web request as well as to parse the JSON. Then, you can write the data to HDFS, Hive, or wherever you'd like. More info on the Google Places API can be found here: https://developers.google.com/places/web-service/search

Here are a few NiFi examples to get you started: https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates. I'd suggest looking at Pull_from_Twitter_Garden_Hose.xml and InvokeHttp_And_Route_Original_On_Status.xml. Hope this helps!
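If you want to prototype the request outside of NiFi first, here is a small Python sketch (the API key is a placeholder; NiFi's InvokeHTTP and EvaluateJsonPath processors cover the same ground within a flow):

import requests

params = {
    "location": "35.7796,-78.6382",   # Raleigh, NC
    "radius": 500,
    "type": "restaurant",
    "key": "your_API_key_goes_here"
}
resp = requests.get("https://maps.googleapis.com/maps/api/place/nearbysearch/json", params=params)

# Print a few fields from each result
for place in resp.json().get("results", []):
    print("{} | {} | {}".format(place["name"], place.get("rating"), place["geometry"]["location"]))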