Member since 08-03-2017: 14 Posts, 17 Kudos Received, 0 Solutions
06-18-2018
11:39 PM
@Kit Menke isn't wrong. Take a look at the API docs. You'll notice there are several options for creating data frames from an RDD. In your case, it looks as though you have an RDD of class type Row, so you'll also need to provide a schema to the createDataFrame() method. Scala API docs: https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people =
  sc.textFile("examples/src/main/resources/people.txt").map(
    _.split(",")).map(p => Row(p(0), p(1).trim.toInt))

val dataFrame = sqlContext.createDataFrame(people, schema)

dataFrame.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

dataFrame.createOrReplaceTempView("people")
sqlContext.sql("select name from people").collect.foreach(println)
05-16-2018
05:34 PM
9 Kudos
Short Description: A quick tutorial on how to query cryptocurrency transactions with Spark Streaming and Python.
Abstract: This tutorial covers how one can use Nifi to stream public cryptocurrency transactional data to Kafka and subsequently query the transactional stream with Spark Streaming. We will be using the Pyspark API for demonstration purposes; however, I've also included a Java version for reference and will provide links to both versions. It is strongly recommended that one use Java or Scala for production-grade applications, as most of the generally available (GA) features in Spark are published to support those languages foremost. Figures are used as graphical aids. Steps are highlighted by bullet points.
What is Spark? Why is it well suited for streaming at scale?
Apache Spark is a distributed processing engine for data science and data engineering at scale. It features several out-of-the-box libraries for machine learning, map reduce, graph analysis, micro batching, and structured query language. It is well suited to poll streams of data due to its language-integrated application programming interface (API) and fault-tolerant behavior. Users can write stream jobs the same way they would batch jobs and recover both lost work and operator state without extra code; the Spark team has conveniently encapsulated these concerns into Spark’s core API. With Spark Streaming, one can ingest data from multiple sources (Kafka, Kinesis, Flume, and TCP sockets) and process it with abstracted functions like map, reduce, join, and window. Results can subsequently be pushed to HDFS, databases, and dashboards for meaningful insights. Simply put, Spark Streaming provides a very convenient way for Hadoop developers to grab data in real time and persist it to their system of choice.
What is micro batching?
“Micro batching is defined as the procedure in which the incoming stream of messages is processed by dividing them into group of small batches. This helps to achieve the performance benefits of batch processing; however, at the same time, it helps to keep the latency of processing of each message minimal.” –Apache Spark 2.x for Java Developers, Sumit Kumar & Sourav Gulati, O’Reilly Media Inc.
In Spark Streaming a micro-batch is a sequence of resilient distributed datasets (RDDs) packaged into an abstraction known as a Discretized Stream (DStream). Users can batch DStreams on time intervals, with batches ranging between 1-10 seconds. Determining the batch size really depends on the needs of the downstream consumer and how fast that consumer can write the batches to disk or present information to a web browser.
Where are we getting our data?
We will be subscribing to a Satori public data channel for global cryptocurrency transactions. According to Satori, one can “Get streaming data for cryptocurrency market prices as well as exchange rates between alternative cryptocurrencies. Data is streamed from exchanges around the globe, where crypto-coins are traded in the US Dollar, Canadian Dollar, British Pound, Japanese Yen, Russian Ruble, etc.” -https://www.satori.com/livedata/channels/cryptocurrency-market-data
Note: This tutorial assumes the user has successfully installed Spark2 on HDP and Nifi and Kafka on HDF. It also assumes the user has some familiarity with Nifi and Kafka, but that isn't required to successfully complete the tutorial.
For detailed instructions on how to accomplish these steps please consult our HDP and HDF user documentation: HDP User Guide-Apache Spark, HDF Installation Guide, and HDF User Guide-Apache Kafka. One can also use the Hortonworks Sandbox environments to complete this tutorial: https://hortonworks.com/products/sandbox/
Tools: One can create the application logic for this tutorial by cloning my GitHub repository. Feel free to make contributions by forking the repo and submitting pull requests.
#Python Version
git clone https://github.com/patalwell/SparkStreamingKafka.git
#Java Version
git clone https://github.com/patalwell/SparkStreamingSatori.git
Tutorial:
Step 1: Setting up a Satori Account and Understanding the Client Connection Class and Methods
In order to create our Nifi flow, we'll need to visit Satori and create a developer account. You can follow the directions here: https://developer.satori.com/#/signup Save your application key, the endpoint, and the channel name. We'll be inserting these into our custom Satori Nifi processor. Big thanks to Laurence Da Luz for creating the Satori processor! See Figure 1.
Figure 1: The Satori Developer Sign Up Portal
Once we've created a Satori account, a new project, and app keys, we can view the live data channels for cryptocurrency to get a taste of how we'll be making a client connection to the channel. You'll notice they have message fields (these will serve as our schema) and several client SDKs for C, C#, Go, JS, Java, and Python. See Figures 2 and 3.
Figure 2: Message Fields and JSON Payload
{
"exchange": "OKCoin",
"cryptocurrency": "ETH",
"basecurrency": "USD",
"type": "market",
"price": "779.09",
"size": null,
"bid": "773.64",
"ask": "779.7",
"open": null,
"high": "779.69",
"low": "741",
"volume": "85.46",
"timestamp": "1525898277327"
}
Figure 3: The Python Client SDK for Satori
#!/usr/bin/env python
from __future__ import print_function

import sys
import time

from satori.rtm.client import make_client, SubscriptionMode

endpoint = "wss://open-data.api.satori.com"
appkey = None  # replace with your Satori application key
channel = "cryptocurrency-market-data"

def main():
    with make_client(endpoint=endpoint, appkey=appkey) as client:
        print('Connected to Satori RTM!')

        class SubscriptionObserver(object):
            def on_subscription_data(self, data):
                for message in data['messages']:
                    print("Got message:", message)

        subscription_observer = SubscriptionObserver()
        client.subscribe(
            channel,
            SubscriptionMode.SIMPLE,
            subscription_observer)

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass

if __name__ == '__main__':
    main()
As you can see, we're importing the necessary Python modules, defining our connection macros, and instantiating a constructor to subscribe to the Satori client. For the sake of the tutorial, I've decided to abstract these object-oriented necessities from the user with the custom Satori processor. We'll still be making a connection to Satori, but it will be encapsulated within that processor.
Step 2: Copy our Custom Processor to Nifi
Next, we'll clone my repository into the home directory of the nodes the Nifi client is installed on. You'll find the satori-jolt-kafka-spark.xml template for our Nifi flow and the nifi-satori-bundle-nar-1.2.0.3.0.2.0-76.nar for our Satori processor in the /nifi directory of the SparkStreamingKafka parent directory.
git clone https://github.com/patalwell/SparkStreamingKafka.git
cd SparkStreamingKafka/nifi
We'll need to create a dedicated directory for our custom processor in order to prevent conflicts with custom nars during a Nifi upgrade. As such, we'll add a separate directory for the processor, change its user and group ownership to nifi, copy in our custom Satori processor nar, and add a line entry to nifi.properties within Ambari (Figures 1, 2 and 3). Be sure to restart Nifi after you've finished these steps; Ambari will prompt you to do this after you've updated the site.xml.
#Make a custom nars directory and change user/group ownership to nifi
sudo mkdir /usr/hdf/current/nifi/nars-custom/
sudo chown nifi:nifi /usr/hdf/current/nifi/nars-custom/
#Make a lib directory and change user/group ownership to nifi
sudo mkdir /usr/hdf/current/nifi/nars-custom/lib1/
sudo chown nifi:nifi /usr/hdf/current/nifi/nars-custom/lib1/
#copy our custom nar into our new directory
sudo cp ~/SparkStreamingKafka/nifi/nifi-satori-bundle-nar-1.2.0.3.0.2.0-76.nar /usr/hdf/current/nifi/nars-custom/lib1/.
sudo chown nifi:nifi /usr/hdf/current/nifi/nars-custom/lib1/nifi-satori-bundle-nar-1.2.0.3.0.2.0-76.nar
#validate our permissions
ls -la /usr/hdf/current/nifi/nars-custom/lib1/
Figure 1: Updating the custom Nifi directory for our custom Satori processor
Figure 2: Adding a property to the custom nifi.properties.xml site file in the Nifi configuration panel of Ambari
Figure 3: Restarting the Nifi client
Step 3: Setup the Nifi Flow to Subscribe to our Satori Channel and Publish our Topic to Kafka
Now that we’ve gotten the administrative work out of the way, we can simply add our flow template to the Nifi canvas, create a processor group, and define a few macros within our processors. Navigate to the Nifi UI from Ambari by selecting the Nifi service, Quick Links, and your host of choice.
Let's create a processor group (Figure 1), double-click into the processor group, and import the satori-jolt-kafka.xml file into Nifi. I suggest downloading this file locally from my GitHub repository to make it easier to upload to the user interface. You can upload a template to Nifi by navigating to the Nifi flow process group icon on the left of the UI. It's the box directly under Navigate, labeled Operate. You'll want to click the smaller icon that is first from the right; it should have an arrow pointing to a mock flow. See Figures 1, 2 and 3.
Figure 1: Creating a processor group
Figure 2: Navigating to create a processor group, uploading our template, and dragging our template to the canvas
Figure 3: Uploading our satori-jolt-kafka.xml template
Once the template has been uploaded you can navigate to the top toolbar and select the second icon from the right; it resembles a group of processors. Drag that onto the canvas and select the Satori-Jolt-Kafka flow (Figures 2 and 4). Your screen should now resemble Figure 4.
Figure 4: Our Satori-Jolt-Kafka Flow
Now that we’ve successfully uploaded our template we can edit our processors and take a look at what’s going on. Right-click on the ConsumeSatori processor, select Configure, navigate to the Properties tab, and add the application key we made when we signed up for a Satori developer account. You’ll notice the value for our App Key field now reads “Sensitive value set”. The endpoint argument should resemble wss://open-data.api.satori.com and our channel should be cryptocurrency-market-data. Go ahead and click Apply. The processor should no longer have a warning emblem (Figure 5).
Figure 5: Adding our Satori App Key and validating our other client SDK constructor arguments
The Satori processor makes the connection to the Satori API without our having to write a bunch of code, and it emits JSON records as strings. This becomes problematic later down the line (palm to forehead for anyone who's dabbled with JSON), so I've opted to explicitly cast the data types with a JSON Jolt processor.
Right-click on the Jolt processor and select Configure. When the configuration menu comes up, select the Properties tab. For the Jolt Transformation DSL value, select Chain (Figure 6). You can learn more about Jolt and how it can be used to manipulate JSON here: Jolt JSON Transformation Editor
Figure 6: The Chain Jolt Transformation. We are using a combination of specifications to remove fields, apply new fields, create values, and cast our fields from string to their respective types.
Click the Advanced tab on the lower left of the processor. This will populate an editor so we can test our Jolt specification logic. I've included the Jolt specification below so we can see what's going on step by step. I've also included the raw payload so we can see what the transformations have accomplished for us.
#Jolt Specification for Jolt Transform JSON Processor
[{
"operation": "remove",
"spec": {
"timestamp": ""
}
},
{
"operation": "default",
"spec": {
"timestamp": "${now():toNumber()}"
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"exchange": "=toString",
"cryptocurrency": "=toString",
"type": "=toString",
"price": ["=toDouble", 0.0],
"size": ["=toDouble", 0.0],
"bid": ["=toDouble", 0.0],
"ask": ["=toDouble", 0.0],
"open": ["=toDouble", 0.0],
"high": ["=toDouble", 0.0],
"low": ["=toDouble", 0.0],
"volume": ["=toDouble", 0.0],
"timestamp": "=toLong"
}
}]
The first operation is very intuitive. We are simply removing the timestamp field. I've opted to do this since some of the raw data contains "nulls". You could keep the data in its raw form by eliminating this step, but I've elected to make the processor easier to understand by adding features a user might want to take advantage of.
The second operation allows us to access the Nifi expression language to take a current timestamp and cast it as a number. See the Nifi expression language guide for more advanced topics and other methods: Apache Nifi Expression Language Guide
Our final operation, modify-overwrite-beta, casts the fields to their respective types. If a field is null, it is filled with the second parameter; in this case, I've elected to fill the values with 0.0. The Spark Streaming API allows us to drop null values, but again I've elected to showcase some of the features within Jolt for the sake of this tutorial. Note: Casting nulls as 0.0 might not be the best option for statistical aggregations, as it will skew averages.
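If it helps to see the same logic outside of Jolt, here is a rough Python sketch of what the chained specification does to a single message. It is illustrative only; in the actual flow the Jolt processor in Nifi performs these steps, and the helper name below is hypothetical.
import time

NUMERIC_FIELDS = ["price", "size", "bid", "ask", "open", "high", "low", "volume"]

def jolt_like_transform(message):
    # Operation 1 (remove): drop the source timestamp
    out = dict(message)
    out.pop("timestamp", None)
    # Operation 2 (default): add a fresh epoch-millis timestamp, much like the
    # Nifi expression ${now():toNumber()} would
    out["timestamp"] = int(time.time() * 1000)
    # Operation 3 (modify-overwrite-beta): cast numeric fields to doubles,
    # defaulting nulls to 0.0
    for field in NUMERIC_FIELDS:
        value = out.get(field)
        out[field] = float(value) if value is not None else 0.0
    return out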
Copy and paste the raw cryptocurrency transaction JSON into the JSON input field and click Transform (Figure 7).
//Raw Cryptocurrency transaction payload
{
"exchange" : "OKCoin",
"cryptocurrency" : "BTC",
"basecurrency" : "USD",
"type" : "market",
"price" : "9108.77",
"size" : null,
"bid" : "9139.21",
"ask" : "9207.94",
"open" : null,
"high" : "9519.99",
"low" : "9026.95",
"volume" : "119.54",
"timestamp" : "1525197075190"
}
Figure 7: The Jolt Transformation DSL and Specification Editor
Finally, let’s review our PublishKafka processor and set the relevant macro values we’ll need. Right-click on the PublishKafka processor and select Configure. Fill in the Kafka Broker value with the address of your Kafka broker, typically the hostname Kafka is installed on followed by port 6667, e.g. pathdf3.field.hortonworks.com:6667. Set the topic name to cryptocurrency-nifi-data and the delivery guarantee to best effort. Click Apply. Figure 8
If you don't know your Kafka broker address, navigate to Ambari, select Kafka in your list of active services, and select the Configs tab. You'll see the broker hosts as the first line under the Kafka Broker field (Figure 9).
Figure 8: Configuring our KafkaProducer Processor
Figure 9: Kafka Broker Configurations and Hostnames
Step 4: Review the Spark Streaming Application Logic
Now that we’ve successfully subscribed to a public data feed, cast our payload with the appropriate data types, and published our data to Kafka, we can review the Spark Streaming application logic. This is located in our repository as: ~/SparkStreamingKafka/DStreamCryptoStream.py
Starting from the top, let's import the necessary modules from Pyspark and define several parameter variables to instantiate a constructor for our SparkContext class object. Then let's create a StreamingContext class object with the latter object and a batch interval of 10 seconds as constructor arguments. Finally, we'll want to create kafkaStream (a DStream class object) with the KafkaUtils class and its createDirectStream() method. Let's define our kafkaStream with several arguments, e.g. Kafka broker, topic, and starting offsets.
Python Imports and Class Instantiations:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import *
from pyspark.sql.types import *
""" Note: When running this app with spark-submit you need the following
spark-submit --packages
org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0
{appName.py}
"""
MASTER = "local[*]"
APP_NAME = "DStreamCryptoStream"
KAFKA_BROKER = "pathdf3.field.hortonworks.com:6667"
KAFKA_TOPIC = ['cryptocurrency-nifi-data']
BATCH_INTERVAL = 10
OFFSET = "earliest"
#constructor for sparkContext; enables spark core
sc = SparkContext(MASTER, APP_NAME)
# constructor accepts SparkContext and Duration in Seconds
# e.g. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
# Durations.seconds(10));
ssc = StreamingContext(sc, BATCH_INTERVAL)
# Instantiate our DirectStream with the KafkaUtils class and subsequent method
# Parameters include StreamingContext, Topics, KafkaParameters
# To set the stream to capture the earliest data use:
kafkaStream = KafkaUtils.createDirectStream(
    ssc=ssc,
    topics=KAFKA_TOPIC,
    kafkaParams={"metadata.broker.list": KAFKA_BROKER,
                 "startingOffsets": OFFSET})
Next, we'll want to create yet another object, value, and map our stream down to just the values. The kafkaStream DStream object we created returns (key, value) pairs from Kafka, but we only care about the physical payload, i.e. our JSON, hence the lambda function applied to our DStream.
# The data from Kafka is returned as a tuple (Key, Value). So we'll want to
# transform the DStream with a mapper and point to the value portion of our tuple
value = kafkaStream.map(lambda line: line[1])
# value is now a DStream object
print value
# print type(value)
# <class 'pyspark.streaming.kafka.KafkaTransformedDStream'>
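If you'd like to sanity-check what is arriving from Kafka before wiring up the SQL logic, you could print a few records of each batch to the console. This is a quick debugging aid of my own suggestion, not part of the final application:
# Print up to ten raw JSON strings from each 10-second batch
value.pprint(10)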
Now, we need to create a SparkSession by using the SparkContext we already created, and define a function to query our JSON. To create a data frame from our raw JSON, I've elected to create a schema class object with Pyspark's StructType class; this maps our fields to their respective data types. I'm also subsequently using the data frame API to build a table, filter out null values, and create a temporary table "CryptoCurrency" to enable querying on our stream.
# Lazily instantiated global instance of SparkSession. (This is a hack to grab
# the sql context; we need to create a SparkSession using the SparkContext that
# the Streaming context is using. The method enables this during a restart of
# the driver.)
def getSparkSessionInstance(sparkConf):
    if "sparkSessionSingletonInstance" not in globals():
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]
# DataFrame operations inside your streaming program
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())
        schema = StructType([
            StructField('exchange', StringType())
            , StructField('cryptocurrency', StringType())
            , StructField('basecurrency', StringType())
            , StructField('type', StringType())
            , StructField('price', DoubleType())
            , StructField('size', DoubleType())
            , StructField('bid', DoubleType())
            , StructField('ask', DoubleType())
            , StructField('open', DoubleType())
            , StructField('high', DoubleType())
            , StructField('low', DoubleType())
            , StructField('volume', DoubleType())
            , StructField('timestamp', LongType())
        ])
        # Convert RDD[String] to JSON DataFrame by casting the schema
        data = spark.read.json(rdd, schema=schema)
        # data.show()
        # drop null values from our aggregations
        df = data.na.drop()
        # Check the explicitly mapped schema
        # df.printSchema()
        # Create a tempView so edits can be made in SQL
        df.createOrReplaceTempView("CryptoCurrency")
At this point, you can write SQL against the stream. I've further elected to select the cryptocurrency, average price, max price, min price, and standard deviation. Additionally, I'm adding a few functions to normalize the distributions and create upper and lower bounds for our standard deviations. I'm also filtering on US Dollars, Bitcoin, Algebraix Coin, LiteCoin, and Ethereum. Feel free to alter this query as you see fit. Note: This is by no means a statistically significant outcome. You'll need to use historical data to create a baseline for the mean, stddev, and confidence interval, and the population size should be large enough to meet the demands of your needs. What is a Confidence Interval?
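To see why dividing by the average is useful, consider two hypothetical currencies with the same absolute standard deviation; the normalized figure (a coefficient of variation expressed as a percentage) makes clear which one is actually more volatile. The numbers below are made up purely for illustration:
# Hypothetical averages and standard deviations (illustrative values only)
btc_avg, btc_std = 9100.0, 45.0
ltc_avg, ltc_std = 140.0, 45.0

# Normalized standard deviation: closer to 0 is less volatile
print "BTC: %.2f%%" % (btc_std / btc_avg * 100)   # ~0.49%
print "LTC: %.2f%%" % (ltc_std / ltc_avg * 100)   # ~32.14%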
Finally, we'll need to apply the function to our mapped Kafka DStream object, value, by using one of the several methods that come with the class; in this case, we're using foreachRDD(). Let's start our application with start() and elect to await keyboard termination with awaitTermination().
        # Get avg, max, min, and stdev for BTC, ETH, LTC, and ALX
        # we need to normalize our standard deviation by dividing by our
        # price average in order to calculate a per-transaction
        # normalized_stnd_dev; closer to 0 is less volatile
        print "**** Running Statistics of CryptoCurrency ****"
        spark.sql("""
            SELECT cryptocurrency
            ,avg(price) - std(price) as lower_1_std_bound
            ,avg(price) as average_price
            ,avg(price) + std(price) as upper_1_std_bound
            ,max(price) as max_price
            ,min(price) as min_price
            ,std(price) as 1_std
            ,std(price) * 2 as 2_std
            ,std(price)/avg(price)*100 as normalized_stnd_dev
            FROM CryptoCurrency
            WHERE (cryptocurrency == 'ADX'
            OR cryptocurrency == 'BTC'
            OR cryptocurrency == 'LTC'
            OR cryptocurrency == 'ETH')
            AND basecurrency == 'USD'
            GROUP BY cryptocurrency
            ORDER BY cryptocurrency""").show()
    except:
        pass
# conduct an operation on our DStreams object; we are
# inserting our def process function here and applying said function to each
# RDD generated in the stream
value.foreachRDD(process)
# start our computations and stop when the user has issued a keyboard command
ssc.start()
ssc.awaitTermination()
Step 5: Starting our Spark Streaming Application and Nifi flow
Assuming you have the spark-client configured to work from your home directory, you can simply type the following command to run this application. We need the Kafka assembly package in order to load the KafkaUtils class.
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 ~/SparkStreamingKafka/DStreamCryptoStream.py
You can test whether SPARK_HOME is set by running spark-submit --version; you should see something like Figure 1. If SPARK_HOME is not set, you can still call the binaries with the following command:
/usr/hdp/current/spark-client/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 ~/SparkStreamingKafka/DStreamCryptoStream.py
Figure 1: spark-submit --version to validate SPARK_HOME
After we've started the application, navigate back to Nifi and start our flow. As you can see, this makes editing streaming applications a breeze. When the flow is started you should momentarily see data printed to the console (Figures 2 and 3).
Figure 2: Starting our Nifi Flow
Figure 3: Results of our Streaming Application
Conclusion: Nice! You’ve successfully used Spark Streaming to micro batch cryptocurrency transactions to the console. You can also consult the documentation to learn more about the classes and methods that might be more appropriate for your use case. For a full listing of features, classes, and methods please review the Spark Streaming documentation or post questions to the Hortonworks Community Connection. I also plan on working with colleagues to implement some modeling on our data to calculate Value at Risk (VaR), create a dashboard with Superset, and persist our data to HBase for historical batch analysis via Phoenix. As always, comments and suggestions for other tutorials are highly encouraged.
04-25-2018
04:47 AM
You could also drop null values from your initial columns with:
train_df <- dropna(train_df, cols = 'ARR_DELAY')
02-23-2018
07:44 PM
@Magnus Runesson This is something that should be supported: http://atlas.apache.org/Bridge-Hive.html Let me try and replicate.
09-07-2017
01:58 PM
9 Kudos
Short Description:
A quick tutorial on how to mask columns in Hive for regulatory purposes
Abstract:
This tutorial will cover how to apply tags to Atlas entities and subsequently leverage tagging policies in Ranger to mask Personally Identifiable Information (PII). Atlas serves as a common metadata store designed to exchange metadata both within and outside of the Hadoop stack. It features a simple user interface and a REST API for ease of access and integration. The Atlas-Ranger paradigm unites data classification with policy enforcement. Figures will be used as a graphical aid. Steps will be provided in between figures via bullet points.
Note:
This tutorial assumes the user has successfully installed Ranger, enabled Ranger audit to Solr, installed Atlas, installed Hive, configured Atlas to work with Hive, and configured Atlas to work with Ranger. It also assumes the user has dummy data for development purposes. For detailed instructions on how to accomplish these steps please review our HDP development documentation:
HDP Developer Guide: Data Governance.
Tools:
You can create the dummy data we will use in this tutorial via the following commands. Ensure you have the proper user privileges to write files to your local environment and copy files into HDFS.
Statement for creating the employee table
create table employee (ssn string, name string, location string)
row format delimited
fields terminated by ','
stored as textfile;
Command for creating our dummy data
printf "111-111-111,Danny,San Jose\n222-222-222,Mike,Santa Clara\n333-333-333,Robert,Fremont\n345-444-134,James,San Diego\n759-3264-876,Tom,Santa Barbra\n253-856-987,Bobby,Holmdel\n675-883-996,Pat,San Clemente\n859-654-774,John,San Francisco\n568-412-778,Jane,Fremont\n546-889-778,Richard,San Jose\n554-996-332,Jeannine,Toms River\n587-665-124,Alex,New York" > employeedata.txt
Command to transfer the data to the Hive Warehouse
hdfs dfs -copyFromLocal employeedata.txt /apps/hive/warehouse/employee
How to Mask Columns in Hive with Atlas and Ranger
Step 1: Creating our Hive table and populating data
Log into Ambari and navigate to the Hive UI. Submit the create table statement from the tools above. Figure 1
Figure 1: The HIVE UI and create employee table statement
Generate our dummy data. Figure 2
Figure 2: The CLI command to generate our dummy data
Transfer our dummy data to the Hive warehouse and our employee table. Figure 3
Figure 3: The HDFS command to transfer our data to the Hive Warehouse
Ensure our data was properly populated into our Hive table via a select * from employee; SQL command. Figure 4
Figure 4: The "select * from employee" Hive table.
Step 2: Creating a tag in the Atlas UI
Now that we have a table with PII, we can create a tag in Atlas and leverage the event-related metadata generated during the creation of our Hive table.
Log into the Atlas UI and navigate to tags. Click the create tag button. Let's create a tag called PII and give it a description. Figure 1
Figure 1: The Atlas UI and the Creation of a Tag
Now that we have our PII tag, we can append it to an Atlas entity. In this case we want to search for hive_table, which is an entity Atlas supports out of the box. Once we've found our entity, we can then search for the columns we wish to tag and subsequently mask. Figure 2
Figure 2: Navigating to our hive_table entities and searching for the employee table
In this case let's select the employee table, which contains an ssn column. After we've clicked on the table we wish to work with, we can navigate to our column of choice, ssn.
Figure 3
Figure 3: Employee hive_table entity and subsequent columns
Once we've clicked on ssn under the columns key, we can add a tag via the button underneath the name of the Atlas hive_column entity. Figure 4
Let's select PII, the tag we previously created, and click add. After we've tagged the ssn (hive_column) entity, the tags key will show a blue PII tag to let the user know a tag has been applied to the entity.
Figure 4: Adding a tag via the tagging + button under ssn(hive_column)
We can now navigate back to the tags page of our UI and see that our PII tag has been applied to the ssn column. Figure 5
This navigation and classification paradigm allows the user to mask multiple columns in a variety of datasets. This is just a simple example, but one can leverage this notion to apply tagging to other conventions, e.g. the dev_group only has access to website_user_tables, the data_science_group only has access to specific tables, and the admin_group can grant create, update, and alter SQL commands to users or groups that should have those privileges while granting select to anyone who is not allowed to alter data.
Figure 5: The assignment of our PII tag to the employee.ssn column
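As a side note, the same classification can be applied programmatically through Atlas's REST API rather than the UI. The sketch below is only illustrative: it assumes an Atlas v2 endpoint on port 21000, basic authentication, and that the PII tag already exists; the host, credentials, and search parameters are placeholders, and the endpoint and payload shape should be verified against your Atlas version.
import requests

ATLAS_URL = "http://atlas-host.example.com:21000"   # placeholder host
AUTH = ("admin", "admin")                            # placeholder credentials

# Look up the GUID of the ssn hive_column entity with a basic search
search = requests.get(
    ATLAS_URL + "/api/atlas/v2/search/basic",
    params={"typeName": "hive_column", "query": "ssn"},
    auth=AUTH)
guid = search.json()["entities"][0]["guid"]

# Associate the existing PII classification (tag) with that entity
requests.post(
    ATLAS_URL + "/api/atlas/v2/entity/guid/%s/classifications" % guid,
    json=[{"typeName": "PII"}],
    auth=AUTH)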
Step 3: Creating a Tag Based Policy in Ranger
Now that we've created a table in Hive and assigned a tag in Atlas, we can create a tag based policy in Ranger.
Log into the Ranger UI and navigate to access manager >> tag based policies. Figure 1
Figure 1: Accessing the Ranger Tag Based Policies
Click the + button to create a new tag policy. Let's name ours PII_Tagging_Policy, add a description, and click Add. Figure 2
Figure 2: Creating a new tagging Service called PII_Tagging_Policy
After adding the new tag based policy, navigate back to the tag based policies service manager and click the tagging policy we just created. There are two tabs within our tagging policy: Access and Masking. Figure 3
Figure 3: Navigating to the Tagging Policy we just created
Since we want to apply a mask to the columns we've tagged in Atlas with PII we are going to create a masking policy, so navigate to the masking tab and select Add New Policy. Figure 4
Figure 4: Adding a new masking policy within our PII_Tagging_Policy
Let's name our new policy PII_Masking_Policy, and select the Atlas Tag we previously created.
Be sure to select the user or group you want to apply this tagging policy to. In this case, I am applying the policy to a single user, admin. Our mask conditions will be for Hive, and we will select a Hash mask.
This is where Ranger integrates with Atlas and applies a hashing policy to the column entities we selected within Atlas. The Ranger tag policy will follow our PII tag on all of our Hive datasets, and we can now manage how our policy is applied via Atlas. Figure 5
Figure 5: Applying our Atlas Tag and Ranger Masking Policy
We still need to ensure Ranger applies this policy to our Hive resource, so let's navigate to access manager >> resource based policies and edit our Hive service. In this case, we're editing the woolford_hdp_hive service. Click the pencil icon next to the service based policy name. Figure 6
If you do not have a Hive service, you can create a new service and apply the proper settings.
Figure 6: Navigating to the resource based policies page and the hive service folder
Edit the Hive service by selecting the Ranger tag policy we just created, "PII_Tagging_Policy". There is a drop-down menu next to the Select Tag Service property. Figure 7
Save your changes.
Figure 7: Applying our Ranger tagging policy to the Hive Resource
Step 4: Verifying our Users cannot access PII
Now that we've created a tag, applied that tag to an Atlas entity (our Hive column ssn from the employee table), created a Ranger tagging policy that leverages that tag and masks the information within the tagged column, and applied that tagging policy to our Hive service, we can validate our policy from the Hive UI.
Navigate to the Hive UI and fire off a "select * from employee;" query.
You will now see a mask over the ssn column. Figure 1
Figure 1: The Hive UI and our masked ssn column, notice the Hash over our PII data
Conclusion:
Congratulations, you're now able to successfully implement a tagging policy that masks Personally Identifiable Information so your organization can comply with regulatory guidelines. You can also perform a number of other tasks that are very similar to the policy you've just developed. For a full listing of features please refer to the developer guide above or post questions to the Hortonworks Community Connection. Suggestions for other tutorials are also highly encouraged.