A quick tutorial on querying cryptocurrency transactions with Spark Streaming and Python.
This tutorial covers how to use Nifi to stream public cryptocurrency transaction data to Kafka and then query the transaction stream with Spark Streaming. We will be using the PySpark API for demonstration purposes; however, I've also included a Java version for reference. Java or Scala is strongly recommended for production-grade applications, as most generally available (GA) features in Spark are published to support those languages first. I will provide links to both versions. Figures are used as graphical aids, and steps are highlighted by bullet points.
Apache Spark is a distributed processing engine for data science and data engineering at scale. It features several out-of-the-box libraries for machine learning, MapReduce-style processing, graph analysis, micro-batching, and structured query language (SQL). It is well suited to polling streams of data thanks to its language-integrated application programming interface (API) and fault-tolerant behavior. Users can write streaming jobs the same way they would write batch jobs, and recover both lost work and operator state without extra code. The Spark team has conveniently encapsulated these concerns into Spark's core API.
With Spark Streaming, one can ingest data from multiple sources (Kafka, Kinesis, Flume, and TCP sockets) and process data with abstracted functions like map, reduce, join, and window. Results can subsequently be pushed to HDFS, databases, and dashboards for meaningful insights.
Simply put, Spark Streaming provides a very convenient way for Hadoop developers to grab data in real-time and persist it to their system of choice.
“Micro batching is defined as the procedure in which the incoming stream of messages is processed by dividing them into group of small batches. This helps to achieve the performance benefits of batch processing; however, at the same time, it helps to keep the latency of processing of each message minimal”. –Apache Spark 2.x for Java Developers, Sumit Kumar & Sourav Gulati, O’Reilly Media Inc.
In Spark Streaming, a micro-batch is a sequence of resilient distributed datasets (RDDs) packaged into an abstraction known as a Discretized Stream (DStream). Users can batch DStreams on time intervals, with batch durations typically ranging between 1 and 10 seconds. Choosing the batch size really depends on the needs of the downstream consumer and how quickly that consumer can write the batches to disk or present the information to a web browser.
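To make the micro-batching model concrete, here is a minimal, self-contained sketch (separate from this tutorial's application) that counts words arriving on a TCP socket in 5-second batches and aggregates them over a sliding 30-second window; the host, port, and checkpoint path are placeholders:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[*]", "MicroBatchSketch")
ssc = StreamingContext(sc, 5)  # 5-second batch interval -> one RDD per micro-batch
ssc.checkpoint("/tmp/spark-checkpoint")  # required for windowed/stateful operations

lines = ssc.socketTextStream("localhost", 9999)  # placeholder source
word_counts = (lines.flatMap(lambda line: line.split(" "))
                    .map(lambda word: (word, 1))
                    .reduceByKeyAndWindow(lambda a, b: a + b,   # add counts entering the window
                                          lambda a, b: a - b,   # subtract counts leaving the window
                                          windowDuration=30,
                                          slideDuration=10))
word_counts.pprint()

ssc.start()
ssc.awaitTermination()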
We will be subscribing to a Satori public data channel for global cryptocurrency transactions. According to Satori,
“Get streaming data for cryptocurrency market prices as well as exchange rates between alternative cryptocurrencies. Data is streamed from exchanges around the globe, where crypto-coins are traded in the US Dollar, Canadian Dollar, British Pound, Japanese Yen, Russian Ruble, etc.” -https://www.satori.com/livedata/channels/cryptocurrency-market-data
This tutorial assumes you have successfully installed Spark2 on HDP, and Nifi and Kafka on HDF. It also assumes some familiarity with Nifi and Kafka, although that isn't required to complete the tutorial. For detailed instructions on how to accomplish these steps, please consult the HDP and HDF user documentation:
One can also use the Hortonworks Sandbox environments to complete this tutorial: https://hortonworks.com/products/sandbox/
One can create the application logic for this tutorial by cloning my GitHub repository. Feel free to make contributions by forking the repo and submitting pull requests.
#Python Version
git clone https://github.com/patalwell/SparkStreamingKafka.git

#Java Version
git clone https://github.com/patalwell/SparkStreamingSatori.git
Figure 1: The Satori Developer Sign Up Portal
Figure 2: Message Fields and JSON Payload
{
  "exchange": "OKCoin",
  "cryptocurrency": "ETH",
  "basecurrency": "USD",
  "type": "market",
  "price": "779.09",
  "size": null,
  "bid": "773.64",
  "ask": "779.7",
  "open": null,
  "high": "779.69",
  "low": "741",
  "volume": "85.46",
  "timestamp": "1525898277327"
}
Figure 3: The Python Client SDK for Satori
#!/usr/bin/env python
from __future__ import print_function

import sys
import time

from satori.rtm.client import make_client, SubscriptionMode

endpoint = "wss://open-data.api.satori.com"
appkey = "YOUR_APP_KEY"  # placeholder: replace with the app key from your Satori developer account
channel = "cryptocurrency-market-data"


def main():
    with make_client(endpoint=endpoint, appkey=appkey) as client:
        print('Connected to Satori RTM!')

        class SubscriptionObserver(object):
            def on_subscription_data(self, data):
                for message in data['messages']:
                    print("Got message:", message)

        subscription_observer = SubscriptionObserver()
        client.subscribe(
            channel,
            SubscriptionMode.SIMPLE,
            subscription_observer)

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass


if __name__ == '__main__':
    main()
git clone https://github.com/patalwell/SparkStreamingKafka.git
cd SparkStreamingKafka/nifi
#Make a custom nars directory and change user/group ownership to nifi
sudo mkdir /usr/hdf/current/nifi/nars-custom/
sudo chown nifi:nifi /usr/hdf/current/nifi/nars-custom/

#Make a lib directory and change user/group ownership to nifi
sudo mkdir /usr/hdf/current/nifi/nars-custom/lib1/
sudo chown nifi:nifi /usr/hdf/current/nifi/nars-custom/lib1/

#Copy our custom nar into our new directory
sudo cp ~/SparkStreamingKafka/nifi/nifi-satori-bundle-nar-1.2.0.3.0.2.0-76.nar /usr/hdf/current/nifi/nars-custom/lib1/.
sudo chown nifi:nifi /usr/hdf/current/nifi/nars-custom/lib1/nifi-satori-bundle-nar-1.2.0.3.0.2.0-76.nar

#Validate our permissions
ls -la /usr/hdf/current/nifi/nars-custom/lib1/
Figure 1: Updating the custom Nifi directory for our custom Satori processor
Figure 2: Adding a property to custom nifi.properties.xml site file in the Nifi configuration panel of Ambari
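For reference, the property added in Ambari to register the custom NAR directory generally takes a form like the one below. The exact property name is an assumption worth verifying against your Nifi version's documentation, and the path must match the lib1 directory created above:

#Assumed property name for an additional NAR library directory; verify against your Nifi version
nifi.nar.library.directory.lib1=/usr/hdf/current/nifi/nars-custom/lib1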
Figure 3: Restarting the Nifi client
Figure 1: Creating a processor group
Figure 2: Navigating to create a processor group, uploading our template, and dragging our template to the canvas
Figure 3: Uploading our satori-jolt-kafka.xml template
Figure 4: Our Satori-Jolt-Kafka Flow
Figure 5: Adding our Satori App Key and validating our other client SDK constructor arguments
Figure 6: The Chain Jolt Transformation. We are using a combination of specifications to remove fields, apply new fields, create values, and cast our fields from string to their respective types
#Jolt Specification for Jolt Transform JSON Processor
[{
  "operation": "remove",
  "spec": {
    "timestamp": ""
  }
}, {
  "operation": "default",
  "spec": {
    "timestamp": "${now():toNumber()}"
  }
}, {
  "operation": "modify-overwrite-beta",
  "spec": {
    "exchange": "=toString",
    "cryptocurrency": "=toString",
    "type": "=toString",
    "price": ["=toDouble", 0.0],
    "size": ["=toDouble", 0.0],
    "bid": ["=toDouble", 0.0],
    "ask": ["=toDouble", 0.0],
    "open": ["=toDouble", 0.0],
    "high": ["=toDouble", 0.0],
    "low": ["=toDouble", 0.0],
    "volume": ["=toDouble", 0.0],
    "timestamp": "=toLong"
  }
}]
//Raw cryptocurrency transaction payload
{
  "exchange" : "OKCoin",
  "cryptocurrency" : "BTC",
  "basecurrency" : "USD",
  "type" : "market",
  "price" : "9108.77",
  "size" : null,
  "bid" : "9139.21",
  "ask" : "9207.94",
  "open" : null,
  "high" : "9519.99",
  "low" : "9026.95",
  "volume" : "119.54",
  "timestamp" : "1525197075190"
}
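For reference, running the raw payload above through the Jolt chain should yield output roughly like the following: the string timestamp is removed and re-created from Nifi's ${now():toNumber()} expression (so the value shown here is only illustrative), and the numeric fields are cast from strings to doubles, with nulls defaulting to 0.0.

//Transformed cryptocurrency transaction payload (timestamp value is illustrative)
{
  "exchange" : "OKCoin",
  "cryptocurrency" : "BTC",
  "basecurrency" : "USD",
  "type" : "market",
  "price" : 9108.77,
  "size" : 0.0,
  "bid" : 9139.21,
  "ask" : 9207.94,
  "open" : 0.0,
  "high" : 9519.99,
  "low" : 9026.95,
  "volume" : 119.54,
  "timestamp" : 1525197075190
}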
Figure 7: The Jolt Transformation DSL and Specification Editor
Figure 8: Configuring our KafkaProducer Processor
Figure 9: Kafka Broker Configurations and Hostnames
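Before wiring up Spark, it can be useful to confirm that Nifi is actually publishing messages to the topic. One way to do that is the Kafka console consumer; the broker hostname below matches the one used later in the Spark job, and the client path assumes a standard HDP install, so substitute your own values as needed:

#Sanity check: tail the topic Nifi is publishing to
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --bootstrap-server pathdf3.field.hortonworks.com:6667 \
  --topic cryptocurrency-nifi-data \
  --from-beginning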
~/SparkStreamingKafka/DStreamCryptoStream.py
Python Imports and Class Instantiations:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import *
from pyspark.sql.types import *

"""
Note: When running this app with spark-submit you need the following

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 {appName.py}
"""

MASTER = "local[*]"
APP_NAME = "DStreamCryptoStream"
KAFKA_BROKER = "pathdf3.field.hortonworks.com:6667"
KAFKA_TOPIC = ['cryptocurrency-nifi-data']
BATCH_INTERVAL = 10
OFFSET = "earliest"

# Constructor for SparkContext; enables Spark core
sc = SparkContext(MASTER, APP_NAME)

# The constructor accepts a SparkContext and a duration in seconds, e.g. the Java equivalent:
# JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
ssc = StreamingContext(sc, BATCH_INTERVAL)

# Instantiate our direct stream with the KafkaUtils class and its createDirectStream method.
# Parameters include the StreamingContext, topics, and Kafka parameters.
# To set the stream to capture the earliest data use:
kafkaStream = KafkaUtils.createDirectStream(ssc=ssc, topics=KAFKA_TOPIC,
                                            kafkaParams={"metadata.broker.list": KAFKA_BROKER,
                                                         "startingOffsets": OFFSET})
# The data from Kafka is returned as a tuple (key, value), so we'll want to
# transform the DStream with a mapper and point to the value portion of our tuple
value = kafkaStream.map(lambda line: line[1])

# value is now a DStream object
print value
# print type(value)  # <class 'pyspark.streaming.kafka.KafkaTransformedDStream'>
# Lazily instantiated global instance of SparkSession. (This is a hack to grab
# the SQL context: we need to create a SparkSession using the SparkContext that
# the StreamingContext is using. The method also enables this during a restart
# of the driver.)
def getSparkSessionInstance(sparkConf):
    if "sparkSessionSingletonInstance" not in globals():
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]


# DataFrame operations inside your streaming program
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        schema = StructType([
            StructField('exchange', StringType()),
            StructField('cryptocurrency', StringType()),
            StructField('basecurrency', StringType()),
            StructField('type', StringType()),
            StructField('price', DoubleType()),
            StructField('size', DoubleType()),
            StructField('bid', DoubleType()),
            StructField('ask', DoubleType()),
            StructField('open', DoubleType()),
            StructField('high', DoubleType()),
            StructField('low', DoubleType()),
            StructField('volume', DoubleType()),
            StructField('timestamp', LongType())
        ])

        # Convert the RDD[String] of JSON records to a DataFrame with an explicit schema
        data = spark.read.json(rdd, schema=schema)
        # data.show()

        # Drop null values from our aggregations
        df = data.na.drop()

        # Check the explicitly mapped schema
        # df.printSchema()

        # Create a tempView so edits can be made in SQL
        df.createOrReplaceTempView("CryptoCurrency")
Note: This is by no means a statistically significant outcome. You'll need to use historical data to create a baseline for the mean, standard deviation, and confidence interval, and the sample size should be large enough to meet the demands of your use case. What is a Confidence Interval? A minimal sketch of that calculation appears after the end of the listing below.
        # Get avg, max, min, and stddev for BTC, ETH, LTC, and ADX. We need to
        # normalize our standard deviation by dividing by our price average in
        # order to calculate a per-transaction normalized_stnd_dev; closer to 0
        # is less volatile.
        print "**** Running Statistics of CryptoCurrency ****"
        spark.sql("""
            SELECT cryptocurrency
            ,avg(price) - std(price) as lower_1_std_bound
            ,avg(price) as average_price
            ,avg(price) + std(price) as upper_1_std_bound
            ,max(price) as max_price
            ,min(price) as min_price
            ,std(price) as 1_std
            ,std(price) * 2 as 2_std
            ,std(price)/avg(price)*100 as normalized_stnd_dev
            FROM CryptoCurrency
            WHERE (cryptocurrency == 'ADX'
                OR cryptocurrency == 'BTC'
                OR cryptocurrency == 'LTC'
                OR cryptocurrency == 'ETH')
            AND basecurrency == 'USD'
            GROUP BY cryptocurrency
            ORDER BY cryptocurrency""").show()

    except:
        pass


# Conduct an operation on our DStream object; we are passing our process
# function here and applying it to each RDD generated in the stream
value.foreachRDD(process)

# Start our computations and stop when the user has issued a keyboard command
ssc.start()
ssc.awaitTermination()
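As an aside on the confidence-interval note above, here is a minimal sketch (separate from the streaming job) of how a 95% confidence interval for the mean price could be computed from a baseline of historical prices; the sample values are purely illustrative:

import math

# Purely illustrative values; in practice, load a large historical sample
historical_prices = [9108.77, 9139.21, 9207.94, 9026.95, 9519.99]

n = len(historical_prices)
mean = sum(historical_prices) / n
variance = sum((p - mean) ** 2 for p in historical_prices) / (n - 1)  # sample variance
std_dev = math.sqrt(variance)
std_err = std_dev / math.sqrt(n)

z = 1.96  # z-score for a 95% confidence level (appropriate for large samples)
lower, upper = mean - z * std_err, mean + z * std_err
print("95%% confidence interval for the mean price: (%.2f, %.2f)" % (lower, upper))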
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 ~/SparkStreamingKafka/DStreamCryptoStream.py
/usr/hdp/current/spark-client/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 ~/SparkStreamingKafka/DStreamCryptoStream.py
Figure 1: spark-submit --version to validate Spark Home
Figure 2: Starting our Nifi Flow
Figure 3: Results of our Streaming Application
Nice! You’ve successfully used Spark Streaming to micro-batch cryptocurrency transactions to the console. For a full listing of features, classes, and methods that may be more appropriate for your use case, please review the Spark Streaming documentation or post questions to the Hortonworks Community Connection. I also plan on working with colleagues to implement some modeling on our data to calculate Value at Risk (VaR), create a dashboard with Superset, and persist our data to HBase for historical batch analysis via Phoenix. As always, comments and suggestions for other tutorials are highly encouraged.