Member since: 07-18-2016
Posts: 94
Kudos Received: 94
Solutions: 20
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 2586 | 08-11-2017 06:04 PM |
 | 2442 | 08-02-2017 11:22 PM |
 | 9782 | 07-10-2017 03:36 PM |
 | 17972 | 03-17-2017 01:27 AM |
 | 14820 | 02-24-2017 05:35 PM |
02-24-2017
05:35 PM
1 Kudo
I like @Bernhard Walter's PySpark solution! Here's another way to do it using Scala:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/tmp/test_1.csv", "/tmp/test_2.csv", "/tmp/test_3.csv")

df.show()
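For reference, a roughly equivalent PySpark version (a sketch assuming the same spark-csv package and the same /tmp paths) would be:

# Sketch: PySpark equivalent of the Scala snippet above; load() also accepts a list of paths
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
df = (sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(["/tmp/test_1.csv", "/tmp/test_2.csv", "/tmp/test_3.csv"]))
df.show()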
... View more
02-21-2017
08:53 PM
2 Kudos
@Dinesh Chitlangia Based on your background, I'd recommend that you go with Scala. Spark is written in Scala, and it will give you full access to all the latest APIs and features. You should be able to pick up Scala quickly and can also incorporate Java into your Scala code (if you'd like). In case you didn't know, you can also use the Spark Java API: http://spark.apache.org/docs/latest/api/java/index.html I personally like Python and would like to convert you 🙂 but in this situation I really don't see any advantage for you to go that route given your history with Java. From a performance perspective, this is a great presentation to view: http://www.slideshare.net/databricks/2015-0616-spark-summit. Slide 16 shows performance comparisons; the takeaway is that you should use DataFrames (or preferably the new Datasets) instead of RDDs, regardless of which language you go with. A quick sketch of that contrast is below.
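To make that last point concrete, here is a minimal sketch (PySpark syntax with a hypothetical input path; the same contrast applies in Scala or Java): the DataFrame version gives Spark's optimizer a structured plan to work with, while the RDD version hides the logic inside opaque lambdas.

# Sketch only: the same aggregation written against the RDD API and the DataFrame API
# "/tmp/ratings.csv" is a hypothetical headerless CSV whose first column is a key
rdd_counts = (sc.textFile("/tmp/ratings.csv")
              .map(lambda line: (line.split(",")[0], 1))
              .reduceByKey(lambda a, b: a + b))      # RDD API: no Catalyst optimization
rdd_counts.take(5)

df = spark.read.csv("/tmp/ratings.csv")
df_counts = df.groupBy("_c0").count()                # DataFrame API: optimized query plan
df_counts.show(5)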
... View more
02-21-2017
05:48 PM
7 Kudos
What kind of text or unstructured data are you collecting within your company? Are you able to fully utilize this data to enhance predictive models, build reports/visualizations, and detect emerging trends found within the text?
Hadoop enables distributed, low-cost storage for this growing amount of unstructured data. In this post, I'll show one way to analyze unstructured data using Apache Spark. Spark is advantageous for text analytics because it provides a platform for scalable, distributed computing.
When it comes to text analytics, you have a few options for analyzing text. I like to categorize these techniques as follows:
Text Mining (i.e. Text clustering, data-driven topics)
Categorization (i.e. Tagging unstructured data into categories and sub-categories; hierarchies; taxonomies)
Entity Extraction (i.e. Extracting patterns such as phrases, addresses, product codes, phone numbers, etc.)
Sentiment Analysis (i.e. Tagging positive, negative, or neutral with varying levels of sentiment)
Deep Linguistics (i.e. Semantics; understanding causality, purpose, time, etc.)
Which technique you use typically depends on the business use case and the question(s) you are trying to answer. It's also common to combine these techniques.
This post will focus on text mining in order to uncover data-driven text topics.
For this example, I chose to analyze customer reviews of their airline experience. The goal of this analysis is to use statistics to find data-driven topics across a collection of customer airline reviews. Here's the process I took:
1. Load the Airline Data from HDFS
rawdata = spark.read.load("hdfs://sandbox.hortonworks.com:8020/tmp/airlines.csv", format="csv", header=True)
# Show rawdata (as DataFrame)
rawdata.show(10)
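Because the cleanup function in the next step references columns by position (record[8] and record[9]), it's worth confirming the inferred schema and column order first; a quick check:

# Quick sanity check of the inferred schema and column order
rawdata.printSchema()
print(rawdata.columns)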
2. Pre-processing and Text Cleanup
I believe this is the most important step in any text analytics process. Here I am converting each customer review into a list of words, while removing all stopwords (a list of commonly used words that we want removed from the analysis). I have also removed any special characters and punctuation, and lowercased the words so that everything is uniform when I create my term-frequency matrix. As part of the text pre-processing phase, you may also choose to incorporate stemming (which groups all forms of a word, such as "take", "takes", "took", "taking"). Depending on your use case, you could also include part-of-speech tagging, which will identify nouns, verbs, adjectives, and more. These POS tags can be used for filtering and to identify advanced linguistic relationships.
import re
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import ArrayType, StringType

def cleanup_text(record):
    text = record[8]
    uid = record[9]
    words = text.split()

    # Default list of Stopwords
    stopwords_core = ['a', u'about', u'above', u'after', u'again', u'against', u'all', u'am', u'an', u'and', u'any', u'are', u'arent', u'as', u'at',
                      u'be', u'because', u'been', u'before', u'being', u'below', u'between', u'both', u'but', u'by',
                      u'can', 'cant', 'come', u'could', 'couldnt',
                      u'd', u'did', u'didn', u'do', u'does', u'doesnt', u'doing', u'dont', u'down', u'during',
                      u'each',
                      u'few', 'finally', u'for', u'from', u'further',
                      u'had', u'hadnt', u'has', u'hasnt', u'have', u'havent', u'having', u'he', u'her', u'here', u'hers', u'herself', u'him', u'himself', u'his', u'how',
                      u'i', u'if', u'in', u'into', u'is', u'isnt', u'it', u'its', u'itself',
                      u'just',
                      u'll',
                      u'm', u'me', u'might', u'more', u'most', u'must', u'my', u'myself',
                      u'no', u'nor', u'not', u'now',
                      u'o', u'of', u'off', u'on', u'once', u'only', u'or', u'other', u'our', u'ours', u'ourselves', u'out', u'over', u'own',
                      u'r', u're',
                      u's', 'said', u'same', u'she', u'should', u'shouldnt', u'so', u'some', u'such',
                      u't', u'than', u'that', 'thats', u'the', u'their', u'theirs', u'them', u'themselves', u'then', u'there', u'these', u'they', u'this', u'those', u'through', u'to', u'too',
                      u'under', u'until', u'up',
                      u'very',
                      u'was', u'wasnt', u'we', u'were', u'werent', u'what', u'when', u'where', u'which', u'while', u'who', u'whom', u'why', u'will', u'with', u'wont', u'would',
                      u'y', u'you', u'your', u'yours', u'yourself', u'yourselves']

    # Custom List of Stopwords - Add your own here
    stopwords_custom = ['']
    stopwords = stopwords_core + stopwords_custom
    stopwords = [word.lower() for word in stopwords]

    text_out = [re.sub('[^a-zA-Z0-9]', '', word) for word in words]                                    # Remove special characters
    text_out = [word.lower() for word in text_out if len(word) > 2 and word.lower() not in stopwords]  # Remove stopwords and words under X length
    return text_out

udf_cleantext = udf(cleanup_text, ArrayType(StringType()))
clean_text = rawdata.withColumn("words", udf_cleantext(struct([rawdata[x] for x in rawdata.columns])))
This will create an array of strings (words), which can be used within our Term-Frequency calculation.
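As a rough illustration of the optional stemming and part-of-speech steps mentioned above (a sketch only; it assumes NLTK and its tagger data are installed on the executors, which is not part of the original pipeline):

# Sketch: optional POS filtering and stemming that could be folded into cleanup_text()
from nltk import pos_tag
from nltk.stem.porter import PorterStemmer

stemmer = PorterStemmer()

def stem_and_filter(words):
    tagged = pos_tag(words)                                                              # tag each token with its part of speech
    kept = [w for (w, tag) in tagged if tag.startswith('NN') or tag.startswith('JJ')]    # keep nouns and adjectives
    return [stemmer.stem(w) for w in kept]                                               # group word forms: "takes", "taking" -> "take"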
3. Generate a TF-IDF (Term Frequency Inverse Document Frequency) Matrix
In this step, I calculate the TF-IDF. Our goal here is to put a weight on each word in order to assess its significance. This algorithm down-weights words that appear very frequently. For example, if the word "airline" appeared in every customer review, then it would have little power in differentiating one review from another. The words "mechanical" and "failure", on the other hand, may be seen in only a small subset of customer reviews and therefore be more important in identifying a topic of interest.
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF

# Term Frequency Vectorization - Option 1 (HashingTF):
# hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
# featurizedData = hashingTF.transform(clean_text)

# Term Frequency Vectorization - Option 2 (CountVectorizer):
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize=1000)
cvmodel = cv.fit(clean_text)
featurizedData = cvmodel.transform(clean_text)

vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)  # TF-IDF
The TF-IDF algorithm produces a "features" column, which contains a vector of term weights corresponding to each word within the associated customer review.
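To see the down-weighting in action, the IDF weights can be paired with the CountVectorizer vocabulary; very common words end up with the lowest IDF values. A minimal sketch:

# Sketch: pair each vocabulary term with its IDF weight; the lowest values are the most common words
idf_weights = idfModel.idf.toArray()
term_weights = sorted(zip(vocab, idf_weights), key=lambda x: x[1])
print(term_weights[:20])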
4. Use LDA to Cluster the TF-IDF Matrix
LDA (Latent Dirichlet Allocation) is a topic model that infers topics from a collection of unstructured data. The output of this algorithm is k topics, which correspond to the most significant and distinct topics across your set of customer reviews.
from pyspark.ml.clustering import LDA

# Generate 25 Data-Driven Topics:
# "em" = expectation-maximization
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")
ldamodel = lda.fit(rescaledData)

ldamodel.isDistributed()
ldamodel.vocabSize()

ldatopics = ldamodel.describeTopics()

# Show the top 25 Topics
ldatopics.show(25)
Based on my airline data, this code produces a table of the top 25 data-driven topics. Keep in mind that these topics are data-driven (derived using statistical techniques) and do not require external datasets, dictionaries, or training data to generate. Because of the statistical nature of this process, some business understanding and inference needs to be applied to these topics in order to create a clear description for business users. I've relabeled a few of these as an example (see the sketch after the list for turning term indices back into words):
Topic 0: Seating Concerns (more legroom, exit aisle, extra room)
Topic 1: Vegas Trips (including upgrades)
Topic 7: Carryon Luggage, Overhead Space Concerns
Topic 12: Denver Delays, Mechanical Issues
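describeTopics() returns term indices and weights rather than words; to read topics like the ones above, those indices can be mapped back through the CountVectorizer vocabulary that was broadcast earlier. A minimal sketch:

# Sketch: map LDA term indices back to words using the broadcast CountVectorizer vocabulary
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def indices_to_terms(termIndices):
    return [vocab_broadcast.value[int(i)] for i in termIndices]

udf_indices_to_terms = udf(indices_to_terms, ArrayType(StringType()))

ldatopics_with_terms = ldatopics.withColumn("terms", udf_indices_to_terms(ldatopics.termIndices))
ldatopics_with_terms.select("topic", "terms", "termWeights").show(25, truncate=False)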
So how can you improve these topics?
Spend time on the text pre-processing phase
Remove stopwords
Incorporate stemming
Consider using part-of-speech tags
Tune the number of topics, k, within the LDA algorithm (see the sketch below for one way to compare values of k). Too few topics may not give you the granularity you need; too many topics could introduce redundancy or very similar topics.
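One rough way to compare different values of k is to look at each model's log-likelihood and log-perplexity on the featurized data (a sketch; lower perplexity is generally better, but human inspection of the resulting topics usually matters more):

# Sketch: fit LDA for a few values of k and compare likelihood/perplexity
for k in [10, 25, 50]:
    lda_k = LDA(k=k, seed=123, optimizer="em", featuresCol="features")
    model_k = lda_k.fit(rescaledData)
    print(k, model_k.logLikelihood(rescaledData), model_k.logPerplexity(rescaledData))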
5. What can you do with these results?
Look for trends in the data-driven topics: You may start to notice an increase or decrease in a specific topic. For example, if you notice a growing trend for one of your topics (i.e. mechanical/maintenance related issues out of JFK Airport), then you may need to investigate further as to why. Text analytics can also help to uncover root causes for these mechanical issues (if you can get ahold of technician notes or maintenance logs). Or, if you notice a data-driven topic that discusses rude flight attendants or airline cleanliness, this may indicate an issue with a flight crew. Below, I have included a screenshot showing the trends for Topic 12 (Denver, mechanical/maintenance issues). You can see spikes for the various airlines, which indicate when customers were complaining about mechanical or maintenance related issues:
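A rough sketch of how such a trend could be computed from the LDA output (this assumes the reviews carry a date column, named "date" here purely for illustration, which is not shown in the original schema):

# Sketch: average weight of Topic 12 per day (the "date" column name is an assumption)
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

scored = ldamodel.transform(rescaledData)                     # adds a 'topicDistribution' vector per review
topic12_weight = F.udf(lambda v: float(v[12]), DoubleType())

trend = (scored
         .withColumn("topic12", topic12_weight("topicDistribution"))
         .groupBy(F.to_date("date").alias("day"))
         .agg(F.avg("topic12").alias("avg_topic12_weight"))
         .orderBy("day"))
trend.show()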
Reporting & Visualization
As another quick example, below I am showing how you can use Apache Zeppelin to create a time series for each topic, along with a dropdown box so that an airline (or another attribute of interest) can be selected.
Enhance Predictive Models with Text
Lastly, and likely most important, these text topics can be used to enhance your predictive models. Since each topic can be its own variable, you are now able to use these within your predictive model. As an example, if you add these new variables to your model, you may find that departure location, airline, and a topic related to mechanical issues are the most predictive of whether a customer gives you a high or low rating.
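A minimal sketch of that idea, using the per-review topicDistribution as model features (the "airline" and "rating" column names below are illustrative placeholders, not columns confirmed in the original dataset):

# Sketch: feed the LDA topic distribution into a downstream classifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

scored = ldamodel.transform(rescaledData)           # adds the 'topicDistribution' column

indexer = StringIndexer(inputCol="airline", outputCol="airline_idx")
assembler = VectorAssembler(inputCols=["airline_idx", "topicDistribution"], outputCol="model_features")
lr = LogisticRegression(featuresCol="model_features", labelCol="rating")   # assumes a numeric 0/1 rating label

pipeline = Pipeline(stages=[indexer, assembler, lr])
# model = pipeline.fit(scored)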
Thanks for reading and please let me know if you have any feedback! I'm also working on additional examples to illustrate categorization, sentiment analytics, and how to use this data to enhance predictive models.
References:
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF
https://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.feature.CountVectorizer
https://github.com/zaratsian/PySpark/blob/master/text_analytics_datadriven_topics.json
... View more
02-13-2017
01:06 PM
2 Kudos
@Avijeet Dash Here's a good HCC article describing the process to build a custom NiFi processor: https://community.hortonworks.com/articles/4318/build-custom-nifi-processor.html If you want to use existing processors/code and modify for your own purpose, then many of the standard processors can be found here: https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard If you check out these NiFi processors, you'll be able to view the code for getXXX versus fetchXXX in order to understand the difference (or to modify for your own custom use).
... View more
02-06-2017
07:57 PM
1 Kudo
@Mark Wallace This worked well for me in Spark (Scala), which looks to be the same as @Karthik Narayanan's answer:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{when, concat, lit}
import spark.implicits._

val data = Seq((1111, 1, "device1", "domain1"), (2222, 0, "device2", "domain2"), (3333, 1, "device3", "domain3"), (4444, 0, "device4", "domain4"))
val df = data.toDF("id", "deviceFlag", "device", "domain")
df.show()

val result = df.withColumn("new_id", when($"deviceFlag" === 1, concat($"device", lit("#"), $"domain")).otherwise($"device"))
result.show()
This will output a new_id column that concatenates device and domain (separated by "#") when deviceFlag is 1, and just the device otherwise. As an alternative (and reference for others), the PySpark syntax will look like this:

from pyspark.sql import functions as F
from pyspark.sql.functions import lit, concat

data = [(1111, 1, 'device1', 'domain1'), (2222, 0, 'device2', 'domain2'), (3333, 1, 'device3', 'domain3'), (4444, 0, 'device4', 'domain4')]
df = spark.createDataFrame(data, ['id', 'deviceFlag', 'device', 'domain'])
df.show()

result = df.withColumn("new_id", F.when(df["deviceFlag"] == 1, concat(df["device"], lit("#"), df["domain"])).otherwise(df["device"]))
result.show()
Hope this helps!
... View more
01-30-2017
04:14 PM
2 Kudos
@Divakar Annapureddy To add to the great suggestions already posted... You might want to check out Rodeo (https://www.yhat.com/products/rodeo). It's open source and can also be found on github: https://github.com/yhat/rodeo
... View more
01-26-2017
11:02 AM
@Fernando Lopez Bello Yes, that does make sense, and you are correct that Zeppelin is primarily used as a code editor. If business users are consuming the results from Zeppelin, you may want to check out the "publish" features within Zeppelin (https://zeppelin.apache.org/docs/0.6.2/manual/publish.html). This would allow you to quickly push results to an HTML page from Zeppelin. If Zeppelin & Hue do not meet your requirements for reporting, you can use enterprise data visualization tools such as Tableau, SAS Visual Analytics, etc. These products can connect to your hadoop cluster via JDBC/ODBC, so you can create visualizations/reports in that way as well. Lastly, you can always create custom HTML/JS reports on top of your HDFS data, but this will take more code. 🙂
... View more
01-25-2017
07:31 PM
1 Kudo
@Fernando Lopez Bello What type of visualizations do you have in mind? Zeppelin has a set of built in graphs that can be used for reporting and basic exploration of the data. Here's an example, https://www.zeppelinhub.com/viewer/notebooks/aHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL2hvcnRvbndvcmtzLWdhbGxlcnkvemVwcGVsaW4tbm90ZWJvb2tzL21hc3Rlci8yQlgzNFFKVFQvbm90ZS5qc29u You can also use angular to create custom JS graphs, such as https://community.hortonworks.com/articles/75834/using-angular-within-apache-zeppelin-to-create-cus.html Either way, these notebooks can be saved, secured, and shared with end users. You may already be aware of some of these features, but I wanted to point out a couple options.
... View more
01-20-2017
02:32 AM
1 Kudo
@subhrajit mohanty Here's an option using RDDs, and I'd like to see if anyone comes up with a good DataFrame solution.

Input Format:

+---+---+---+---+---+---+---+
| _1| _2| _3| _4| _5| _6| _7|
+---+---+---+---+---+---+---+
|two|0.6|1.2|1.7|1.5|1.4|2.0|
|one|0.3|1.2|1.3|1.5|1.4|1.0|
+---+---+---+---+---+---+---+
Output Format:

+---+---+
|two|one|
+---+---+
|0.6|0.3|
|1.2|1.2|
|1.7|1.3|
|1.5|1.5|
|1.4|1.4|
|2.0|1.0|
+---+---+
Code:

import numpy as np
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
dt1 = {'one':[0.3, 1.2, 1.3, 1.5, 1.4, 1.0],'two':[0.6, 1.2, 1.7, 1.5, 1.4, 2.0]}
dt = sc.parallelize([ (k,) + tuple(v[0:]) for k,v in dt1.items()]).toDF()
dt.show()
#--- Start of my Transpose Code ---
# Grab data from the first column, since it will be transposed to become the new column headers
new_header = [i[0] for i in dt.select("_1").rdd.map(tuple).collect()]
# Remove first column from dataframe
dt2 = dt.select([c for c in dt.columns if c not in ['_1']])
# Convert DataFrame to RDD
rdd = dt2.rdd.map(tuple)
# Transpose Data
rddT1 = rdd.zipWithIndex().flatMap(lambda (x,i): [(i,j,e) for (j,e) in enumerate(x)])
rddT2 = rddT1.map(lambda (i,j,e): (j, (i,e))).groupByKey().sortByKey()
rddT3 = rddT2.map(lambda (i, x): sorted(list(x), cmp=lambda (i1,e1),(i2,e2) : cmp(i1, i2)))
rddT4 = rddT3.map(lambda x: map(lambda (i, y): y , x))
# Convert back to DataFrame (along with header)
df = rddT4.toDF(new_header)
df.show()
Let me know if this helps. Reference: http://www.data-intuitive.com/2015/01/transposing-a-spark-rdd/
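Note that the tuple-unpacking lambdas above are Python 2 only. Under Python 3, the transpose section could be written along these lines (a sketch of the same logic, not tested against the original data):

# Sketch: Python 3 compatible version of the transpose step (lambda tuple unpacking was removed in Python 3)
rddT1 = rdd.zipWithIndex().flatMap(lambda xi: [(xi[1], j, e) for j, e in enumerate(xi[0])])
rddT2 = rddT1.map(lambda ije: (ije[1], (ije[0], ije[2]))).groupByKey().sortByKey()
rddT3 = rddT2.map(lambda ix: [e for (i, e) in sorted(ix[1], key=lambda pair: pair[0])])
df = rddT3.toDF(new_header)
df.show()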
... View more
01-03-2017
01:38 PM
9 Kudos
Apache Zeppelin is a flexible, multi-purpose notebook used for code editing and data visualization. It supports a variety of languages (using interpreters) and enables users to quickly prototype, write code, share code, and visualize the results using standard charts. But as you move towards data apps and front-end development, you may need to create custom visualizations.
Fortunately, Zeppelin comes with built-in support for Angular through both a front-end API and a back-end API. I recently worked on a project to identify networking issues. These issues could range from bad/weak network signals to wiring issues to down nodes. To replicate this, I created an example project where I mapped out the path that a packet takes from my home wifi connection to the google.com servers (using traceroute). The complete project is here, but for this post I want to focus on the steps I took to display the Spark output within Google Maps, all within Zeppelin. Here are a couple of screenshots showing the end result of the traceroute analysis and the Google Map within Zeppelin:

Figure 1: Dashboard showing an input (Point-of-Interest, POI) and the Spark output

Figure 2: Google Map created via Angular within a Zeppelin notebook

From Figure 1, you can see that a user-defined point-of-interest (POI) can be entered by the user. This is the node, or IP address, that the user is interested in analyzing. From here, the Spark code parses the data, retrieves all parent and child nodes, checks the health status of each node, then uses that data within Angular to generate the Google Map. Here are the steps that I took to generate this output:

Step 1: Create a JS function to bind all required variables to a global object.

I started with a helper JS function, thanks to @randerzander, which allowed me to bind my exposed variables to the globally accessible JS object. Within this code, you'll notice that I've included 7 vars, which correspond to the variables I will bind within my Spark code.

%angular
<!-- Avoid constantly editing JS and list the Angular vars you want exposed in an HTML attribute: -->
<div id="dummy" vars="id,parents,children,poidevice,childrenStr,parentsStr,devicetopology"></div>
<script type="text/javascript">
//Given an element in the note & list of values to fetch from Spark
//window.angularVars.myVal will be current value of backend Spark val of same name
function hoist(element){
var varNames = element.attr('vars').split(',');
window.angularVars = {};
var scope = angular.element(element.parent('.ng-scope')).scope().compiledScope;
$.each(varNames, function(i, v){
window[v+'-watcher'] = scope.$watch(v, function(newVal, oldVal){
window.angularVars[v] = newVal;
});
});
}
hoist($('#dummy'));
</script>
Step 2: Use Spark to process the data, then bind each required variable to Angular using z.angularBind.

In this example, Spark is used to parse and transform the traceroute data. I chose to use Spark Streaming and stateful in-memory processing (mapWithState) in order to maintain the current health status of each node within the network topology. The health status and enriched data for each node were then persisted to HBase (via Phoenix), where additional queries and fast, random-access reads/writes could be performed.

At this point, I have generated all of my Spark variables, but I need to bind them to the Angular object in order to create a custom Google Map visualization. Within Spark, here's how you would bind the variables:

z.angularBind("poidevice", POIDevice)
z.angularBind("parents", parentDevicesInfo)
z.angularBind("parentsStr", parentDevicesStr)
z.angularBind("children", childrenDevicesInfo)
z.angularBind("childrenStr", childrenDevicesStr)
z.angularBind("devicetopology", devicetopology)
z.angularBind("id", id)
The code shown above binds my Spark variables (such as parentDevicesInfo) to my Angular variables (such as parents). Now that all of my Spark variables are available to my Angular code, I can work on producing the Google Maps visualization (shown in Figure 2).

Step 3: Write Angular JS code to display the Google Map.

Within Angular, I used the code below to read each variable using window.angularVars.<variable_name>. Then, to map each node or point-of-interest, I drew a circle and colored it green (health status = ok) or red (health status = bad).

function initMap() {
var id = window.angularVars.id;
var poidevice = window.angularVars.poidevice;
var children = window.angularVars.children;
var childrenStr = window.angularVars.childrenStr;
var parents = window.angularVars.parents;
var parentsStr = window.angularVars.parentsStr;
var devicetopology = window.angularVars.devicetopology;
var POIs = {};
console.log('POI Value: ' + poidevice[0].values);
console.log('Topology Value: ' + devicetopology[0].values.toString().split("|")[0]);
var USA = {lat: 39.8282, lng: -98.5795};
var map = new google.maps.Map(document.getElementById('map'), {zoom: 5, center: USA });
//**********************************************************************************
//
// Draw circle POI Device
//
//**********************************************************************************
$.each(poidevice, function(i, v){
POIs[v.values[0]] = v.values;
//Create marker for each POI
var pos = {lat: parseFloat(v.values[5]), lng: parseFloat(v.values[4]) };
var color = (v.values[11] == '1.0') ? '#FF0000' : '#008000';
var info = '<b>IP</b>: ' + v.values[0] + '<p><b>ISP:</b> ' + v.values[6] + '<p>' + v.values[3] + ", " + v.values[2];
console.log('Drawing POI device: ' + v.values[0] + ' ' + JSON.stringify(pos) + ' ' + v.values[5] + ',' + v.values[4]);
circle(pos, color, info, map);
});
...end of code snippet...
The project is on my GitHub and includes the full Angular code used to create each edge connection between parent and child nodes, as well as the Spark Streaming code (which I'll detail in a follow-up HCC article).
... View more