Created on 09-26-2016 03:05 PM - edited 08-17-2019 09:36 AM
Controlling the environment of an application is vital for its functionality and stability. Especially in a distributed environment, it is important for developers to have control over the versions of dependencies. In such a scenario it is a critical task to ensure that possibly conflicting requirements of multiple applications do not disturb each other.
That is why frameworks like YARN ensure that each application is executed in a self-contained environment, typically a Linux container or Docker container, that is controlled by the developer. In this post we show what this means for Python environments used by Spark.
As mentioned earlier, YARN executes each application in a self-contained environment on each host. This ensures execution in a controlled environment managed by individual developers. In a nutshell, the dependencies of an application are distributed to each node, typically via HDFS.
The files are uploaded to a staging folder /user/${username}/.${application} of the submitting user in HDFS. Because of the distributed architecture of HDFS, it is ensured that multiple nodes have local copies of the files. In fact, to ensure that a large fraction of the cluster has a local copy of the application files and does not need to download them over the network, the HDFS replication factor for these files is set much higher than the default of 3. Often a value between 10 and 20 is chosen for the replication factor.
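As a side note, Spark on YARN exposes this replication level as a configuration property. If your Spark version supports it (check the YARN configuration page of your release), the replication of the uploaded staging files can be tuned like this (the value 10 is just an illustrative choice):

```
spark-submit --conf spark.yarn.submit.file.replication=10 ...
```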
During the preparation of a container on a node, you will notice commands similar to the following example being executed in the logs:
ln -sf "/hadoop/yarn/local/usercache/vagrant/filecache/72/pyspark.zip" "pyspark.zip"
The folder /hadoop/yarn/local/ is the configured location on each node where YARN stores the files and logs it needs locally. Creating a symbolic link like this inside the container makes the content of the zip file available; it is referenced as "pyspark.zip".
For application developers this means that they can package and ship their controlled environment with each application. Other solutions like NFS or Amazon EFS shares are not needed. Shared folders in particular make for an architecture that does not scale well and render application development less agile.
The following example demonstrates the use of a conda env to ship the Python environment a PySpark application needs for execution. The sample application uses the NLTK package, with the additional requirement of making tokenizer and tagger resources available to the application as well.
Our sample application:
import os
import sys

from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf()
conf.setAppName("spark-ntlk-env")
sc = SparkContext(conf=conf)

data = sc.textFile('hdfs:///user/vagrant/1970-Nixon.txt')

def word_tokenize(x):
    import nltk
    return nltk.word_tokenize(x)

def pos_tag(x):
    import nltk
    return nltk.pos_tag([x])

words = data.flatMap(word_tokenize)
words.saveAsTextFile('hdfs:///user/vagrant/nixon_tokens')

pos_word = words.map(pos_tag)
pos_word.saveAsTextFile('hdfs:///user/vagrant/nixon_token_pos')
For our example we are using the provided samples of NLTK (http://www.nltk.org/nltk_data/) and upload them to HDFS:
(nltk_env)$ python -m nltk.downloader -d nltk_data all
(nltk_env)$ hdfs dfs -put nltk_data/corpora/state_union/1970-Nixon.txt /user/vagrant/
Before we actually go and create our environment, let's first take a quick moment to recap how an environment is typically composed. On a machine the environment is made up of variables pointing to different target folders containing executables or other resource files. So when you execute a command, it is resolved via your PATH, PYTHON_LIBRARY, or any other defined variable. These variables point to files in directories like /usr/bin, /usr/local/bin or other referenced locations. Such references are absolute, as they start from the root /.
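The PATH lookup described above can be illustrated from Python itself. The following sketch uses the standard library's shutil.which, which performs the same directory scan the shell does when resolving a bare command name:

```python
import shutil

# shutil.which scans the directories listed in PATH in order,
# exactly like the shell does when you type a command.
resolved = shutil.which("sh")  # found via PATH, e.g. /bin/sh

# Restricting the search path changes what is found:
missing = shutil.which("sh", path="/nonexistent")  # None: not on this path

print(resolved)
print(missing)
```

This is why a different PATH (or a different PYSPARK_PYTHON) can make the very same command name resolve to a completely different interpreter.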
Environments using absolute references are not easily transportable, as they make strict assumptions about the overall execution environment (your OS, for example) they are used in. Therefore it is necessary to use relative links in a transportable/relocatable environment.
This is especially true for conda env, as it creates hard links by default. By making the conda env relocatable, it can be used in an application by referencing it from the application root . (current dir) instead of the overall root /. By using the --copy option during the creation of the environment, packages are copied instead of linked.
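The difference between a hard-linked and a copied file can be checked via the file's link count. The sketch below (file names are made up for illustration) mimics what conda does by default versus what --copy does:

```python
import os
import shutil
import tempfile

with tempfile.TemporaryDirectory() as d:
    src = os.path.join(d, "libfoo.so")
    with open(src, "w") as f:
        f.write("payload")

    # Default conda behaviour: hard link into the package cache,
    # so both names share one inode.
    linked = os.path.join(d, "linked.so")
    os.link(src, linked)

    # --copy behaviour: an independent copy, safe to zip and relocate.
    copied = os.path.join(d, "copied.so")
    shutil.copy(src, copied)

    linked_nlink = os.stat(linked).st_nlink  # 2: shares inode with src
    copied_nlink = os.stat(copied).st_nlink  # 1: standalone file

print(linked_nlink, copied_nlink)  # 2 1
```

A zip archive only stores file contents, so shipping a hard-linked env would drag whole package-cache copies along and still break absolute references; the copied env stands on its own.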
Creating our relocatable environment together with nltk and numpy:
conda create -n nltk_env --copy -y -q python=3 nltk numpy
Fetching package metadata .......
Solving package specifications: ..........

Package plan for installation in environment /home/datalab_user01/anaconda2/envs/nltk_env:

The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    python-3.5.2               |                0        17.2 MB
    nltk-3.2.1                 |           py35_0         1.8 MB
    numpy-1.11.1               |           py35_0         6.1 MB
    setuptools-23.0.0          |           py35_0         460 KB
    wheel-0.29.0               |           py35_0          82 KB
    pip-8.1.2                  |           py35_0         1.6 MB
    ------------------------------------------------------------
                                           Total:        27.2 MB

The following NEW packages will be INSTALLED:

    mkl:        11.3.3-0       (copy)
    nltk:       3.2.1-py35_0   (copy)
    numpy:      1.11.1-py35_0  (copy)
    openssl:    1.0.2h-1       (copy)
    pip:        8.1.2-py35_0   (copy)
    python:     3.5.2-0        (copy)
    readline:   6.2-2          (copy)
    setuptools: 23.0.0-py35_0  (copy)
    sqlite:     3.13.0-0       (copy)
    tk:         8.5.18-0       (copy)
    wheel:      0.29.0-py35_0  (copy)
    xz:         5.2.2-0        (copy)
    zlib:       1.2.8-3        (copy)

#
# To activate this environment, use:
# $ source activate nltk_env
#
# To deactivate this environment, use:
# $ source deactivate
#
This also works for other Python versions, 3.x as well as 2.x.
Now that we have our relocatable environment all set we are able to package it and ship it as part of our sample PySpark job.
$ cd ~/anaconda2/envs/
$ zip -r nltk_env.zip nltk_env
To make this available during the execution of our application in a YARN container, we have to, first, distribute the package and, second, point Spark's default Python environment to our location.
The variable controlling the python environment for python applications in Spark is named PYSPARK_PYTHON.
PYSPARK_PYTHON=./NLTK/nltk_env/bin/python spark-submit \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python \
  --master yarn-cluster \
  --archives nltk_env.zip#NLTK \
  spark_nltk_sample.py
Our virtual environment is linked under the alias NLTK, which is why the path in PYSPARK_PYTHON points to ./NLTK/content/of/zip/... . The exact command executed during container creation is something like this:
ln -sf "/hadoop/yarn/local/usercache/vagrant/filecache/71/nltk_env.zip" "NLTK"
Shipping additional resources with an application is controlled by the --files and --archives options as shown here.
The options being used here are documented in Spark Yarn Configuration and Spark Environment Variables for reference.
Doing just the above will unfortunately fail, because the way we use the NLTK parser in the example program has some additional dependencies. If you have followed the steps above, submitting the application to your YARN cluster will result in the following exception at container level:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
...
    raise LookupError(resource_not_found)
LookupError:
**********************************************************************
  Resource 'taggers/averaged_perceptron_tagger/averaged_perceptron_tagger.pickle' not found.
  Please use the NLTK Downloader to obtain the resource:  >>> nltk.download()
  Searched in:
    - '/home/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/local/share/nltk_data'
    - '/usr/lib/nltk_data'
    - '/usr/local/lib/nltk_data'
**********************************************************************

 at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:166)
 at org.apache.spark.api.python.PythonRunner$anon$1.<init>(PythonRDD.scala:207)
 at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
 at org.apache.spark.scheduler.Task.run(Task.scala:88)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 ... 1 more
The problem is that NLTK expects the resource tokenizers/punkt/english.pickle to be available in one of the following locations:
Searched in:
  - '/home/nltk_data'
  - '/usr/share/nltk_data'
  - '/usr/local/share/nltk_data'
  - '/usr/lib/nltk_data'
  - '/usr/local/lib/nltk_data'
The good thing about this is that by now we should know how to ship the required dependency to our application: the same way we did with our Python environment.
Again it is important to double-check how the resource is going to be referenced. NLTK expects it by default under tokenizers/punkt/english.pickle relative to the current location, which is why we navigate into the folder for packaging and reference the zip file with tokenizers.zip#tokenizers.
(nltk_env)$ cd nltk_data/tokenizers/
(nltk_env)$ zip -r ../../tokenizers.zip *
(nltk_env)$ cd ../../
(nltk_env)$ cd nltk_data/taggers/
(nltk_env)$ zip -r ../../taggers.zip *
(nltk_env)$ cd ../../
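Why zip from inside the folder? Because the archive then stores relative paths ("punkt/english.pickle"), so after YARN unpacks it under the alias "tokenizers", the resource resolves as tokenizers/punkt/english.pickle, exactly where NLTK searches. A small sketch with Python's zipfile module (paths are illustrative stand-ins for the real NLTK data):

```python
import os
import tempfile
import zipfile

with tempfile.TemporaryDirectory() as d:
    # Stand-in for nltk_data/tokenizers/punkt/english.pickle
    os.makedirs(os.path.join(d, "punkt"))
    open(os.path.join(d, "punkt", "english.pickle"), "wb").close()

    archive = os.path.join(d, "tokenizers.zip")
    with zipfile.ZipFile(archive, "w") as zf:
        # Equivalent of: cd nltk_data/tokenizers && zip -r ../../tokenizers.zip *
        zf.write(os.path.join(d, "punkt", "english.pickle"),
                 arcname="punkt/english.pickle")

    with zipfile.ZipFile(archive) as zf:
        names = zf.namelist()

print(names)  # ['punkt/english.pickle'] -- no nltk_data/tokenizers/ prefix
```

Had we zipped from the parent directory instead, the entries would carry a nltk_data/tokenizers/ prefix and NLTK would not find them under the alias.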
At a later point our program will expect a tagger in the same fashion, as already demonstrated in the snippet above.
We can ship those zip resources the same way we shipped our conda env. In addition, environment variables can be used to control resource discovery and allocation.
For NLTK you can use the environment variable NLTK_DATA to control the path. Setting this in Spark can be done similar to the way we set PYSPARK_PYTHON:
--conf spark.yarn.appMasterEnv.NLTK_DATA=./
Additionally, YARN exposes the container path via the environment variable PWD. This can be used to add the container directory to NLTK's search path as follows:
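Under the hood this is just list manipulation on NLTK's module-level search path, nltk.data.path. A minimal stand-alone sketch of the same pattern, using a plain list instead of nltk.data.path so it runs without NLTK installed:

```python
import os

# Stand-in for nltk.data.path: a list of directories searched in order.
search_path = ['/usr/share/nltk_data', '/usr/local/share/nltk_data']

# YARN sets PWD to the container's working directory, where the
# tokenizers/taggers symlinks live; appending it makes those resolve.
container_dir = os.environ.get('PWD', os.getcwd())
search_path.append(container_dir)

print(search_path[-1] == container_dir)  # True
```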
def word_tokenize(x):
    import nltk
    nltk.data.path.append(os.environ.get('PWD'))
    return nltk.word_tokenize(x)
The submission of your application now becomes:
PYSPARK_PYTHON=./NLTK/nltk_env/bin/python spark-submit \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python \
  --conf spark.yarn.appMasterEnv.NLTK_DATA=./ \
  --master yarn-cluster \
  --archives nltk_env.zip#NLTK,tokenizers.zip#tokenizers,taggers.zip#taggers \
  spark_nltk_sample.py
The expected result should be something like the following:
(nltk_env)$ hdfs dfs -cat /user/datalab_user01/nixon_tokens/* | head -n 20
Annual
Message
to
the
Congress
on
the
State
of
the
Union
.
January
22
,
1970
Mr.
Speaker
,
Mr.
And:
(nltk_env)$ hdfs dfs -cat /user/datalab_user01/nixon_token_pos/* | head -n 20
[(u'Annual', 'JJ')]
[(u'Message', 'NN')]
[(u'to', 'TO')]
[(u'the', 'DT')]
[(u'Congress', 'NNP')]
[(u'on', 'IN')]
[(u'the', 'DT')]
[(u'State', 'NNP')]
[(u'of', 'IN')]
[(u'the', 'DT')]
[(u'Union', 'NN')]
[(u'.', '.')]
[(u'January', 'NNP')]
[(u'22', 'CD')]
[(u',', ',')]
[(u'1970', 'CD')]
[(u'Mr.', 'NNP')]
[(u'Speaker', 'NN')]
[(u',', ',')]
[(u'Mr.', 'NNP')]
Post was first published here: http://henning.kropponline.de/2016/09/24/running-pyspark-with-conda-env/
Created on 03-12-2018 06:13 PM
Hi, I've tried your article with a simpler example using HDP 2.4.x. Instead of NLTK, I created a simple conda environment called jup (similar to https://www.anaconda.com/blog/developer-blog/conda-spark/).
When I try to run a variant of your spark-submit command with NLTK, I get "path ./ANACONDA/jup does not exist". Where did you define NLTK in your example?
PYSPARK_PYTHON=./ANACONDA/jup/bin/python .....
I looked at the logs and it does not appear to be unzipping the zip file. I've added all the paths that I can get hold of, as follows. Please note that if I drop the ./ANACONDA path and run Spark locally, then it works.
PYSPARK_PYTHON=./ANACONDA/jup/bin/python spark-submit \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/jup/bin/python \
  --conf spark.yarn.executorEnv.PYSPARK_PYTHON=./ANACONDA/jup/bin/python \
  --conf spark.yarn.appMasterEnv.PYTHONPATH="/usr/hdp/current/spark-client/python:/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip" \
  --conf spark.executorEnv.PYTHONPATH="/usr/hdp/current/spark-client/python/:/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip" \
  --conf spark.yarn.appMasterEnv.PYTHONSTARTUP="/usr/hdp/current/spark-client/python/pyspark/shell.py" \
  --conf spark.yarn.appMasterEnv.SPARK_HOME="/usr/hdp/current/spark-client" \
  --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./ANACONDA/jup/bin/python \
  --master yarn \
  --archives /opt/app/anaconda3/envs/jup.zip#ANACONDA \
  /home/d3849648/DSF/pysubmit2.py
I can only get it to run if I drop ANACONDA from the PySpark path, but I still get the following error.
The upload side
18/03/12 16:46:34 INFO Client: Using the spark assembly jar on HDFS because you are using HDP, defaultSparkAssembly:hdfs://HDP50/hdp/apps/2.4.2.0-258/spark/spark-hdp-assembly.jar
18/03/12 16:46:34 INFO Client: Source and destination file systems are the same. Not copying hdfs://HDP50/hdp/apps/2.4.2.0-258/spark/spark-hdp-assembly.jar
18/03/12 16:46:34 INFO Client: Uploading resource file:/opt/app/anaconda3/envs/jup.zip#ANACONDA -> hdfs://HDP50/user/d3849648/.sparkStaging/application_1520011290259_0032/jup.zip
18/03/12 16:46:39 INFO Client: Uploading resource file:/usr/hdp/2.4.2.0-258/spark/python/lib/pyspark.zip -> hdfs://HDP50/user/XX/.sparkStaging/application_1520011290259_0032/pyspark.zip
18/03/12 16:46:39 INFO Client: Uploading resource file:/usr/hdp/2.4.2.0-258/spark/python/lib/py4j-0.9-src.zip -> hdfs://HDP50/user/XX/.sparkStaging/application_1520011290259_0032/py4j-0.9-src.zip
18/03/12 16:46:39 INFO Client: Uploading resource file:/tmp/spark-3763c756-db41-4cd7-8cbe-7e48a1788e7d/__spark_conf__4305293861537090121.zip -> hdfs://HDP50/user/XX/.sparkStaging/application_1520011290259_0032/__spark_conf__4305293861537090121.zip
18/03/12 16:47:03 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hkg3pl0244.hk.hsbc): java.io.IOException: Cannot run program "jup/bin/python": error=2, No such file or directory
Created on 08-21-2018 10:47 AM
Hello, thank you so much for this post. It is awesome that we can ship any version of the Python interpreter to the Hadoop cluster, even if that version of Python is not installed there.
However, what is the catch? One thing I have found is that the "package" with only Python 3.4 + pandas is already 450 MB zipped up. Would this be an issue?
Created on 02-11-2019 02:31 PM
Hi @hkropp, I followed your post, and when I trigger my spark-submit and the application is accepted by the YARN cluster, I get the following error:
Do you know what might be the reason?
In STDOUT
./CONDA_TEST/test_env3/bin/python: ./CONDA_TEST/test_env3/bin/python: cannot execute binary file
In STDERR
19/02/11 10:48:43 INFO ApplicationMaster: Preparing Local resources
19/02/11 10:48:44 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1548016788605_1234_000002
19/02/11 10:48:44 INFO SecurityManager: Changing view acls to: mbilling
19/02/11 10:48:44 INFO SecurityManager: Changing modify acls to: mbilling
19/02/11 10:48:44 INFO SecurityManager: Changing view acls groups to:
19/02/11 10:48:44 INFO SecurityManager: Changing modify acls groups to:
19/02/11 10:48:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(mbilling); groups with view permissions: Set(); users with modify permissions: Set(mbilling); groups with modify permissions: Set()
19/02/11 10:48:44 INFO ApplicationMaster: Starting the user application in a separate Thread
19/02/11 10:48:44 INFO ApplicationMaster: Waiting for spark context initialization...
19/02/11 10:48:44 ERROR ApplicationMaster: User application exited with status 126
19/02/11 10:48:44 INFO ApplicationMaster: Final app status: FAILED, exitCode: 126, (reason: User application exited with status 126)
19/02/11 10:48:44 ERROR ApplicationMaster: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult:
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
 at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:428)
 at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:281)
 at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:783)
 at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
 at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
 at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
 at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:781)
 at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: org.apache.spark.SparkUserAppException: User application exited with 126
 at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:104)
 at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:654)
19/02/11 10:48:44 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User application exited with status 126)
19/02/11 10:48:44 INFO ApplicationMaster: Deleting staging directory hdfs://pmids01/user/mbilling/.sparkStaging/application_1548016788605_1234
19/02/11 10:48:44 INFO ShutdownHookManager: Shutdown hook called
I'm running it like this:
export SPARK_MAJOR_VERSION=2; \
export PYSPARK_PYTHON=./CONDA_TEST/test_env3/bin/python; \
export PYSPARK_DRIVER_PYTHON=./CONDA_TEST/test_env3/bin/python; \
export PYSPARK_DRIVER_PYTHON_OPTS=''; \
cd deploy; \
spark-submit --master yarn --deploy-mode cluster \
  --verbose \
  --num-executors 1 --driver-memory 1g --executor-memory 1g \
  --files /usr/hdp/current/spark2-client/conf/hive-site.xml \
  --jars /usr/hdp/current/hive-webhcat/share/hcatalog/hive-hcatalog-core.jar \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./CONDA_TEST/test_env3/bin/python \
  --archives test_env3.zip#CONDA_TEST \
  --py-files main.py main.py
Created on 03-18-2020 04:41 PM
Thanks for this great tutorial; I got it mostly working. However, the Python workers all failed with the following error message. I am not sure if it is because the cluster I am working with is kerberized, but it somehow looks related to authentication and authorization.
["PYTHON_WORKER_FACTORY_SECRET"] == client_secret: File "/data12/yarn/nm/usercache/yolo/appcache/application_1579645850066_329429/container_e40_1579645850066_329429_02_000002/PY_ENV/py36yarn/lib/python3.6/os.py", line 669, in __getitem__ raise KeyError(key) from None KeyError: 'PYTHON_WORKER_FACTORY_SECRET' 20/03/18 19:25:06 ERROR executor.Executor: Exception in task 2.2 in stage 0.0 (TID 4) org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:230) at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) ... 11 more 20/03/18 19:25:06 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5 20/03/18 19:25:06 INFO executor.Executor: Running task 2.3 in stage 0.0 (TID 5)