Installing and Exploring Spark 2.0 with Jupyter Notebook and Anaconda Python in your laptop

  • 1-Objective
  • 2-Installing Anaconda Python
  • 3-Checking Python Install
  • 4-Installing Spark
  • 5-Checking Spark Install
  • 6-Launching Jupyter Notebook with PySpark 2.0.2
  • 7-Exploring PySpark 2.0.2
    • a.Spark Session
    • b.Read CSV
      • i.Spark 2.0 and Spark 1.6
      • ii.Pandas
    • c.Pandas DataFrames, Spark DataSets, DataFrames and RDDs
    • d.Machine Learning Pipeline
      • i.SciKit Learn
      • ii.Spark MLLib, ML
  • 8-Conclusion
  • 1-Objective
  • It is often useful to have Python with the Jupyter Notebook installed on your laptop in order to quickly develop and test code ideas or to explore some data. Adding Apache Spark to this setup also allows you to prototype ideas and exploratory data pipelines before hitting a Hadoop cluster or paying for Amazon Web Services.
  • We leverage the power of the Python ecosystem with libraries such as NumPy (scientific computing library of high-level mathematical functions that operate on arrays and matrices), SciPy (built on NumPy, providing convenient and fast N-dimensional array manipulation), Pandas (high-performance data structures and data analysis tools to build complex data transformation flows), Scikit-Learn (a range of machine learning, preprocessing, cross-validation and visualization algorithms), and NLTK (Natural Language Toolkit to process text data, with libraries for classification, tokenization, stemming, tagging, parsing and semantic reasoning, plus wrappers for industrial-strength NLP libraries).
  • We also leverage the strengths of Spark, including Spark SQL and Spark MLlib/ML. A quick import check of the Python stack is sketched below.
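  • As a quick illustration, the following sketch simply imports these libraries and prints their versions to confirm the stack is available; it is a minimal check assuming the Anaconda environment installed in the next section.
# Quick sanity check of the Python data-science stack bundled with Anaconda
import numpy as np
import scipy
import pandas as pd
import sklearn
import nltk

for name, module in [('numpy', np), ('scipy', scipy), ('pandas', pd),
                     ('scikit-learn', sklearn), ('nltk', nltk)]:
    print("{}: {}".format(name, module.__version__))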
  • 2-Installing Anaconda Python
  • We install Continuum’s Anaconda distribution by downloading the install script from the Continuum website: https://www.continuum.io/downloads
  • The advantage of the Anaconda distribution is that a lot of the essential Python packages come bundled.
  • You do not have to struggle with synchronizing all the dependencies.
  • We use the following command to download the install script for Python 3.5.
HW12256:~ usr000$ wget http://repo.continuum.io/archive/Anaconda3-4.2.0-Linux-x86_64.sh
  • If you wish to install Python 2.7, the following download is recommended.
HW12256:~ usr000$ wget http://repo.continuum.io/archive/Anaconda2-4.2.0-Linux-x86_64.sh
  • Accordingly, in the terminal, issue the following bash command to launch the install.
  • Python 3.5 version
HW12256:~ usr000$ bash Anaconda3-4.2.0-Linux-x86_64.sh
  • Python 2.7 version
HW12256:~ usr000$ bash Anaconda2-4.2.0-Linux-x86_64.sh
  • In the following steps, we are using Python 3.5 as the base environment.
  • 3-Checking Python Install
  • In order to check the Python install, we issue the following commands in the terminal.
HW12256:~ usr000$ which python
/Users/usr000/anaconda/bin/python

HW12256:~ usr000$ echo $PATH
/Users/usr000/anaconda/bin:/usr/local/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin

HW12256:~ usr000$ python --version
Python 3.5.2 :: Anaconda 4.1.1 (x86_64)

HW12256:~ usr000$ python
Python 3.5.2 |Anaconda 4.1.1 (x86_64)| (default, Jul  2 2016, 17:52:12)
[GCC 4.2.1 Compatible Apple LLVM 4.2 (clang-425.0.28)] on darwin
Type "help", "copyright", "credits" or "license" for more information.

>>> import sys
>>> print("Python version: {}".format(sys.version))
Python version: 3.5.2 |Anaconda 4.1.1 (x86_64)| (default, Jul  2 2016, 17:52:12)
[GCC 4.2.1 Compatible Apple LLVM 4.2 (clang-425.0.28)]

>>> from datetime import datetime
>>> print('current date and time: {}'.format(datetime.now()))
current date and time: 2016-12-29 09:46:32.393985

>>> print('current date and time: {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
current date and time: 2016-12-29 09:51:33

>>> exit()
  • Anaconda Python includes a package manager called ‘conda’ which can list and update the existing libraries available in the current system.
HW12256:~ usr000$ conda info
Current conda install:
               platform : osx-64
          conda version : 4.2.12
       conda is private : False
      conda-env version : 4.2.12
    conda-build version : 0+unknown
         python version : 3.5.2.final.0
       requests version : 2.10.0
       root environment : /Users/usr000/anaconda (writable)
    default environment : /Users/usr000/anaconda
       envs directories : /Users/usr000/anaconda/envs
          package cache : /Users/usr000/anaconda/pkgs
           channel URLs : https://repo.continuum.io/pkgs/free/osx-64
                          https://repo.continuum.io/pkgs/free/noarch
                          https://repo.continuum.io/pkgs/pro/osx-64
                          https://repo.continuum.io/pkgs/pro/noarch
            config file : None
           offline mode : False

HW12256:~ usr000$ conda list
  • 4-Installing Spark
  • To install Spark, we download the pre-built Spark tarball spark-2.0.2-bin-hadoop2.7.tgz from http://spark.apache.org/downloads.html and move it to your target Spark directory.
  • Untar the tarball in your chosen directory.
HW12256:bin usr000$ tar -xvzf spark-2.0.2-bin-hadoop2.7.tgz
  • Create a symlink to the spark2 directory.
HW12256:bin usr000$ ln -s ~/bin/sparks/spark-2.0.2-bin-hadoop2.7 ~/bin/spark2
  • 5-Checking Spark Install
  • Check the directories created under Spark 2
HW12256:bin usr000$ ls -lru
total 16
drwxr-xr-x  5 usr000  staff  170 Dec 28 10:39 sparks
lrwxr-xr-x  1 usr000  staff   50 Dec 28 10:39 spark2 -> /Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7
lrwxr-xr-x  1 usr000  staff   51 May 23  2016 spark -> /Users/usr000/bin/sparks/spark-1.6.1-bin-hadoop2.6/

HW12256:bin usr000$ cd spark2
HW12256:spark2 usr000$ ls -lru
total 112
drwxr-xr-x@   3 usr000  staff    102 Jan  1  1970 yarn
drwxr-xr-x@  24 usr000  staff    816 Jan  1  1970 sbin
drwxr-xr-x@  10 usr000  staff    340 Dec 28 10:30 python
drwxr-xr-x@  38 usr000  staff   1292 Jan  1  1970 licenses
drwxr-xr-x@ 208 usr000  staff   7072 Dec 28 10:30 jars
drwxr-xr-x@   4 usr000  staff    136 Jan  1  1970 examples
drwxr-xr-x@   5 usr000  staff    170 Jan  1  1970 data
drwxr-xr-x@   9 usr000  staff    306 Dec 28 10:27 conf
drwxr-xr-x@  24 usr000  staff    816 Dec 28 10:30 bin
-rw-r--r--@   1 usr000  staff    120 Dec 28 10:25 RELEASE
-rw-r--r--@   1 usr000  staff   3828 Dec 28 10:25 README.md
drwxr-xr-x@   3 usr000  staff    102 Jan  1  1970 R
-rw-r--r--@   1 usr000  staff  24749 Dec 28 10:25 NOTICE
-rw-r--r--@   1 usr000  staff  17811 Dec 28 10:25 LICENSE
HW12256:spark2 usr000$
  • Running the SparkPi example in local mode.
  • Scala command
# export SPARK_HOME
HW12256:spark2 usr000$ export SPARK_HOME=/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7
HW12256:spark2 usr000$ echo $SPARK_HOME
/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7
# Run Spark PI example in Scala
HW12256:spark2 usr000$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --executor-cores 1 $SPARK_HOME/examples/jars/spark-examples*.jar 5
  • Python command
HW12256:spark2 usr000$ ./bin/spark-submit --driver-memory 512m --executor-memory 512m --executor-cores 1 examples/src/main/python/pi.py 10
  • Scala example
HW12256:spark2 usr000$ export SPARK_HOME=/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7
HW12256:spark2 usr000$ echo $SPARK_HOME
/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7
HW12256:spark2 usr000$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --executor-cores 1 $SPARK_HOME/examples/jars/spark-examples*.jar 5
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/12/29 11:40:53 INFO SparkContext: Running Spark version 2.0.2
16/12/29 11:40:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
...
16/12/29 11:40:55 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 0.851288 s
Pi is roughly 3.1390942781885562
16/12/29 11:40:55 INFO SparkUI: Stopped Spark web UI at http://000.000.0.0:4040
...
16/12/29 11:40:55 INFO SparkContext: Successfully stopped SparkContext
16/12/29 11:40:55 INFO ShutdownHookManager: Shutdown hook called
16/12/29 11:40:55 INFO ShutdownHookManager: Deleting directory /private/var/folders/1r/8qylt4bj4h59b3h_1xq_nsw00000gp/T/spark-35b67f21-1d52-4dee-9c75-7e9d9c153ada
HW12256:spark2 usr000$
  • Python example
HW12256:spark2 usr000$ ./bin/spark-submit examples/src/main/python/pi.py 10
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/12/29 11:27:33 INFO SparkContext: Running Spark version 2.0.2
16/12/29 11:27:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
...
16/12/29 11:27:36 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/12/29 11:27:36 INFO DAGScheduler: Job 0 finished: reduce at /Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7/examples/src/main/python/pi.py:43, took 1.199257 s
Pi is roughly 3.138360
16/12/29 11:27:36 INFO SparkUI: Stopped Spark web UI at http://000.000.0.0:4040
16/12/29 11:27:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
...
16/12/29 11:27:36 INFO SparkContext: Successfully stopped SparkContext
16/12/29 11:27:37 INFO ShutdownHookManager: Shutdown hook called
16/12/29 11:27:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/1r/8qylt4bj4h59b3h_1xq_nsw00000gp/T/spark-eb12faa9-b7ff-4556-9538-45ddcdc6797b
16/12/29 11:27:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/1r/8qylt4bj4h59b3h_1xq_nsw00000gp/T/spark-eb12faa9-b7ff-4556-9538-45ddcdc6797b/pyspark-ba9947c5-dbea-4edc-9c4c-c2c316e6caba
  • Wordcount program using PySpark
HW12256:spark2 usr000$ ./bin/pyspark
Python 2.7.10 (default, Jul 30 2016, 19:40:32)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/29 12:25:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/

Using Python version 2.7.10 (default, Jul 30 2016 19:40:32)
SparkSession available as 'spark'.
>>> import os
>>> print(os.getcwd())
/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7
>>> import re
>>> from operator import add
>>> wordcounts_in = sc.textFile('README.md').flatMap(lambda l: re.split('\W+', l.strip())).filter(lambda w: len(w)>0).map(lambda w: (w,1)).reduceByKey(add).map(lambda (a,b): (b,a)).sortByKey(ascending = False)
/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
>>> wordcounts_in.take(10)
[(23, u'the'), (18, u'Spark'), (14, u'to'), (13, u'run'), (11, u'for'), (11, u'apache'), (11, u'spark'), (11, u'and'), (11, u'org'), (8, u'a')]
>>> wordcounts_in = sc.textFile('README.md').flatMap(lambda l: re.split('\W+', l.strip())).filter(lambda w: len(w)>0).map(lambda w: (w,1)).reduceByKey(add).map(lambda (a,b): (b,a)).sortByKey(ascending = False).map(lambda (a,b): (b,a))
/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
>>> wordcounts_in.take(10)
[(u'the', 23), (u'Spark', 18), (u'to', 14), (u'run', 13), (u'for', 11), (u'apache', 11), (u'spark', 11), (u'and', 11), (u'org', 11), (u'a', 8)]
>>> exit()
  • 6-Launching Jupyter Notebook with PySpark
  • When launching Jupyter Notebook with Spark 1.6.*, we used to add the --packages com.databricks:spark-csv_2.11:1.4.0 parameter to the command, as the CSV package was not natively part of Spark.
HW12256:~ usr000$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook' PYSPARK_PYTHON=python3 /Users/usr000/bin/spark/bin/pyspark --packages com.databricks:spark-csv_2.11:1.4.0
  • In the case of Spark 2.0.*, we do not need the spark-csv --packages parameter, as CSV support is part of the standard Spark 2.0 library.
HW12256:~ usr000$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook' PYSPARK_PYTHON=python3 /Users/usr000/bin/spark2/bin/pyspark
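  • As an alternative to exporting these variables each time, a notebook started with a plain jupyter notebook command can bootstrap PySpark itself. This is a minimal sketch, assuming the third-party findspark package is installed (pip install findspark) and that Spark lives under the ~/bin/spark2 symlink created above.
# Hypothetical bootstrap cell for a notebook started without the PYSPARK_* variables
import findspark
findspark.init('/Users/usr000/bin/spark2')  # adds pyspark to sys.path

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("jupyter-pyspark") \
    .getOrCreate()
sc = spark.sparkContext

print(sc.version)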
  • 7-Exploring PySpark 2.0.2
  • We will explore the new features of Spark 2.0.2 using PySpark, contrasting where appropriate with previous versions of Spark and with pandas. In the case of the machine learning pipeline, we will contrast Spark MLlib/ML with Scikit-Learn.
  • a.Spark Session
  • Spark 2.0 introduces SparkSession. SparkSession is the single entry point for interacting with Spark functionality. It replaces and encapsulates the SQLContext, HiveContext and StreamingContext for more unified access to the DataFrame and Dataset APIs. The SQLContext, HiveContext and StreamingContext still exist under the hood in Spark 2.0 for continuity with legacy Spark code.
  • The Spark session has to be created explicitly when using the spark-submit command. An example of how to do that:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
# from pyspark.sql import SQLContext

spark = SparkSession \
    .builder \
    .appName("example-spark") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .getOrCreate()

# Reuse the context created by the session rather than instantiating a second one
sc = spark.sparkContext

# sqlContext = SQLContext(sc)
  • When typing ‘pyspark’ at the terminal, the PySpark shell automatically creates the Spark context sc.
  • A SparkSession is automatically generated and available as 'spark'.
  • The application name can be accessed through the SparkContext.
spark.sparkContext.appName

# Configuration is accessible using RuntimeConfig:
from py4j.protocol import Py4JError
try:
    spark.conf.get("some.conf")
except Py4JError as e:
    pass
  • The following session shows the available Spark context sc as well as the new Spark session, available under the name "spark", which brings the previous sqlContext, HiveContext and StreamingContext together under one unified entry point.
  • sqlContext, HiveContext and StreamingContext still exist to ensure continuity with legacy Spark code.
HW12256:spark2 usr000$ ./bin/pyspark
Python 2.7.10 (default, Jul 30 2016, 19:40:32)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/29 20:41:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/

Using Python version 2.7.10 (default, Jul 30 2016 19:40:32)
SparkSession available as 'spark'.
>>> sc
<pyspark.context.SparkContext object at 0x101e9c850>
>>> sc._conf.getAll()
[(u'spark.app.id', u'local-1483040488671'), (u'spark.sql.catalogImplementation', u'hive'), (u'spark.rdd.compress', u'True'), (u'spark.serializer.objectStreamReset', u'100'), (u'spark.master', u'local[*]'), (u'spark.executor.id', u'driver'), (u'spark.submit.deployMode', u'client'), (u'hive.metastore.warehouse.dir', u'file:/Users/usr000/bin/sparks/spark-2.0.2-bin-hadoop2.7/spark-warehouse'), (u'spark.driver.port', u'57764'), (u'spark.app.name', u'PySparkShell'), (u'spark.driver.host', u'000.000.0.0')]
>>> spark
<pyspark.sql.session.SparkSession object at 0x102df9b50>
>>> spark.sparkContext
<pyspark.context.SparkContext object at 0x101e9c850>
>>> spark.sparkContext.appName
u'PySparkShell'
>>> from pyspark.sql.functions import *
>>> spark.range(1, 7, 2).collect()
16/12/29 20:58:32 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/12/29 20:58:32 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
[Row(id=1), Row(id=3), Row(id=5)]
  • b.Read CSV
  • We describe how to easily access CSV files from Spark and from pandas and load them into dataframes for data exploration, manipulation and mining.
  • i.Spark 2.0 & Spark 1.6
  • We can create a Spark dataframe directly by reading the CSV file.
  • In order to be compatible with the previous format, we have included a conditional switch in the format statement.
#
# Spark 2.0 and Spark 1.6 compatible read csv
#
formatPackage = "csv" if sc.version > '1.6' else "com.databricks.spark.csv"
df = sqlContext.read.format(formatPackage).options(header='true', delimiter='|').load("s00_dat/dataframe_sample.csv")
df.printSchema()
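  • With Spark 2.0, the same file can also be read directly through the SparkSession instead of the sqlContext. A minimal sketch, assuming the same pipe-delimited sample file (df2 is just an illustrative name):
# Spark 2.0 native CSV reader via the SparkSession
df2 = spark.read \
    .option("header", "true") \
    .option("sep", "|") \
    .csv("s00_dat/dataframe_sample.csv")

df2.printSchema()
df2.show(5)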
  • ii.Pandas
  • We can create the iris pandas dataframe from the existing dataset from sklearn.
from sklearn.datasets import load_iris
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df['species'] = pd.Categorical.from_codes(iris.target, iris.target_names)
  • c.Dataframes
  • i.Pandas DataFrames
  • Pandas dataframes in conjunction with visualization libraries such as matplotlib and seaborn give us some nice insights into the data, as sketched below.
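  • A minimal sketch of this kind of exploration, assuming the iris dataframe df built above and that matplotlib and seaborn are installed (both ship with Anaconda):
import matplotlib.pyplot as plt
import seaborn as sns

# Quick descriptive statistics of the iris dataframe built above
print(df.describe())
print(df.groupby('species').mean())

# Pairwise scatter plots coloured by species
sns.pairplot(df, hue='species')
plt.show()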
  • ii.Spark DataSets, Spark DataFrames and Spark RDDs
  • Spark DataFrames and Spark RDDs are the fundamental data structures that allow us to manipulate and interact with the various Spark libraries; a small sketch of moving between them follows below.
  • Spark Datasets are more relevant for Scala developers and give the ability to create typed Spark DataFrames.
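  • A minimal sketch, assuming the iris pandas dataframe df from the previous section and the spark session available in the PySpark shell or notebook (iris_pd, sdf and the simplified column names are illustrative):
# Create a Spark DataFrame from the pandas iris dataframe
# (columns are renamed because the original names contain spaces and parentheses)
iris_pd = df.copy()
iris_pd.columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']
iris_pd['species'] = iris_pd['species'].astype(str)

sdf = spark.createDataFrame(iris_pd)
sdf.printSchema()
sdf.show(5)

# DataFrame -> RDD of Row objects
rdd = sdf.rdd
print(rdd.take(2))

# Simple aggregation with the DataFrame API
sdf.groupBy('species').count().show()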
  • d.Machine Learning
  • i.SciKit Learn
  • We demonstrate a random forest machine learning pipeline using Scikit-Learn in the Jupyter notebook; a minimal sketch follows.
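  • The full notebook is not reproduced here; below is a minimal sketch of such a pipeline on the iris data, assuming scikit-learn 0.18+ (where train_test_split lives in sklearn.model_selection):
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load features and labels
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.3, random_state=42)

# Fit a random forest classifier
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)

# Evaluate on the held-out set
predictions = clf.predict(X_test)
print("Test accuracy: {:.3f}".format(accuracy_score(y_test, predictions)))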
  • ii.Spark MLLib, Spark ML
  • We demonstrate a random forest machine learning pipeline using Spark MLlib and Spark ML; a minimal sketch with the DataFrame-based Spark ML API follows.
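  • Again as a sketch rather than the full notebook, here is an equivalent pipeline with the Spark ML (DataFrame-based) API, assuming the Spark DataFrame sdf built from the iris data above:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Assemble the four numeric columns into a single feature vector
assembler = VectorAssembler(
    inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    outputCol='features')

# Index the string label column
indexer = StringIndexer(inputCol='species', outputCol='label')

rf = RandomForestClassifier(labelCol='label', featuresCol='features', numTrees=100)

pipeline = Pipeline(stages=[assembler, indexer, rf])

train, test = sdf.randomSplit([0.7, 0.3], seed=42)
model = pipeline.fit(train)
predictions = model.transform(test)

evaluator = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy')
print("Test accuracy: {}".format(evaluator.evaluate(predictions)))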
  • 8-Conclusion
  • Spark and Jupyter Notebook with the Anaconda Python distribution provide a very powerful development environment on your laptop.
  • They allow quick exploration of data mining, machine learning and visualization in a flexible, easy-to-use environment.
  • We have described the installation of Jupyter Notebook and Spark, a few data processing pipelines, as well as a machine learning classification using Random Forest.