11-08-2017
04:51 PM
5 Kudos
H2O is an open source machine learning platform for data scientists, with strong support for deep learning. Sparkling Water allows users to combine the fast, scalable machine learning algorithms of H2O with the capabilities of Spark. In this tutorial, I will walk you through the steps required to set up H2O Sparkling Water (specifically PySparkling Water) along with Zeppelin in order to execute your machine learning scripts.

Here are a few points to note before I get started:

1.) There is a known issue when running Sparkling Water within Zeppelin, documented in this Jira (AttributeError: 'Logger' object has no attribute 'isatty'). To bypass this issue, I use Zeppelin combined with Livy Server to execute the Sparkling Water jobs. If you are not familiar with Apache Livy, it is a service that enables easy interaction with a Spark cluster over a REST interface.

2.) Testing was performed within the following environment:
Hortonworks HDP 2.6.2
CentOS Linux release 7.2.1511 (Core)
Python 2.7.5
Spark 2.1.1
Zeppelin 0.7.2

Now, let's walk through the steps:

Step 1: Download Sparkling Water from here, or log in to your Spark client node and run:

wget http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.1/16/sparkling-water-2.1.16.zip

Step 2: Unzip and move the PySparkling Water dependency to HDFS:

# Unzip Sparkling Water
unzip sparkling-water-2.1.16.zip
# Move the .zip dependency to a location within HDFS (make sure that this location is accessible from Zeppelin/Livy)
hadoop fs -put sparkling-water-2.1.16/py/build/dist/h2o_pysparkling_2.1-2.1.16.zip /tmp/.

Step 3: Ensure that the required Python libraries are installed on each datanode:

pip install tabulate
pip install future
pip install requests
pip install six
pip install numpy
pip install colorama
pip install --upgrade requests

Step 4: Edit the HDFS configs by adding these two parameters to custom core-site:

hadoop.proxyuser.livy.groups=*
hadoop.proxyuser.livy.hosts=*

Step 5: Add a new HDFS directory for "admin" or the user(s) issuing Sparkling Water code:

hadoop fs -mkdir /user/admin
hadoop fs -chown admin:hdfs /user/admin

Step 6: Within Zeppelin, edit the Livy interpreter and add a new parameter called livy.spark.submit.pyFiles (the value of this parameter should be your HDFS path to the PySparkling Water .zip file).

Step 7: Within Zeppelin, import the libraries, initialize the H2OContext, then run your PySparkling Water scripts:

%livy2.pyspark
from pysparkling import *
hc = H2OContext.getOrCreate(spark)
import h2o
from h2o.estimators.gbm import H2OGradientBoostingEstimator
from h2o.estimators.deeplearning import H2ODeepLearningEstimator
from pyspark.sql.types import *
from pyspark.sql.functions import *
loans = h2o.import_file(path="hdfs://dzaratsian0.field.hortonworks.com:8020/tmp/loan_200k.csv", header=0)
loans.head()
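# (Aside, not in the original walkthrough: once the data is in an H2OFrame,
# an H2O model can be trained directly with the estimators imported above.
# The column names below are hypothetical placeholders; substitute columns
# that actually exist in your own file.)
predictors = ["loan_amnt", "term", "home_ownership"]   # hypothetical predictor columns
response   = "bad_loan"                                # hypothetical response column
gbm = H2OGradientBoostingEstimator(ntrees=50, max_depth=5)
gbm.train(x=predictors, y=response, training_frame=loans)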
df_loans = hc.as_spark_frame(loans)
df_loans.show(10)

References:
Hortonworks HDP 2.6.2
H2O Downloads
03-28-2017
01:39 AM
7 Kudos
In this article, I'll show how to analyze a real-time data stream using Spark Structured Streaming. I wanted to provide a quick Structured Streaming example that shows an end-to-end flow from source (Twitter), through Kafka, and then data processing using Spark. To accomplish this, I used Apache NiFi (part of Hortonworks HDF) to capture the Twitter data and send it to Apache Kafka.
From here, Spark was used to consume each Twitter payload (as JSON), parse, and analyze the data in real-time.
Before I jump into the technical details, it's good to understand some of the business value of this process. There are many practical applications built on this technology stack, as well as on the code that I describe below. Here are a few of the business applications that can be implemented using this technology:

Security: Real-time log analysis allows organizations to extract IP addresses, ports, services, and events. This data can then be aggregated based on a time window, analyzed, and monitored for abnormal activity. Spark Structured Streaming also supports real-time joins with static data, further enriching the logs by incorporating external data such as location, detailed user information, and historical data.

Sensors & IoT: When working with sensors, out-of-order data is a challenge. The order of readings is critical for identifying patterns and behavior within an environment. One of the goals of Spark Structured Streaming is to maintain order using "watermarking", which enables the engine to automatically track the current event time within the data and clean up old state accordingly.

Web Analytics: Structured Streaming can be used to route, process, and aggregate clickstream data from your website. This analysis can feed external systems in order to proactively send notifications to users, launch a web form, or trigger an action in a 3rd-party system.

Call Center: Identify trends related to call volume, response times, emerging topics, at-risk customers, and cross-sell opportunities. Spark is capable of processing both structured and unstructured call records to address these business needs.

Social Media: Analyze social feeds in real-time to detect influencers, trending topics, abnormal volume, or other indicators. All of this can be monitored and aggregated within a defined time window. The example below goes into more detail on this specific use case.
Here are the technical details associated with this application:

Step 1: Connect to Twitter and stream the data to Kafka

A simple NiFi flow was used to capture the Twitter data using the GetTwitter NiFi processor, which consumes data from the Twitter Streaming API. Using PutKafka, I was able to push the JSON payload to a Kafka topic called "dztopic1". Below is a screenshot that shows this NiFi flow:

Step 2: Use Spark to read the Kafka stream

Prerequisite: Before you launch Spark, make sure that you have included the required artifact / dependency as described here: spark-sql-kafka-0-10_2.11. If you want to add this via the PySpark command line, you can run something like this:

./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

Spark Structured Streaming subscribes to our Kafka topic using the code shown below:

# Consume Kafka topic
events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dztopic1") \
    .load()
# Cast the JSON payload as a String
events = events.selectExpr("CAST(value AS STRING)")
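One note before moving on: the snippets in the remaining steps assume a handful of imports that aren't shown in the original listing. A minimal set that covers everything below would look like this:

import json
import datetime
from pyspark.sql.functions import udf, struct, col, window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType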
Step 3: Define Python UDFs

I created two Python functions. The first function, parse_json, parses the Twitter JSON payload and extracts each field of interest. The second function, convert_twitter_date, converts the Twitter created_at timestamp into a pyspark timestamp, which is used for windowing. I like using Python UDFs, but note that there are other ways to parse JSON and convert the timestamp field.

Function: parse_json

def parse_json(df):
    twitterid   = str(json.loads(df[0])['id'])
    created_at  = str(json.loads(df[0])['created_at'])
    tweet       = str(json.loads(df[0])['text'])
    screen_name = str(json.loads(df[0])['user']['screen_name'])
    return [twitterid, created_at, tweet, screen_name]
Function: convert_twitter_date

def convert_twitter_date(timestamp_str):
    output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ',''), '%a %b %d %H:%M:%S %Y')
    return output_ts
Step 4: Parse JSON within Spark

Once we have the JSON string, I used the two Python UDFs to parse each payload, convert the timestamp, and output our relevant dataframe columns (created_at, screen_name, tweet, and created_at_ts).

json_schema = StructType([
StructField("twitterid", StringType(), True),
StructField("created_at", StringType(), True),
StructField("tweet", StringType(), True),
StructField("screen_name", StringType(), True)
])
udf_parse_json = udf(parse_json , json_schema)
udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())
jsonoutput = events.withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns]))) \
    .where(col("parsed_field").isNotNull()) \
    .withColumn("created_at", col("parsed_field.created_at")) \
    .withColumn("screen_name", col("parsed_field.screen_name")) \
    .withColumn("tweet", col("parsed_field.tweet")) \
    .withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at")))
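As a quick optional sanity check (my addition), you can print the schema at this point to confirm that the parsed columns came through as expected:

jsonoutput.printSchema()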
Step 5: Spark Windowing

I used Spark's window operation to capture trending screen_names, aggregating within 1-minute windows with a slide duration of 15 seconds.

# http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.window
# pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration=None, startTime=None)
windowedCounts = jsonoutput.groupBy(
window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),
jsonoutput.screen_name
).count()
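As mentioned in the Sensors & IoT discussion above, watermarking is how Structured Streaming bounds the state it keeps for aggregations like this one. A hedged variation of the same query is sketched below; note that a watermark only prunes state in the append or update output modes, so it would not apply to the complete-mode query used in the next step.

windowedCounts_wm = jsonoutput \
    .withWatermark("created_at_ts", "10 minutes") \
    .groupBy(
        window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),
        jsonoutput.screen_name
    ).count()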
Step 6: Start Spark Structured Streaming

Start the Spark Structured Streaming queries. In this case, I am launching two queries: one that contains results from our time window (query_window), and another (query_json) that contains the parsed JSON records. Both tables are stored in-memory. Typically, you would use a different Spark sink, such as writing the results back to Kafka or persisting to HDFS. For this example (and because it is useful for debugging), I am writing the results to two in-memory tables.

query_window = windowedCounts \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("myTable_window") \
    .start()

query_json = jsonoutput \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("myTable_json") \
    .start()
Step 7: Interactively Query the Structured Streaming Tables

Each in-memory table can be interactively queried for its current state using standard SparkSQL syntax, as shown below.

Query myTable_json: Output 15 records of the in-memory table
spark.sql("select created_at, screen_name, tweet from myTable_json limit 15").show(15,False)

Query myTable_json: Output the top 15 Twitter authors
spark.sql("select screen_name, count(*) as count from myTable_json group by screen_name order by count desc limit 15").show(15,False)

Query myTable_window: Output the window aggregations
spark.sql("select window.start, window.end, screen_name, `count` from myTable_window order by `count` desc limit 15").show(15,False)
References:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
https://hortonworks.com/apache/spark/

Environment: Apache Spark 2.1.0 (PySpark with Python 2.7.5), Apache NiFi 1.1.0, Apache Kafka 2.10-0.8.2.1
02-21-2017
05:48 PM
7 Kudos
What kind of text or unstructured data are you collecting within your company? Are you able to fully utilize this data to enhance predictive models, build reports/visualizations, and detect emerging trends found within the text?
Hadoop enables distributed, low-cost storage for this growing amount of unstructured data. In this post, I'll show one way to analyze unstructured data using Apache Spark. Spark is advantageous for text analytics because it provides a platform for scalable, distributed computing.
When it comes to text analytics, you have a few options for analyzing text. I like to categorize these techniques as follows:
Text Mining (i.e. Text clustering, data-driven topics)
Categorization (i.e. Tagging unstructured data into categories and sub-categories; hierarchies; taxonomies)
Entity Extraction (i.e. Extracting patterns such as phrases, addresses, product codes, phone numbers, etc.)
Sentiment Analysis (i.e. Tagging positive, negative, or neutral with varying levels of sentiment)
Deep Linguistics (i.e. Semantics. Understanding causality, purpose, time, etc.)
Which technique you use typically depends on the business use case and the question(s) you are trying to answer. It's also common to combine these techniques.
This post will focus on text mining in order to uncover data-driven text topics.
For this example, I chose to analyze customer reviews of their airline experience. The goal of this analysis is to use statistics to find data-driven topics across a collection of customer airlines reviews. Here's the process I took:
1. Load the Airline Data from HDFS
rawdata = spark.read.load("hdfs://sandbox.hortonworks.com:8020/tmp/airlines.csv", format="csv", header=True)
# Show rawdata (as DataFrame)
rawdata.show(10)
2. Pre-processing and Text Cleanup
I believe this is the most important step in any text analytics process. Here I am converting each customer review into a list of words, while removing all stopwords (a list of commonly used words that we want removed from our analysis). I have also removed any special characters and punctuation, and lowercased the words so that everything is uniform when I create my term frequency matrix. As part of the text pre-processing phase, you may also choose to incorporate stemming (which groups all forms of a word, such as "take", "takes", "took", "taking"). Depending on your use case, you could also include part-of-speech tagging, which will identify nouns, verbs, adjectives, and more. These POS tags can be used for filtering and to identify advanced linguistic relationships.
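Note that the code below assumes a few imports that the original listing doesn't show; a minimal set that covers this walkthrough:

import re
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA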
def cleanup_text(record):
    text  = record[8]
    uid   = record[9]
    words = text.split()
    # Default list of Stopwords
    stopwords_core = ['a', u'about', u'above', u'after', u'again', u'against', u'all', u'am', u'an', u'and', u'any', u'are', u'arent', u'as', u'at',
u'be', u'because', u'been', u'before', u'being', u'below', u'between', u'both', u'but', u'by',
u'can', 'cant', 'come', u'could', 'couldnt',
u'd', u'did', u'didn', u'do', u'does', u'doesnt', u'doing', u'dont', u'down', u'during',
u'each',
u'few', 'finally', u'for', u'from', u'further',
u'had', u'hadnt', u'has', u'hasnt', u'have', u'havent', u'having', u'he', u'her', u'here', u'hers', u'herself', u'him', u'himself', u'his', u'how',
u'i', u'if', u'in', u'into', u'is', u'isnt', u'it', u'its', u'itself',
u'just',
u'll',
u'm', u'me', u'might', u'more', u'most', u'must', u'my', u'myself',
u'no', u'nor', u'not', u'now',
u'o', u'of', u'off', u'on', u'once', u'only', u'or', u'other', u'our', u'ours', u'ourselves', u'out', u'over', u'own',
u'r', u're',
u's', 'said', u'same', u'she', u'should', u'shouldnt', u'so', u'some', u'such',
u't', u'than', u'that', 'thats', u'the', u'their', u'theirs', u'them', u'themselves', u'then', u'there', u'these', u'they', u'this', u'those', u'through', u'to', u'too',
u'under', u'until', u'up',
u'very',
u'was', u'wasnt', u'we', u'were', u'werent', u'what', u'when', u'where', u'which', u'while', u'who', u'whom', u'why', u'will', u'with', u'wont', u'would',
u'y', u'you', u'your', u'yours', u'yourself', u'yourselves']
    # Custom List of Stopwords - Add your own here
    stopwords_custom = ['']
    stopwords = stopwords_core + stopwords_custom
    stopwords = [word.lower() for word in stopwords]

    text_out = [re.sub('[^a-zA-Z0-9]','',word) for word in words]                                    # Remove special characters
    text_out = [word.lower() for word in text_out if len(word)>2 and word.lower() not in stopwords]  # Remove stopwords and words under X length
    return text_out
udf_cleantext = udf(cleanup_text , ArrayType(StringType()))
clean_text = rawdata.withColumn("words", udf_cleantext(struct([rawdata[x] for x in rawdata.columns])))
This will create an array of strings (words), which can be used within our Term-Frequency calculation.
3. Generate a TF-IDF (Term Frequency Inverse Document Frequency) Matrix
In this step, I calculate the TF-IDF. Our goal here is to put a weight on each word in order to assess its significance. This algorithm down-weights words that appear very frequently. For example, if the word "airline" appeared in every customer review, then it has little power in differentiating one review from another. Whereas the words "mechanical" and "failure" (as an example) may only be seen in a small subset of customer reviews, and therefore be more important in identifying a topic of interest.
# Term Frequency Vectorization - Option 1 (HashingTF):
#hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
#featurizedData = hashingTF.transform(clean_text)

# Term Frequency Vectorization - Option 2 (CountVectorizer):
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize = 1000)
cvmodel = cv.fit(clean_text)
featurizedData = cvmodel.transform(clean_text)
vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData) # TFIDF
The TF-IDF algorithm produces a "feature" variable, which contains a vector set of term weights corresponding to each word within the associated customer review.
4. Use LDA to Cluster the TF-IDF Matrix
LDA (Latent Dirichlet Allocation) is a topic model that infers topics from a collection of unstructured data. The output of this algorithm is k number of topics, which correspond to the most significant and distinct topics across your set of customer reviews.
# Generate 25 Data-Driven Topics:
# "em" = expectation-maximization
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")
ldamodel = lda.fit(rescaledData)

ldamodel.isDistributed()
ldamodel.vocabSize()
ldatopics = ldamodel.describeTopics()
# Show the top 25 Topics
ldatopics.show(25)
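Note that describeTopics() returns term indices rather than the words themselves. Since the CountVectorizer vocabulary was broadcast earlier (vocab_broadcast), a small UDF, sketched below as my own addition, can translate each topic's indices into readable terms:

def map_termid_to_word(termIndices):
    return [vocab_broadcast.value[termid] for termid in termIndices]

udf_map_termid_to_word = udf(map_termid_to_word, ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn("topic_desc", udf_map_termid_to_word(ldatopics.termIndices))
ldatopics_mapped.select("topic", "topic_desc").show(25, False)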
Based on my airline data, this code will produce the following output:
Keep in mind that these topics are data-driven (generated using statistical techniques) and do not require external datasets, dictionaries, or training data. Because of the statistical nature of this process, some business understanding and inference needs to be applied to these topics in order to create a clear description for business users. I've relabeled a few of these as an example:
Topic 0: Seating Concerns (more legroom, exit aisle, extra room)
Topic 1: Vegas Trips (including upgrades)
Topic 7: Carryon Luggage, Overhead Space Concerns
Topic 12: Denver Delays, Mechanical Issues
So how can you improve these topics?
Spend time on the text pre-processing phase
Remove stopwords
Incorporate stemming
Consider using part-of-speech tags
Play with k, the number of topics within the LDA algorithm. A small number of topics may not give you the granularity you need, while too many topics could introduce redundancy or very similar topics.
5. What can you do with these results?
Look for trends in the Data-Driven Topics: You may start to notice an increase or decrease in a specific topic. For example, if you notice a growing trend for one of your topics (i.e. mechanical/maintenance-related issues out of JFK Airport), then you may need to investigate further as to why. Text analytics can also help to uncover root causes for these mechanical issues (if you can get ahold of technician notes or maintenance logs). Or what if you notice a data-driven topic that discusses rude flight attendants or airline cleanliness? This may indicate an issue with a flight crew. Below, I have included a screenshot showing the trends for Topic 12 (Denver, Mechanical/Maintenance issues). You can see spikes for the various airlines, which indicate when customers were complaining about mechanical or maintenance-related issues:
Reporting & Visualization
As another quick example, below I am showing how you can use Apache Zeppelin to create a time series for each topic, along with a dropdown box so that an airline (or another attribute of interest) can be selected.
Enhance Predictive Models with Text
Lastly, and likely most important, these text topics can be used to enhance your predictive models. Since each topic can be its own variable, you are now able to use these within your predictive model. As an example, if you add these new variables to your model, you may find that departure location, airline, and a topic related to mechanical issues are most predictive of whether a customer gives you a high or low rating.
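As a minimal sketch of that last point (my addition, using the Spark ML API): transforming the data with the fitted LDA model appends a topicDistribution vector, one weight per topic for each review, which can then be joined back to your structured fields and fed into a downstream classifier.

# Each review gets a "topicDistribution" vector with one weight per topic (k=25)
doc_topics = ldamodel.transform(rescaledData)
doc_topics.select("topicDistribution").show(5, False)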
Thanks for reading and please let me know if you have any feedback! I'm also working on additional examples to illustrate categorization, sentiment analytics, and how to use this data to enhance predictive models.
References:
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF
https://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.feature.CountVectorizer
https://github.com/zaratsian/PySpark/blob/master/text_analytics_datadriven_topics.json
01-03-2017
01:38 PM
9 Kudos
Apache Zeppelin is a flexible, multi-purpose notebook used for code editing and data visualization. It supports a variety of languages (using interpreters) and enables users to quickly prototype, write code, share code, and visualize the results using standard charts. But as you move towards data apps and front-end development, you may need to create custom visualizations.
Fortunately, Zeppelin comes with built-in support for Angular through both a front-end API and a back-end API. I recently worked on a project to identify networking issues. These issues could range from bad/weak network signals, to wiring issues, to down nodes. To replicate this, I created an example project where I mapped out the path that a packet takes from my home wifi connection to the google.com servers (using traceroute). The complete project is here, but for this post I want to focus on the steps I took to display the Spark output within Google Maps, all within Zeppelin.

Here are a couple of screenshots showing the end result of the traceroute analysis and the Google Map within Zeppelin:

Figure 1: Dashboard showing an input (Point-of-Interest, POI) and the Spark output
Figure 2: Google Map created via Angular within a Zeppelin notebook

From Figure 1, you can see that a user-defined point-of-interest (POI) can be entered by the user. This is the node, or IP address, that a user is interested in analyzing. From here, the Spark code parses the data, retrieves all parent and child nodes, checks the health status of each node, then uses that data within Angular to generate the Google Map. Here are the steps that I took to generate this output:

Step 1: Create a JS function to bind all required variables to a global object

I started with a helper JS function, thanks to @randerzander, which allowed me to bind my exposed variables to the globally accessible JS object. Within this code, you'll notice that I've included 7 vars, which correspond to the variables that I will bind within my Spark code.

%angular
<!-- Avoid constantly editing JS and list the Angular vars you want exposed in an HTML attribute: -->
<div id="dummy" vars="id,parents,children,poidevice,childrenStr,parentsStr,devicetopology"></div>
<script type="text/javascript">
//Given an element in the note & list of values to fetch from Spark
//window.angularVars.myVal will be current value of backend Spark val of same name
function hoist(element){
    var varNames = element.attr('vars').split(',');
    window.angularVars = {};
    var scope = angular.element(element.parent('.ng-scope')).scope().compiledScope;
    $.each(varNames, function(i, v){
        window[v+'-watcher'] = scope.$watch(v, function(newVal, oldVal){
            window.angularVars[v] = newVal;
        });
    });
}
hoist($('#dummy'));
</script>
Step 2: Use Spark to process the data, then bind each required variable to Angular using z.angularBind

In this example, Spark is used to parse and transform the traceroute data. I chose to use Spark Streaming and stateful in-memory processing, using MapWithState, in order to maintain the current health status of each node within the network topology. The health status and enriched data for each node were then persisted to HBase (via Phoenix), where additional queries and fast, random-access reads/writes could be performed.

At this point, I have generated all of my Spark variables, but I need to bind them to the Angular object in order to create a custom Google Maps visualization. Within Spark, here's how you would bind the variables:

z.angularBind("poidevice", POIDevice)
z.angularBind("parents", parentDevicesInfo)
z.angularBind("parentsStr", parentDevicesStr)
z.angularBind("children", childrenDevicesInfo)
z.angularBind("childrenStr", childrenDevicesStr)
z.angularBind("devicetopology", devicetopology)
z.angularBind("id", id)
The code shown above will bind my Spark variables (such as parentDevicesInfo) to my Angular object (parents). Now that all of my Spark variables are available to my Angular code, I can work on producing the Google Maps visualization (shown in Figure 2).

Step 3: Write Angular JS code to display the Google Map

Within Angular, I used the code below to read each variable using window.angularVars.<variable_name>. Then, to map each node or point-of-interest, I drew a circle and colored it green (health status = ok) or red (health status = bad).

function initMap() {
    var id             = window.angularVars.id;
    var poidevice      = window.angularVars.poidevice;
    var children       = window.angularVars.children;
    var childrenStr    = window.angularVars.childrenStr;
    var parents        = window.angularVars.parents;
    var parentsStr     = window.angularVars.parentsStr;
    var devicetopology = window.angularVars.devicetopology;
    var POIs           = {};

    console.log('POI Value: ' + poidevice[0].values);
    console.log('Topology Value: ' + devicetopology[0].values.toString().split("|")[0]);

    var USA = {lat: 39.8282, lng: -98.5795};
    var map = new google.maps.Map(document.getElementById('map'), {zoom: 5, center: USA});

    //**********************************************************************************
    //
    // Draw circle POI Device
    //
    //**********************************************************************************
    $.each(poidevice, function(i, v){
        POIs[v.values[0]] = v.values;

        // Create marker for each POI
        var pos   = {lat: parseFloat(v.values[5]), lng: parseFloat(v.values[4])};
        var color = (v.values[11] == '1.0') ? '#FF0000' : '#008000';
        var info  = '<b>IP</b>: ' + v.values[0] + '<p><b>ISP:</b> ' + v.values[6] + '<p>' + v.values[3] + ", " + v.values[2];
        console.log('Drawing POI device: ' + v.values[0] + ' ' + JSON.stringify(pos) + ' ' + v.values[5] + ',' + v.values[4]);
        circle(pos, color, info, map);
    });
...end of code snippet...
The project is on my Github and shows the full angular code used to create each edge connection based on all parent and child nodes as well as the Spark Streaming code (which I'll detail in a follow-up HCC article).
08-08-2016
09:08 PM
4 Kudos
Hortonworks and SAS have partnered to create two new Apache NiFi processors. These processors allow data/events to be streamed between
Hortonworks DataFlow (HDF) and SAS Event Stream Processing. Why does this matter?
HDF, powered by Apache NiFi, Kafka, and Storm, is an integrated system for real-time dataflow management and streaming analytics, on-premises or in the cloud. SAS Event Stream Processing is a real-time, low-latency, high-throughput event processing solution that can deploy SAS machine learning models. By integrating these technologies, organizations now have the option of deploying their SAS models in real-time within the Hortonworks platform. This offers flexible deployment options for your streaming analytics projects, while providing powerful analytics from SAS.
How does this integration work?
There are two new processors that can be added to NiFi:
ListenESP: This processor initiates a listener within NiFi that receives events from the SAS Event Stream Processing data stream.

PutESP: This processor sends events from NiFi to the SAS Event Stream Processing data stream.

Setup and configuration:

1. Download and install Hortonworks DataFlow.
2. Copy the SAS .nar file to $NIFI_HOME/lib (this .nar file is provided by SAS when SAS Event Stream Processing is purchased).
3. Edit $NIFI_HOME/conf/nifi.properties and change the web HTTP port to 31005 (nifi.web.http.port=31005) or another available port of your choice.
4. Start NiFi by running $NIFI_HOME/bin/nifi.sh run
5. Open a browser and go to http://$HOST:31005/nifi
NOTE: For this to work, SAS Event Stream Processing must be purchased and have a valid license.
Once the .nar file has been added, you will have access to the two processors within NiFi. Data events are shared using an Avro schema. Below is a basic example of a NiFi dataflow using both a ListenESP and a PutESP processor (shown in Figure 1).

Within the PutESP processor, you'll notice a few parameters (shown below in Figure 2):

Pub/Sub Host: Hostname or IP of the server running SAS Event Stream Processing.
Pub/Sub Port: Pub/sub port of the SAS Event Stream Processing engine.
Project: SAS Event Stream Processing project name.
Continuous Query: Name of the continuous query within the SAS Event Stream Processing project.
Source Window: Source window within SAS Event Stream Processing where events from NiFi can be injected.

The ListenESP processor has similar parameters (shown below in Figure 3):
For more information, check out
Hortonworks DataFlow (HDF) powered by Apache NiFi and SAS Event Stream Processing.