Created on 05-26-2017 07:54 AM
For a simple PySpark application, you can use `--py-files` to specify its dependencies. A large PySpark application will have many dependencies, possibly including transitive dependencies. Sometimes a large application needs a Python package that has C code to compile before installation. There are also times when you want to run different versions of Python for different applications. For such scenarios with large PySpark applications, `--py-files` is inconvenient.
Fortunately, in the Python world you can create a virtual environment as an isolated Python runtime environment. We recently enabled virtual environments for PySpark in distributed environments. This eases the transition from local environment to distributed environment with PySpark.
In this article, I will talk about how to use virtual environments in PySpark. (This feature is currently supported only in YARN mode.)
Note that pip is required to run virtualenv; for pip installation instructions, see https://pip.pypa.io/en/stable/installing/.
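Now I will talk about how to set up a virtual environment in PySpark, using virtualenv and conda. There are two scenarios for using virtualenv in PySpark: batch mode, where you launch the application through spark-submit, and interactive mode, where you use the PySpark shell or a notebook.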
In HDP 2.6 we support batch mode, but this post also includes a preview of interactive mode.
For batch mode, I will follow the pattern of first developing the example in a local environment, and then moving it to a distributed environment, so that you can follow the same pattern for your development.
In this example we will use the following piece of code. This piece of code uses numpy in each map function. We save the code in a file named spark_virtualenv.py.
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="Simple App")
    import numpy as np
    # Each task returns the numpy version it imported on its executor.
    versions = sc.parallelize(range(1, 10)).map(lambda x: np.__version__).collect()
    print(versions)
First we will create a virtual environment in the local environment. We highly recommend that you create an isolated virtual environment locally first, so that the move to a distributed virtualenv will be smoother.
We use the following command to create and set up env_1 in the local environment:
virtualenv env_1 -p /usr/local/bin/python3 # create virtual environment env_1
Folder env_1 will be created under the current working directory. You should specify the python version, in case you have multiple versions installed.
Next, activate the virtualenv:
source env_1/bin/activate # activate virtualenv
After that you can run PySpark in local mode, where it will run under virtual environment env_1. You will see a "No module" error because numpy is not installed in this virtual environment. So, now let’s install numpy through pip:
pip install numpy # install numpy
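To confirm which interpreter a script runs under, and whether numpy is visible to it, a small check like the following can help. This is a minimal sketch that assumes Python 3.4 or later (for `importlib.util.find_spec`); the helper names `in_virtualenv` and `has_module` are my own and are not part of PySpark.

```python
import importlib.util
import sys


def in_virtualenv():
    """Return True if this interpreter runs inside a virtual environment."""
    # Older virtualenv releases set sys.real_prefix; venv and newer
    # virtualenv releases make sys.base_prefix differ from sys.prefix.
    base = getattr(sys, "real_prefix", None) or getattr(sys, "base_prefix", sys.prefix)
    return base != sys.prefix


def has_module(name):
    """Return True if the top-level module `name` can be imported here."""
    return importlib.util.find_spec(name) is not None


print("interpreter:", sys.executable)
print("virtualenv active:", in_virtualenv())
print("numpy importable:", has_module("numpy"))
```

Running it before and after `pip install numpy` inside env_1 should flip the last line from False to True.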
After installing numpy, you can use numpy in PySpark apps launched by spark-submit in your local environment. Use the following command:
bin/spark-submit --master local spark_virtualenv.py
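Now let’s move this into a distributed environment. There are two steps for moving from local development to a distributed environment: first create a requirements file that records the packages installed in the virtual environment, then submit the application with the virtualenv settings pointing at that file. To create the requirements file, run: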
pip freeze > requirements.txt
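The file that `pip freeze` writes is plain text with one pinned `name==version` entry per line. As a rough sketch of how you might inspect it before shipping it to the cluster, the function below reads that format into a dictionary. The function name `parse_requirements` and the sample contents are hypothetical and only illustrate the format; they are not the actual output of my environment.

```python
def parse_requirements(text):
    """Parse `pip freeze` style text into a {package: version} dict."""
    pinned = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith("#"):
            continue
        # pip freeze pins each package as name==version.
        name, sep, version = line.partition("==")
        if sep:
            pinned[name] = version
    return pinned


# Hypothetical file contents, for illustration only.
sample = "numpy==1.12.1\nsix==1.10.0\n"
print(parse_requirements(sample))
```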
Next, submit the application in yarn-client mode with the virtualenv properties pointing at the requirements file:
spark-submit --master yarn-client --conf spark.pyspark.virtualenv.enabled=true --conf spark.pyspark.virtualenv.type=native --conf spark.pyspark.virtualenv.requirements=/Users/jzhang/github/spark/requirements.txt --conf spark.pyspark.virtualenv.bin.path=/Users/jzhang/anaconda/bin/virtualenv --conf spark.pyspark.python=/usr/local/bin/python3 spark_virtualenv.py
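Long commands like this are easy to break with a missing space between options. One way to avoid that is to build the argument list from a dictionary of properties. The following is only a sketch: the helper `build_spark_submit` is hypothetical, and the property values are the ones from the command above.

```python
import shlex


def build_spark_submit(app, conf, master="yarn-client", launcher="spark-submit"):
    """Return the spark-submit argument list for `app` with the given conf."""
    args = [launcher, "--master", master]
    for key, value in conf.items():
        # Each property becomes its own "--conf key=value" pair.
        args += ["--conf", "{}={}".format(key, value)]
    args.append(app)
    return args


conf = {
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.type": "native",
    "spark.pyspark.virtualenv.requirements": "/Users/jzhang/github/spark/requirements.txt",
    "spark.pyspark.virtualenv.bin.path": "/Users/jzhang/anaconda/bin/virtualenv",
    "spark.pyspark.python": "/usr/local/bin/python3",
}
args = build_spark_submit("spark_virtualenv.py", conf)
# Print a shell-safe version of the command for inspection.
print(" ".join(shlex.quote(a) for a in args))
```

The resulting list can be passed directly to `subprocess.run(args)`, which avoids shell quoting issues altogether.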
The job output reports the numpy version seen by each task, which shows that numpy was installed successfully on each executor.
Next, I will talk about how to create a virtual environment using conda. The process is very similar to virtualenv, but uses different commands.
Here is the command to create virtual environment env_conda_1 with Python 2.7 in the local environment. Folder env_conda_1 will be created under the current working directory:
conda create --prefix env_conda_1 python=2.7
Use the following command to activate the virtual environment:
source activate env_conda_1 # activate this virtual environment
Next, install numpy using the conda install command:
conda install numpy
Use the following command to create the requirements file. This command writes the details of every installed Python package to the file, so make sure you are still inside the virtual environment you created above.
conda list --export > requirements_conda.txt
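Note that the conda export format differs from the pip one: each entry has the form `name=version=build`, and the file begins with `#` comment lines. A minimal sketch for reading it follows; the function name `parse_conda_export` and the sample contents are hypothetical and only illustrate the format.

```python
def parse_conda_export(text):
    """Parse `conda list --export` text into {package: (version, build)}."""
    packages = {}
    for line in text.splitlines():
        line = line.strip()
        # The export starts with '#' header lines; skip them and blanks.
        if not line or line.startswith("#"):
            continue
        parts = line.split("=")
        version = parts[1] if len(parts) > 1 else None
        build = parts[2] if len(parts) > 2 else None
        packages[parts[0]] = (version, build)
    return packages


# Hypothetical file contents, for illustration only.
sample_conda = "# platform: osx-64\nnumpy=1.11.3=py27_0\npip=9.0.1=py27_1\n"
print(parse_conda_export(sample_conda))
```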
Run the pyspark job in yarn-client mode:
bin/spark-submit --master yarn-client --conf spark.pyspark.virtualenv.enabled=true --conf spark.pyspark.virtualenv.type=conda --conf spark.pyspark.virtualenv.requirements=/Users/jzhang/github/spark/requirements_conda.txt --conf spark.pyspark.virtualenv.bin.path=/Users/jzhang/anaconda/bin/conda spark_virtualenv.py
The output looks the same as in the local example.
Interactive mode is not yet supported; the following information is a preview.
Interactive mode means that you don’t need to specify the requirements file when launching PySpark, and you can install packages in your virtualenv at runtime. Interactive mode is very useful for pyspark shell and notebook environments.
The following command launches the pyspark shell with virtualenv enabled. In the Spark driver and executor processes it will create an isolated virtual environment instead of using the default python version running on the host.
bin/pyspark --master yarn-client --conf spark.pyspark.virtualenv.enabled=true --conf spark.pyspark.virtualenv.type=native --conf spark.pyspark.virtualenv.bin.path=/Users/jzhang/anaconda/bin/virtualenv --conf spark.pyspark.python=/Users/jzhang/anaconda/bin/python
After you launch this pyspark shell, you will have a clean Python runtime environment on both the driver and the executors. You can use sc.install_packages to install any Python package that can be installed by pip; for example:
sc.install_packages("numpy")              # install the latest numpy
sc.install_packages("numpy==1.11.0")      # install a specific version of numpy
sc.install_packages(["numpy", "pandas"])  # install multiple python packages
After that, you can use the packages that you just installed:
import numpy
sc.range(4).map(lambda x: numpy.__version__).collect()
Interactive mode with conda is almost the same as with virtualenv. The one exception is that you must set spark.pyspark.virtualenv.python_version, because conda needs a Python version in order to create the virtual environment.
bin/pyspark --master yarn-client --conf spark.pyspark.virtualenv.enabled=true --conf spark.pyspark.virtualenv.type=conda --conf spark.pyspark.virtualenv.bin.path=/Users/jzhang/anaconda/bin/conda --conf spark.pyspark.virtualenv.python_version=3.5
|Property|Description|
|---|---|
|spark.pyspark.virtualenv.enabled|Property flag to enable virtualenv|
|spark.pyspark.virtualenv.type|Type of virtualenv. Valid values are “native”, “conda”|
|spark.pyspark.virtualenv.requirements|Requirements file (optional, not required for interactive mode)|
|spark.pyspark.virtualenv.bin.path|The location of virtualenv executable file for type native or conda executable file for type conda|
|spark.pyspark.virtualenv.python_version|Python version for conda. (optional, only required when you use conda in interactive mode)|
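The rules in this table can be checked before launching a job. The sketch below is my own reading of the table, not code from Spark: the function `check_virtualenv_conf` is hypothetical, and treating `bin.path` as always required and the requirements file as required in batch mode are assumptions drawn from the examples above.

```python
VALID_TYPES = ("native", "conda")
PREFIX = "spark.pyspark.virtualenv."


def check_virtualenv_conf(conf, interactive=False):
    """Return a list of problems found in a dict of Spark properties."""
    problems = []
    if conf.get(PREFIX + "enabled") != "true":
        return problems  # virtualenv is disabled, nothing to check
    venv_type = conf.get(PREFIX + "type")
    if venv_type not in VALID_TYPES:
        problems.append("type must be 'native' or 'conda'")
    # Assumption: the executable path is always needed.
    if not conf.get(PREFIX + "bin.path"):
        problems.append("bin.path is required")
    # Assumption: batch mode needs a requirements file.
    if not interactive and not conf.get(PREFIX + "requirements"):
        problems.append("requirements is required in batch mode")
    if interactive and venv_type == "conda" and not conf.get(PREFIX + "python_version"):
        problems.append("python_version is required for conda in interactive mode")
    return problems


example = {
    PREFIX + "enabled": "true",
    PREFIX + "type": "conda",
    PREFIX + "bin.path": "/Users/jzhang/anaconda/bin/conda",
}
print(check_virtualenv_conf(example, interactive=True))
```

For this example the check reports the missing `python_version`, which matches the conda interactive-mode requirement described earlier.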
On each executor, it takes some time to set up the virtualenv (installing the packages). The first run may be very slow.
For example, the first time I installed numpy on each node it took almost three minutes, because it needed to download files and compile them into wheel format. The next time it only took three seconds to install numpy, because it installed numpy from the cached wheel file.