There are situations when one might want to submit a Spark job via a REST API:

  • If you want to submit Spark jobs from your IDE running on a workstation outside the cluster
  • If the cluster can only be accessed via Knox (perimeter security)

One possibility is to use the Oozie REST API and the Oozie Spark action.

However, this article looks into the option of using the YARN REST API directly. Starting with the Cluster Applications API, I tried to come up with an approach that resembles the spark-submit command.

1) Copy the Spark assembly jar to HDFS

By default, the Spark assembly jar file is not available in HDFS, but we will need it there for remote access.

Some standard locations in HDP are:

  • HDP 2.3.2:
    • Version: 2.3.2.0-2950
    • Spark Jar: /usr/hdp/2.3.2.0-2950/spark/lib/spark-assembly-1.4.1.2.3.2.0-2950-hadoop2.7.1.2.3.2.0-2950.jar
  • HDP 2.4.0:
    • Version: 2.4.0.0-169
    • Spark Jar: /usr/hdp/2.4.0.0-169/spark/lib/spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar

This is a one-time preparation step; for HDP 2.4, for example, it would be:

sudo su - hdfs
HDP_VERSION=2.4.0.0-169
SPARK_JAR=spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar
hdfs dfs -mkdir "/hdp/apps/${HDP_VERSION}/spark/"
hdfs dfs -put "/usr/hdp/${HDP_VERSION}/spark/lib/$SPARK_JAR" "/hdp/apps/${HDP_VERSION}/spark/spark-hdp-assembly.jar"
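
To verify the upload (optional):

hdfs dfs -ls "/hdp/apps/${HDP_VERSION}/spark/"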

2) Upload your Spark application jar file to HDFS

Upload your Spark application jar file (packaged by sbt) to the project folder in HDFS via WebHDFS (you may want to use something better than "/tmp"):

export APP_FILE=simple-project_2.10-1.0.jar
curl    -X PUT "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project?op=MKDIRS"
curl -i -X PUT "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project/${APP_FILE}?op=CREATE&overwrite=true"
# take Location header from the response and issue a PUT request
LOCATION="http://..."
curl -i -X PUT -T "target/scala-2.10/${APP_FILE}" "${LOCATION}"

3) Create a Spark properties file and upload it to HDFS

spark.yarn.submit.file.replication=3
spark.yarn.executor.memoryOverhead=384
spark.yarn.driver.memoryOverhead=384
spark.master=yarn
spark.submit.deployMode=cluster
spark.eventLog.enabled=true
spark.yarn.scheduler.heartbeat.interval-ms=5000
spark.yarn.preserve.staging.files=true
spark.yarn.queue=default
spark.yarn.containerLauncherMaxThreads=25
spark.yarn.max.executor.failures=3
spark.executor.instances=2
spark.eventLog.dir=hdfs\:///spark-history
spark.history.kerberos.enabled=true
spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port=18080
spark.history.fs.logDirectory=hdfs\:///spark-history
spark.executor.memory=2G
spark.executor.cores=2
spark.history.kerberos.keytab=none
spark.history.kerberos.principal=none

Then upload it via WebHDFS as spark-yarn.properties to your simple-project folder, as before.
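
For reference, a sketch of that upload, assuming the same two-step WebHDFS CREATE flow as in step 2 and a local file named spark-yarn.properties:

curl -i -X PUT "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project/spark-yarn.properties?op=CREATE&overwrite=true"
# take the Location header from the response and issue a PUT request to it
LOCATION="http://..."
curl -i -X PUT -T spark-yarn.properties "${LOCATION}"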

4) Create a Spark job JSON file

a) We need to construct the command to start the Spark ApplicationMaster:

java -server -Xmx1024m -Dhdp.version=2.4.0.0-169 \
     -Dspark.yarn.app.container.log.dir=/hadoop/yarn/log/rest-api \
     -Dspark.app.name=SimpleProject \
     org.apache.spark.deploy.yarn.ApplicationMaster \
     --class IrisApp --jar __app__.jar \
     --arg '--class' --arg 'SimpleProject' \
     1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr

It is important to provide the Spark application name and the HDP version. The <LOG_DIR> placeholder will be resolved at container launch.

b) We need to set some general environment variables:

JAVA_HOME="/usr/jdk64/jdk1.8.0_60/"
SPARK_YARN_MODE=true
HDP_VERSION="2.4.0.0-169" 

Then we need to tell Spark which files to distribute across all Spark executors. Therefore we need to set four environment variables. One variable has the format "<hdfs path1>#<cache name 1>,<hdfs path2>#<cache name 2>,...", and the other three contain the comma-separated timestamps, file sizes and visibilities of these files (in the same order):

SPARK_YARN_CACHE_FILES: "hdfs://<<name-node>>:8020/tmp/simple-project/simple-project.jar#__app__.jar,hdfs://<<name-node>>:8020/hdp/apps/2.4.0.0-169/spark/spark-hdp-assembly.jar#__spark__.jar"
SPARK_YARN_CACHE_FILES_FILE_SIZES: "10588,191724610"
SPARK_YARN_CACHE_FILES_TIME_STAMPS: "1460990579987,1460219553714"
SPARK_YARN_CACHE_FILES_VISIBILITIES: "PUBLIC,PRIVATE"

Replace <<name-node>> with the correct address. File size and timestamp can be retrieved from HDFS via WebHDFS.
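
For example, the WebHDFS GETFILESTATUS operation returns both values as "length" and "modificationTime"; a sketch for the application jar (the numbers shown are the example values used above):

curl -s "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project/${APP_FILE}?op=GETFILESTATUS"
# {
#   "FileStatus": {
#     ...
#     "length": 10588,
#     "modificationTime": 1460990579987,
#     ...
#   }
# }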

Next, construct the classpath:

CLASSPATH="{{PWD}}<CPS>__spark__.jar<CPS>{{PWD}}/__app__.jar<CPS>{{PWD}}/__app__.properties<CPS>{{HADOOP_CONF_DIR}}<CPS>/usr/hdp/current/hadoop-client/*<CPS>/usr/hdp/current/hadoop-client/lib/*<CPS>/usr/hdp/current/hadoop-hdfs-client/*<CPS>/usr/hdp/current/hadoop-hdfs-client/lib/*<CPS>/usr/hdp/current/hadoop-yarn-client/*<CPS>/usr/hdp/current/hadoop-yarn-client/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/common/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/common/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/yarn/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/yarn/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/hdfs/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/hdfs/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/tools/lib/*<CPS>/usr/hdp/2.4.0.0-169/hadoop/lib/hadoop-lzo-0.6.0.2.4.0.0-169.jar<CPS>/etc/hadoop/conf/secure<CPS>"

Notes:

  • __spark__.jar and __app__.jar are the same as provided in SPARK_YARN_CACHE_FILES
  • <CPS> will be resolved to `:` at container launch

c) Create the Spark job JSON file

The information above will be added to the Spark JSON file as the command and environment attributes (for details see the attachment; remove the .txt ending).

The last missing piece is the so-called local_resources section, which describes all files in HDFS that are necessary for the Spark job:

  • the Spark assembly jar (as in the caching environment variables)
  • the Spark application jar for this project (as in the caching environment variables)
  • the Spark properties file for this project (only for the Application Master, no caching necessary)

All three need to be given in the form

{
  "key": "__app__.jar", 
  "value": {
    "resource": "hdfs://<<name-node>>:8020/tmp/simple-project/simple-project.jar", 
    "size": 10588, 
    "timestamp": 1460990579987, 
    "type": "FILE", 
    "visibility": "APPLICATION"
  }
}, 

Again, replace <<name-node>>. Timestamp, HDFS path, size and key need to be the same as in the caching environment variables.

Save it as spark-yarn.json (for details see the attachment; remove the .txt ending).
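
In case the attachment is not at hand: the file follows the application submission format of the Cluster Applications API. Below is a structural sketch only; each "{ ... }" local resource value is filled in exactly as shown above, the environment entries carry the values from step 4b, the command is the full ApplicationMaster command from step 4a, and "resource", "max-app-attempts" and "unmanaged-AM" are illustrative values, not taken from the attachment:

{
  "application-id": "application_1460195242962_0054",
  "application-name": "SimpleProject",
  "am-container-spec": {
    "local-resources": {
      "entry": [
        { "key": "__spark__.jar",      "value": { ... } },
        { "key": "__app__.jar",        "value": { ... } },
        { "key": "__app__.properties", "value": { ... } }
      ]
    },
    "environment": {
      "entry": [
        { "key": "JAVA_HOME",                           "value": "/usr/jdk64/jdk1.8.0_60/" },
        { "key": "SPARK_YARN_MODE",                     "value": "true" },
        { "key": "SPARK_YARN_CACHE_FILES",              "value": "..." },
        { "key": "SPARK_YARN_CACHE_FILES_FILE_SIZES",   "value": "..." },
        { "key": "SPARK_YARN_CACHE_FILES_TIME_STAMPS",  "value": "..." },
        { "key": "SPARK_YARN_CACHE_FILES_VISIBILITIES", "value": "..." },
        { "key": "CLASSPATH",                           "value": "..." }
      ]
    },
    "commands": {
      "command": "java -server -Xmx1024m -Dhdp.version=2.4.0.0-169 ... 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
    }
  },
  "unmanaged-AM": false,
  "max-app-attempts": 2,
  "resource": {
    "memory": 1024,
    "vCores": 1
  },
  "application-type": "YARN"
}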

5) Submit the job

First, request an application ID from YARN:

curl -s -X POST -d '' \
     https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application
# {
#   "application-id": "application_1460195242962_0054",
#   "maximum-resource-capability": {
#     "memory": 8192,
#     "vCores": 3
#   } 
# }
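
For scripting, the new application ID can be extracted and patched into spark-yarn.json, for example with jq and sed (both are assumptions about your local tooling, not part of the original workflow; the sed pattern assumes the placeholder ID is the only application_... string in the file):

APP_ID=$(curl -s -X POST -d '' \
     "https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application" \
     | jq -r '."application-id"')
sed -i "s/application_[0-9]*_[0-9]*/${APP_ID}/" spark-yarn.json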

Edit the "application-id" in spark-yarn.json and then submit the job:

curl -s -i -X POST -H "Content-Type: application/json" ${HADOOP_RM}/ws/v1/cluster/apps \
     --data-binary @spark-yarn.json
# HTTP/1.1 100 Continue
# 
# HTTP/1.1 202 Accepted
# Cache-Control: no-cache
# Expires: Sun, 10 Apr 2016 13:02:47 GMT
# Date: Sun, 10 Apr 2016 13:02:47 GMT
# Pragma: no-cache
# Expires: Sun, 10 Apr 2016 13:02:47 GMT
# Date: Sun, 10 Apr 2016 13:02:47 GMT
# Pragma: no-cache
# Content-Type: application/json
# Location: http://<<resource-manager>>:8088/ws/v1/cluster/apps/application_1460195242962_0054
# Content-Length: 0
# Server: Jetty(6.1.26.hwx)

6) Track the job

curl -s "http://<<resource-manager>>:8088/ws/v1/cluster/apps/application_1460195242962_0054"
# {
#   "app": {
#     "id": "application_1460195242962_0054",
#     "user": "dr.who",
#     "name": "IrisApp",
#     "queue": "default",
#     "state": "FINISHED",
#     "finalStatus": "SUCCEEDED",
#     "progress": 100,
#     "trackingUI": "History",
#     "trackingUrl": "http://<<ResourceManager>>:8088/proxy/application_1460195242962_0054/",
#     "diagnostics": "",
#     "clusterId": 1460195242962,
#     "applicationType": "YARN",
#     "applicationTags": "",
#     "startedTime": 1460293367576,
#     "finishedTime": 1460293413568,
#     "elapsedTime": 45992,
#     "amContainerLogs": "http://<<node-manager>>:8042/node/containerlogs/container_e29_1460195242962_0054_01_000001/dr.who",
#     "amHostHttpAddress": "<<node-manager>>:8042",
#     "allocatedMB": -1,
#     "allocatedVCores": -1,
#     "runningContainers": -1,
#     "memorySeconds": 172346,
#     "vcoreSeconds": 112,
#     "queueUsagePercentage": 0,
#     "clusterUsagePercentage": 0,
#     "preemptedResourceMB": 0,
#     "preemptedResourceVCores": 0,
#     "numNonAMContainerPreempted": 0,
#     "numAMContainerPreempted": 0,
#     "logAggregationStatus": "SUCCEEDED"
#   }
# }
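
To wait for completion from a script, you can poll the state field, for example with jq (a simple sketch; jq is assumed to be installed):

while true; do
  STATE=$(curl -s "http://<<resource-manager>>:8088/ws/v1/cluster/apps/application_1460195242962_0054" | jq -r '.app.state')
  echo "state: ${STATE}"
  case "${STATE}" in
    FINISHED|FAILED|KILLED) break ;;
  esac
  sleep 10
done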

7) Using Knox (without Kerberos)

The whole process works through Knox as well; just replace the WebHDFS and Resource Manager URLs with their Knox substitutes:

a) Resource Manager:

http://<<resource-manager>>:8088/ws/v1 ==> https://<<knox-gateway>>:8443/gateway/default/resourcemanager/v1

b) WebHDFS host:

http://<<webhdfs-host>>:50070/webhdfs/v1 ==> https://<<knox-gateway>>:8443/gateway/default/webhdfs/v1

Additionally, you need to provide Knox credentials (e.g. Basic Authentication with <<user>>:<<password>>).
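
For example, the new-application request from step 5 then becomes (the -u credentials, and -k for self-signed certificates, depend on your Knox setup):

curl -s -k -u <<user>>:<<password>> -X POST -d '' \
     "https://<<knox-gateway>>:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application"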

8) More details

More details and a Python script that eases the whole process can be found in the Spark-Yarn-REST-API repo.

Any comments on how to make this process easier are highly appreciated.

Comments
New Contributor

When I do the above, my job fails with the permission error "Permission denied: user=myuser, access=EXECUTE, inode="/jobs/logs/dr.who/logs":dr.who:hadoop:drwxrwx---". My cluster is not secure but has dfs.permissions = true, and I would prefer not to disable it. Every time the job runs from the API it tries to run as user dr.who, and that user does not exist in reality. When I run jobs with spark-submit or hadoop jar they run as the logged-in user. I understand that "dr.who" is the HTTP static user - but how do I submit the job as some other "valid" user supplied in the API? The YARN API documentation is ambiguous as hell; I mean, what in the world does this line even mean to a non-ops person: "Please note that in order to submit an app, you must have an authentication filter setup for the HTTP interface. The functionality requires that a username is set in the HttpServletRequest. If no filter is setup, the response will be an "UNAUTHORIZED" response." Does this mean that the REST API can only be used by a Java or JVM-based client? (HttpServletRequest is a Java class)... So please help me figure out how to get around or solve the problem of the application running as "dr.who" instead of my user.

Contributor

@Atul Kulkarni There's little point in using dfs.permissions without security. I suspect you can set up a username/password authentication filter for the YARN RM and simply add the username/password to the curl request using -u username, entering the password interactively. If you use such a simple filter, you should probably also use SSL, otherwise the password can easily be intercepted.

I still stand by my original comment that without security enabled, dfs.permissions are basically ineffective, except in annoying cases like this. Much better to enable Kerberization and SPNEGO and have security that works transparently and without passwords.

Contributor

How different is it from using Livy to do the same?

Contributor

The default installation with Spark2 provides a TGZ instead of a jar in a similarly named folder/location in HDFS - would that also work as the value for __spark__.jar, or is a jar still a requirement even with Spark2?

Contributor

There are two main differences:

1) Livy will run the job in one of its SparkSessions, whereas by submitting to YARN you are creating a stand-alone Spark job. The former will be slightly quicker to come online, but you may not be able to use all the features of the resource manager with regard to using all available resources.

2) As far as I can tell, Livy cannot yet do what Knox does with regard to bridging authentication realms. Knox allows you to authenticate against the cluster's security context using username and password, and then internally use cryptographic authentication (i.e. Kerberos). With Livy, LDAP authentication is still an open pull request, but I expect you could use Kerberos/SPNEGO authentication. For this, your workstation/CI server would have to be in the same Kerberos realm as your cluster, which often isn't the case.

Livy is nicer if you have small jobs that you want to deploy with low latency and high modularity; Knox + YARN is convenient when you are outside the cluster and have long-running jobs. You can combine both: https://community.hortonworks.com/articles/70499/adding-livy-server-as-service-to-apache-knox.html