Short Description:

A quick tutorial on how to query cryptocurrency transactions with Spark Streaming and Python.

Abstract:

This tutorial covers how one can use Nifi to stream public cryptocurrency transactional data to Kafka and subsequently query the transactional stream with Spark Streaming. We will be using the Pyspark API for demonstration purposes; however, I've also included a Java version for reference. It is strongly recommended that one use Java or Scala for production-grade applications, as most of the generally available (GA) features in Spark are published to support those languages first. I will provide links to both versions. Figures are used as graphical aids. Steps are highlighted by bullet points.

What is Spark? Why is it well suited for Streaming at scale?

Apache Spark is a distributed processing engine for data science and data engineering at scale. It features several out-of-the-box libraries for machine learning, MapReduce-style processing, graph analysis, micro-batching, and structured query language (SQL). It is well suited to processing streams of data thanks to its language-integrated application programming interface (API) and fault-tolerant behavior. Users can write streaming jobs the same way they write batch jobs and recover both lost work and operator state without extra code. The Spark team has conveniently encapsulated these concerns into Spark’s core API.

With Spark Streaming, one can ingest data from multiple sources (Kafka, Kinesis, Flume, and TCP sockets) and process data with abstracted functions like map, reduce, join, and window. Results can subsequently be pushed to HDFS, databases, and dashboards for meaningful insights.

Simply put, Spark Streaming provides a very convenient way for Hadoop developers to grab data in real-time and persist it to their system of choice.

What is micro batching?

“Micro batching is defined as the procedure in which the incoming stream of messages is processed by dividing them into group of small batches. This helps to achieve the performance benefits of batch processing; however, at the same time, it helps to keep the latency of processing of each message minimal”. –Apache Spark 2.x for Java Developers , Sumit Kumar & Sourav Gulati, O’Reilly Media Inc.

In Spark Streaming, a micro-batch is a sequence of resilient distributed datasets (RDDs) packaged into an abstraction known as a Discretized Stream (DStream). Users can batch DStreams on time intervals, with batch intervals typically ranging between 1 and 10 seconds. Choosing the batch size really depends on the needs of the downstream consumer and on how fast that consumer can write the batches to disk or present information to a web browser.
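
To make this concrete, here is a minimal sketch (not part of the tutorial code) that wires the same map/reduce-style operators mentioned above onto a DStream built from a TCP socket; the hostname, port, and 5-second batch interval are placeholder choices.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Local SparkContext and a StreamingContext that cuts 5-second micro-batches
sc = SparkContext("local[2]", "MicroBatchSketch")
ssc = StreamingContext(sc, 5)

# Hypothetical TCP source, e.g. started with `nc -lk 9999` on the same host
lines = ssc.socketTextStream("localhost", 9999)

# Classic word count expressed with the same operators used in batch jobs
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))

# Window and slide durations must be multiples of the batch interval
windowed_counts = counts.reduceByKeyAndWindow(lambda a, b: a + b, None, 30, 10)

counts.pprint()            # print a sample of every 5-second micro-batch
windowed_counts.pprint()   # print a 30-second rolling view, updated every 10 seconds

ssc.start()
ssc.awaitTermination()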

Where are we getting our data?

We will be subscribing to a Satori public data channel for global cryptocurrency transactions. According to Satori,

“Get streaming data for cryptocurrency market prices as well as exchange rates between alternative cryptocurrencies. Data is streamed from exchanges around the globe, where crypto-coins are traded in the US Dollar, Canadian Dollar, British Pound, Japanese Yen, Russian Ruble, etc.” - https://www.satori.com/livedata/channels/cryptocurrency-market-data

Note:

This tutorial assumes the user has successfully installed Spark2 on HDP, and Nifi and Kafka on HDF. It also assumes some familiarity with Nifi and Kafka, though that isn’t required to successfully complete the tutorial. For detailed instructions on how to accomplish these steps, please consult our HDP and HDF user documentation:

HDP User Guide-Apache Spark

HDF Installation Guide

HDF User Guide-Apache Kafka

One can also use the Hortonworks Sandbox environments to complete this tutorial: https://hortonworks.com/products/sandbox/

Tools:

One can create the application logic for this tutorial by cloning my GitHub repository. Feel free to make contributions by forking the repo and submitting pull requests.

#Python Version
git clone https://github.com/patalwell/SparkStreamingKafka.git

#Java Version
git clone https://github.com/patalwell/SparkStreamingSatori.git

Tutorial:

Step 1: Setting up a Satori Account and Understanding the Client Connection Class and Methods

  • In order to create our Nifi flow, we’ll need to visit Satori and create a developer account. You can follow the directions here: https://developer.satori.com/#/signup. Save your application key, the endpoint, and the channel name; we’ll be inserting these into our custom Satori Nifi processor. Big thanks to Laurence Da Luz for creating the Satori processor! See Figure 1.

Figure 1: The Satori Developer Sign Up Portal

  • Once we’ve created a Satori account, a new project, and app keys, we can view the live data channels for cryptocurrency to get a taste of how we’ll be making a client connection to the channel. You’ll notice they have message fields (these will serve as our schema) and several client SDKs for C, C#, Go, JS, Java, and Python. See Figures 2 and 3.

Figure 2: Message Fields and JSON Payload

{ 
 "exchange": "OKCoin",
 "cryptocurrency": "ETH",
 "basecurrency": "USD",
 "type": "market",
 "price": "779.09",
 "size": null,
 "bid": "773.64",
 "ask": "779.7",
 "open": null,
 "high": "779.69",
 "low": "741",
 "volume": "85.46",
 "timestamp": "1525898277327"
}

Figure 3: The Python Client SDK for Satori

#!/usr/bin/env python

from __future__ import print_function

import sys
import time

from satori.rtm.client import make_client, SubscriptionMode

endpoint = "wss://open-data.api.satori.com"
appkey = "YOUR_APP_KEY"  # replace with the application key from your Satori project
channel = "cryptocurrency-market-data"

def main():
    with make_client(endpoint=endpoint, appkey=appkey) as client:
        print('Connected to Satori RTM!')

        class SubscriptionObserver(object):
            def on_subscription_data(self, data):
                for message in data['messages']:
                    print("Got message:", message)

        subscription_observer = SubscriptionObserver()
        client.subscribe(
            channel,
            SubscriptionMode.SIMPLE,
            subscription_observer)

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass


if __name__ == '__main__':
    main()
  • As you can see, we're importing the necessary Python modules, defining our connection parameters, and creating an observer class to subscribe to the Satori channel. For the sake of the tutorial, I've decided to abstract these object-oriented necessities from the user with the custom Satori processor. We'll still be making a connection to Satori, but it will be encapsulated within that processor.

Step 2: Copy our Custom Processor to Nifi

  • Next, we’ll clone my repository into the home directory of the nodes the Nifi client is installed on. You’ll find the satori-jolt-kafka-spark.xml template for our Nifi flow and the nifi-satori-bundle-nar-1.2.0.3.0.2.0-76.nar for our Satori processor in the /nifi directory of the SparkStreamingKafka parent directory.
git clone https://github.com/patalwell/SparkStreamingKafka.git
cd SparkStreamingKafka/nifi
  • We’ll need to create a dedicated directory for our custom processor in order to prevent conflicts with custom nars during a Nifi upgrade. As such, we’ll add a separate directory for our custom processor, change the user and group ownership to nifi, copy in our custom Satori processor nar, and add a line entry to nifi.properties within Ambari (see Figures 1, 2, and 3; a sample property entry is sketched after the commands below). Be sure to restart Nifi after you've finished these steps; Ambari will prompt you to do so once the configuration has been updated.
#Make a custom nars directory and change user/group ownership to nifi
sudo mkdir /usr/hdf/current/nifi/nars-custom/
sudo chown nifi:nifi /usr/hdf/current/nifi/nars-custom/

#Make a lib directory and change user/group ownership to nifi
sudo mkdir /usr/hdf/current/nifi/nars-custom/lib1/
sudo chown nifi:nifi /usr/hdf/current/nifi/nars-custom/lib1/

#Copy our custom nar into our new directory and change its ownership to nifi
sudo cp ~/SparkStreamingKafka/nifi/nifi-satori-bundle-nar-1.2.0.3.0.2.0-76.nar /usr/hdf/current/nifi/nars-custom/lib1/.
sudo chown nifi:nifi /usr/hdf/current/nifi/nars-custom/lib1/nifi-satori-bundle-nar-1.2.0.3.0.2.0-76.nar

#Validate our permissions
ls -la /usr/hdf/current/nifi/nars-custom/lib1/
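
For reference, the property added in Ambari (Figure 2) is what tells Nifi to load nars from the new directory. The exact property suffix is your choice; assuming the paths used above, the entry would look something like this:

#Custom nar library directory registered in nifi.properties (the "custom1" suffix is an arbitrary choice)
nifi.nar.library.directory.custom1=/usr/hdf/current/nifi/nars-custom/lib1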
   

Figure 1: Updating the custom Nifi directory for our custom Satori processor

Figure 2: Adding a property for the custom nar directory to nifi.properties via the Nifi configuration panel in Ambari

Figure 3: Restarting the Nifi client

Step 3: Setup the Nifi Flow to Subscribe to our Satori Channel and Publish our Topic to Kafka

  • Now that we’ve gotten the administrative work out of the way, we can simply add our flow template to the Nifi canvas, create a processor group, and define a few macros within our processors. Navigate to the Nifi UI from Ambari by selecting the Nifi service, then Quick Links, and your host of choice.
  • Let's create a processor group (Figure 1), double-click into the processor group, and import the satori-jolt-kafka.xml file into Nifi. I suggest downloading this file locally from my GitHub repository to make it easier to upload to the user interface. You can upload a template to Nifi from the Operate box on the left of the UI, directly under the Navigate panel: click the small upload-template icon, first from the right, which shows an arrow pointing to a mock flow. See Figures 1, 2, and 3.

Figure 1: Creating a processor group

Figure 2: Navigating to create a processor group, uploading our template, and dragging our template to the canvas

Figure 3: Uploading our satori-jolt-kafka.xml template

  • Once the template has been uploaded, you can navigate to the top toolbar and select the second icon from the right; it resembles a group of processors. Drag that onto the canvas and select the Satori-Jolt-Kafka flow (Figures 2 and 4). Your screen should now resemble Figure 4.

Figure 4: Our Satori-Jolt-Kafka Flow

  • Now that we’ve successfully uploaded our template, we can edit our processors and take a look at what’s going on. Right-click on the ConsumeSatori processor, select Configure, navigate to the Properties tab, and add the application key we created when we signed up for a Satori developer account. You’ll notice the App Key field now reads “Sensitive value set”. The endpoint argument should resemble wss://open-data.api.satori.com and our channel should be cryptocurrency-market-data. Go ahead and click Apply; the processor should no longer have a warning emblem. See Figure 5.

Figure 5: Adding our Satori App Key and validating our other client SDK constructor arguments

  • The Satori processor makes the connection to the Satori API without us having to write a bunch of code, and it emits JSON records as strings. This will be problematic later down the line (*palm to forehead* for anyone who's dabbled with JSON), so I’ve opted to explicitly cast the data types with a Jolt JSON processor.
  • Right-click on the Jolt processor and select Configure. When the configuration menu comes up, select the Properties tab. For our Jolt Transformation DSL value, select Chain (Figure 6). You can learn more about Jolt and how it can be used to manipulate JSON here: Jolt JSON Transformation Editor

Figure 6: The Chain Jolt Transformation. We are using a combination of specifications to remove fields, apply new fields, create values, and cast our fields from string to their respective types

  • Click the advanced tab on the lower left of the processor. This will populate an editor so we can test our Jolt specification logic. I've included the Jolt Specification below so we can see what's going on step by step. I've also included the raw payload so we can see what the transformations have accomplished for us.
#Jolt Specification for Jolt Transform JSON Processor
[{
    "operation": "remove",
    "spec": {
      "timestamp": ""
    }
},
  {
    "operation": "default",
    "spec": {
      "timestamp": "${now():toNumber()}"
    }
},
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "exchange": "=toString",
      "cryptocurrency": "=toString",
      "type": "=toString",
      "price": ["=toDouble", 0.0],
      "size": ["=toDouble", 0.0],
      "bid": ["=toDouble", 0.0],
      "ask": ["=toDouble", 0.0],
      "open": ["=toDouble", 0.0],
      "high": ["=toDouble", 0.0],
      "low": ["=toDouble", 0.0],
      "volume": ["=toDouble", 0.0],
      "timestamp": "=toLong"
    }
}]
  • The first operation is very intuitive: we are simply removing the timestamp field. I've opted to do this since some of the raw data contains nulls. You could keep the data in its raw form by eliminating this step, but I've elected to make the processor easier to understand by adding features a user might want to take advantage of.
  • The second operation uses the Nifi expression language to take the current timestamp and cast it as a number. See the Nifi expression language guide for more advanced topics and other methods: Apache Nifi Expression Language Guide
  • Our final operation, modify-overwrite-beta, casts the fields to their respective types. If a field is null, it is filled with the second parameter; in this case, I've elected to fill the values with 0.0. The Spark Streaming API allows us to drop null values, but again I've elected to showcase some of the features within Jolt for the sake of this tutorial. Note: casting nulls as 0.0 might not be the best option for statistical aggregations, as it will skew averages.
  • Copy and paste the raw cryptocurrency transaction JSON below into the JSON input field and click Transform (Figure 7). The output you should expect is sketched after Figure 7.
//Raw Cryptocurrency transaction payload
{
  "exchange" : "OKCoin",
  "cryptocurrency" : "BTC",
  "basecurrency" : "USD",
  "type" : "market",
  "price" : "9108.77",
  "size" : null,
  "bid" : "9139.21",
  "ask" : "9207.94",
  "open" : null,
  "high" : "9519.99",
  "low" : "9026.95",
  "volume" : "119.54",
  "timestamp" : "1525197075190"
}

Figure 7: The Jolt Transformation DSL and Specification Editor
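
If the spec behaves as described above, the transformed payload should come out roughly as follows. Note that the timestamp shown here is only illustrative; in the flow it is whatever epoch-millisecond value ${now()} produces when the record passes through.

{
  "exchange": "OKCoin",
  "cryptocurrency": "BTC",
  "basecurrency": "USD",
  "type": "market",
  "price": 9108.77,
  "size": 0.0,
  "bid": 9139.21,
  "ask": 9207.94,
  "open": 0.0,
  "high": 9519.99,
  "low": 9026.95,
  "volume": 119.54,
  "timestamp": 1525197075190
}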

  • Finally, let’s review our PublishKafka processor and set the relevant macro values we’ll need. Right-click on the PublishKafka processor and select Configure. Fill in the Kafka Broker value with the address of your Kafka broker, typically the hostname Kafka is installed on followed by port 6667, e.g. pathdf3.field.hortonworks.com:6667. Set the topic name to cryptocurrency-nifi-data and the delivery guarantee to best effort. Click Apply. See Figure 8.
  • If you don't know your Kafka broker address, navigate to Ambari, select Kafka from your list of active services, and open the Configs tab. You'll see the broker hosts on the first line under the Kafka Broker section. See Figure 9. An optional way to sanity-check that messages are arriving on the topic is sketched after Figure 9.

Figure 8: Configuring our KafkaProducer Processor

Figure 9: Kafka Broker Configurations and Hostnames
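
As an optional sanity check (not part of the tutorial), you can confirm that records are landing on the topic with a few lines of Python. This sketch assumes the kafka-python package is installed; replace the broker address with your own.

from kafka import KafkaConsumer

# Read records from the topic our Nifi flow publishes to
consumer = KafkaConsumer(
    'cryptocurrency-nifi-data',
    bootstrap_servers='pathdf3.field.hortonworks.com:6667',  # replace with your broker
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000)  # stop iterating after 10 seconds of silence

for message in consumer:
    print(message.value)  # raw JSON string produced by the Jolt processor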

Step 4: Review the Spark Streaming Application Logic

  • Now that we’ve successfully subscribed to a public data feed, cast our payload with the appropriate data types, and published our data to Kafka we can review the Spark Streaming application logic. This is located in our repository as:
~/SparkStreamingKafka/DStreamCryptoStream.py
  • Starting from the top, let's import the necessary modules from Pyspark and define several parameter variables we'll use to instantiate our SparkContext. Then let's create a StreamingContext, passing that SparkContext and a batch interval of 10 seconds as constructor arguments. Finally, we'll create kafkaStream (a DStream class object) with the KafkaUtils class and its createDirectStream() method, supplying several arguments, e.g. the Kafka broker, topic, and starting offsets.

Python Imports and Class Instantiations:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import *
from pyspark.sql.types import *

""" Note: When running this app with spark-submit you need the following
spark-submit --packages
org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0
{appName.py}
"""

MASTER = "local[*]"
APP_NAME = "DStreamCryptoStream"
KAFKA_BROKER = "pathdf3.field.hortonworks.com:6667"
KAFKA_TOPIC = ['cryptocurrency-nifi-data']
BATCH_INTERVAL = 10
OFFSET = "earliest"

#constructor for sparkContext; enables spark core
sc = SparkContext(MASTER, APP_NAME)

# constructor accepts SparkContext and Duration in Seconds
# e.g. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
# Durations.seconds(10));
ssc = StreamingContext(sc, BATCH_INTERVAL)

# Instantiate our DirectStream with the KafkaUtils class and subsequent method
# Parameters include StreamingContext, Topics, KafkaParameters
# To set the stream to capture the earliest data use:
kafkaStream = KafkaUtils.createDirectStream(ssc=ssc,topics=KAFKA_TOPIC
            ,kafkaParams={"metadata.broker.list":KAFKA_BROKER,
                          "startingOffsets":OFFSET})
  • Next, we'll want to create another object, value, holding just the values from our stream. The kafkaStream DStream object we created returns (key, value) pairs from Kafka, but we only care about the payload, i.e. our JSON, hence the lambda function that maps our DStream to its value portion.
# The data from Kafka is returned as a tuple (Key, Value), so we'll want to
# transform the DStream with a mapper and point to the value portion of our tuple
value = kafkaStream.map(lambda line: line[1])

# value is now a DStream object
print value

# print type(value)
# <class 'pyspark.streaming.kafka.KafkaTransformedDStream'>
  • Now, we need to create a SparkSession using the SparkContext we already created and define a function to query our JSON. To create a DataFrame from our raw JSON, I've elected to create a schema object with Pyspark's StructType class; this maps our fields to their respective data types. I'm then using the DataFrame API to build a table, filter out null values, and create a temporary view "CryptoCurrency" to enable querying on our stream.
# Lazily instantiated global instance of SparkSession (this is a hack to grab
# the SQL context; we need to create a SparkSession using the SparkContext that
# the StreamingContext is using. The method enables this during a restart of
# the driver.)
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

# DataFrame operations inside your streaming program
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        schema = StructType([
            StructField('exchange', StringType())
            , StructField('cryptocurrency', StringType())
            , StructField('basecurrency', StringType())
            , StructField('type', StringType())
            , StructField('price', DoubleType())
            , StructField('size', DoubleType())
            , StructField('bid', DoubleType())
            , StructField('ask', DoubleType())
            , StructField('open', DoubleType())
            , StructField('high', DoubleType())
            , StructField('low', DoubleType())
            , StructField('volume', DoubleType())
            , StructField('timestamp', LongType())
        ])

        # Convert the RDD[String] of JSON to a DataFrame by applying the schema
        data = spark.read.json(rdd, schema=schema)
        # data.show()
        # drop null values from our aggregations
        df = data.na.drop()

        # Check the explicitly mapped schema
        # df.printSchema()

        #Create a tempView so edits can be made in SQL
        df.createOrReplaceTempView("CryptoCurrency")
  • At this point, you can write SQL against the stream. I've elected to select the cryptocurrency, average price, max price, min price, and standard deviation. Additionally, I'm adding a few functions to normalize the distribution and create upper and lower bounds for one standard deviation. I'm also filtering on US Dollars and on Bitcoin, Algebraix Coin, LiteCoin, and Ethereum. Feel free to alter this query as you see fit.

Note: This is by no means a statistically significant outcome. You'll need to use historical data to create a baseline for the mean, standard deviation, and confidence interval, and the sample size should be large enough to meet the demands of your use case. What is a Confidence Interval?
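
As a rough illustration of what such a baseline calculation could look like (this is not part of the tutorial code, and the sample prices below are made up), a 95% confidence interval for the mean price can be computed from a historical sample:

import math

def confidence_interval_95(prices):
    # Return a rough 95% confidence interval for the mean of a price sample
    n = len(prices)
    mean = sum(prices) / n
    # sample standard deviation
    stddev = math.sqrt(sum((p - mean) ** 2 for p in prices) / (n - 1))
    margin = 1.96 * stddev / math.sqrt(n)  # z ~= 1.96 for 95% confidence
    return mean - margin, mean + margin

# hypothetical historical BTC/USD prices
print(confidence_interval_95([9108.77, 9139.21, 9207.94, 9026.95, 9519.99]))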

  • Finally, we'll need to apply our process function to the mapped Kafka DStream value by using one of the several methods that come with the DStream class; in this case, we're using foreachRDD(). Then we start the application with start() and wait for it to be terminated with awaitTermination().
        # Get avg, max, min, and stdev for BTC, ETH, LTC, and ALX
        # we need to normalize our standard deviation by dividing by our
        # price average in order to calculate a per transaction
        # normalized_stnd_dev closer to 0 is less volatile
        print "**** Running Statistics of CryptoCurrency ****"

        spark.sql("""
        SELECT cryptocurrency
        ,avg(price) - std(price) as lower_1_std_bound
        ,avg(price) as average_price
        ,avg(price) + std(price) as upper_1_std_bound
        ,max(price) as max_price
        ,min(price) as min_price
        ,std(price) as 1_std
        ,std(price) * 2 as 2_std
        ,std(price)/avg(price)*100 as normalized_stnd_dev
        FROM CryptoCurrency
        WHERE (cryptocurrency =='ADX'
        OR cryptocurrency == 'BTC'
        OR cryptocurrency == 'LTC'
        OR cryptocurrency == 'ETH')
        AND basecurrency == 'USD'
        GROUP BY cryptocurrency
        ORDER BY cryptocurrency""").show()

    except:
        pass

# conduct an operation on our DStreams object; we are
# inserting our def process function here and applying said function to each
# RDD generated in the stream
value.foreachRDD(process)

# start our computations and stop when the user has issued a keyboard command
ssc.start()
ssc.awaitTermination()

Step 5: Starting our Spark Streaming Application and Nifi flow

  • Assuming you have the spark-client configured to work from your home directory you can simply type the following command to run this application. We need the Kafka Assembly package in order to load the KafkaUtils class.
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 ~/SparkStreamingKafka/DStreamCryptoStream.py
  • You can test whether SPARK_HOME is set by running spark-submit --version; you should see something like Figure 1. If SPARK_HOME is not set, you can still call the binaries directly with the following command:
/usr/hdp/current/spark-client/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 ~/SparkStreamingKafka/DStreamCryptoStream.py

Figure 1: Running spark-submit --version to validate SPARK_HOME

  • After we've started the application, navigate back to Nifi and start the flow. As you can see, this separation makes editing streaming applications a breeze. Once the flow is started, you should momentarily see data printed to the console. See Figures 2 and 3.

Figure 2: Starting our Nifi Flow

Figure 3: Results of our Streaming Application

Conclusion:

Nice! You’ve successfully used Spark Streaming to micro-batch cryptocurrency transactions to the console. You can also consult the documentation to learn more about the classes and methods that might be more appropriate for your use case. For a full listing of features, classes, and methods, please review the Spark Streaming documentation or post questions to the Hortonworks Community Connection. I also plan on working with colleagues to implement some modeling on our data to calculate Value at Risk (VaR), create a dashboard with Superset, and persist our data to HBase for historical batch analysis via Phoenix. As always, comments and suggestions for other tutorials are highly encouraged.
