Created on 04-18-2016 05:20 PM
There are situations when one might want to submit a Spark job via a REST API. One possibility is to use the Oozie REST API and the Oozie Spark action. However, this article looks into the option of using the YARN REST API directly. Starting with the Cluster Applications API, I tried to come up with an approach that resembles the spark-submit command.
By default the Spark assembly jar file is not available in HDFS, but for remote submission we will need it there. In HDP it can be found locally under /usr/hdp/<version>/spark/lib/. Uploading it to HDFS is a one-time preparation step; for HDP 2.4 it would be:
sudo su - hdfs
HDP_VERSION=2.4.0.0-169
SPARK_JAR=spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar
hdfs dfs -mkdir "/hdp/apps/${HDP_VERSION}/spark/"
hdfs dfs -put "/usr/hdp/${HDP_VERSION}/spark/lib/$SPARK_JAR" "/hdp/apps/${HDP_VERSION}/spark/spark-hdp-assembly.jar"
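As an optional sanity check (not part of the original steps), you can list the target folder in the same session to confirm the assembly jar landed in HDFS:

hdfs dfs -ls "/hdp/apps/${HDP_VERSION}/spark/"
# expect spark-hdp-assembly.jar to show up here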
Upload your Spark application jar file (packaged by sbt) to the project folder in HDFS via WebHDFS (maybe use something better than "/tmp"):
export APP_FILE=simple-project_2.10-1.0.jar
curl -X PUT "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project?op=MKDIRS"
curl -i -X PUT "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project/${APP_FILE}?op=CREATE&overwrite=true"
# take the Location header from the response and issue a PUT request against it
LOCATION="http://..."
curl -i -X PUT -T "target/scala-2.10/${APP_FILE}" "${LOCATION}"
Next, create a Spark properties file for the job:

spark.yarn.submit.file.replication=3
spark.yarn.executor.memoryOverhead=384
spark.yarn.driver.memoryOverhead=384
spark.master=yarn
spark.submit.deployMode=cluster
spark.eventLog.enabled=true
spark.yarn.scheduler.heartbeat.interval-ms=5000
spark.yarn.preserve.staging.files=true
spark.yarn.queue=default
spark.yarn.containerLauncherMaxThreads=25
spark.yarn.max.executor.failures=3
spark.executor.instances=2
spark.eventLog.dir=hdfs\:///spark-history
spark.history.kerberos.enabled=true
spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port=18080
spark.history.fs.logDirectory=hdfs\:///spark-history
spark.executor.memory=2G
spark.executor.cores=2
spark.history.kerberos.keytab=none
spark.history.kerberos.principal=none
Then upload it via WebHDFS as spark-yarn.properties to your simple-project folder, as before.
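Spelled out, that upload would mirror the two-step WebHDFS pattern from above (same ${WEBHDFS_HOST} and folder; just a sketch for completeness):

curl -i -X PUT "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project/spark-yarn.properties?op=CREATE&overwrite=true"
# again, take the Location header from the 307 response and PUT the file there
LOCATION="http://..."
curl -i -X PUT -T "spark-yarn.properties" "${LOCATION}"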
The Spark Application Master will be started inside a YARN container with a command like the following (this is what will later go into the "command" attribute of the JSON job definition):

java -server -Xmx1024m -Dhdp.version=2.4.0.0-169 \
  -Dspark.yarn.app.container.log.dir=/hadoop/yarn/log/rest-api \
  -Dspark.app.name=SimpleProject \
  org.apache.spark.deploy.yarn.ApplicationMaster \
  --class IrisApp --jar __app__.jar \
  --arg '--class' --arg 'SimpleProject' \
  1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
It is important to provide the Spark application name and the HDP version. The <LOG_DIR> placeholder will be resolved by YARN to the container's log directory at launch time.
The Application Master additionally needs a few environment variables:

JAVA_HOME="/usr/jdk64/jdk1.8.0_60/"
SPARK_YARN_MODE=true
HDP_VERSION="2.4.0.0-169"
Then we need to tell Spark which files to distribute across all Spark executors. Therefore we need to set four more variables. One variable is of the format "<hdfs path 1>#<cache name 1>,<hdfs path 2>#<cache name 2>,...", and the other three contain comma-separated timestamps, file sizes and visibilities of each file (in the same order):
SPARK_YARN_CACHE_FILES: "hdfs://<<name-node>>:8020/tmp/simple-project/simple-project.jar#__app__.jar,hdfs://<<name-node>>:8020/hdp/apps/2.4.0.0-169/spark/spark-hdp-assembly.jar#__spark__.jar"
SPARK_YARN_CACHE_FILES_FILE_SIZES: "10588,191724610"
SPARK_YARN_CACHE_FILES_TIME_STAMPS: "1460990579987,1460219553714"
SPARK_YARN_CACHE_FILES_VISIBILITIES: "PUBLIC,PRIVATE"
Replace <<name-node>> with the correct address. File size and timestamp can be retrieved from HDFS via WebHDFS, as shown below.
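For example, a WebHDFS GETFILESTATUS call returns both values; the "length" and "modificationTime" fields correspond to the file size and timestamp needed above (response abridged):

curl -s "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project/${APP_FILE}?op=GETFILESTATUS"
# {
#   "FileStatus": {
#     ...
#     "length": 10588,
#     "modificationTime": 1460990579987,
#     ...
#   }
# }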
Next, construct the classpath:
CLASSPATH="{{PWD}}<CPS>__spark__.jar<CPS>{{PWD}}/__app__.jar<CPS>{{PWD}}/__app__.properties<CPS>{{HADOOP_CONF_DIR}}<CPS>/usr/hdp/current/hadoop-client/*<CPS>/usr/hdp/current/hadoop-client/lib/*<CPS>/usr/hdp/current/hadoop-hdfs-client/*<CPS>/usr/hdp/current/hadoop-hdfs-client/lib/*<CPS>/usr/hdp/current/hadoop-yarn-client/*<CPS>/usr/hdp/current/hadoop-yarn-client/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/common/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/common/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/yarn/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/yarn/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/hdfs/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/hdfs/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/tools/lib/*<CPS>/usr/hdp/2.4.0.0-169/hadoop/lib/hadoop-lzo-0.6.0.2.4.0.0-169.jar<CPS>/etc/hadoop/conf/secure<CPS>"
Notes:
- __spark__.jar and __app__.jar are the same names as provided in SPARK_YARN_CACHE_FILES
- Spark will resolve <CPS> to `:`
The information above will be added to the Spark JSON file as the command and environment attributes (for details, see the attachment; remove the .txt ending).
The last missing piece is the so-called local-resources attribute, which describes all files in HDFS necessary for the Spark job:
- the Spark assembly jar (as in the caching environment variables)
- the Spark application jar for this project (as in the caching environment variables)
- the Spark properties file for this project (only for the Application Master, no caching necessary)
All three need to be given in a form like this:
{
  "key": "__app__.jar",
  "value": {
    "resource": "hdfs://<<name-node>>:8020/tmp/simple-project/simple-project.jar",
    "size": 10588,
    "timestamp": 1460990579987,
    "type": "FILE",
    "visibility": "APPLICATION"
  }
},
Again, replace <<name-node>>. Timestamp, HDFS path, size and key need to be the same as in the caching environment variables.
Save it as spark-yarn.json (for details, see the attachment; remove the .txt ending). A sketch of the overall structure is shown below.
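To give an idea of how the pieces fit together, here is a minimal sketch of roughly what spark-yarn.json looks like, using the field names of the YARN Cluster Applications API submit request. Values are abbreviated or shown as placeholders (e.g. <size>, <timestamp>), and the resource numbers are illustrative; the complete file used in this article is in the attachment:

{
  "application-id": "application_1460195242962_0054",
  "application-name": "SimpleProject",
  "application-type": "YARN",
  "am-container-spec": {
    "local-resources": {
      "entry": [
        { "key": "__spark__.jar", "value": { "resource": "hdfs://<<name-node>>:8020/hdp/apps/2.4.0.0-169/spark/spark-hdp-assembly.jar", "type": "FILE", "visibility": "PUBLIC", "size": 191724610, "timestamp": 1460219553714 } },
        { "key": "__app__.jar", "value": { "resource": "hdfs://<<name-node>>:8020/tmp/simple-project/simple-project.jar", "type": "FILE", "visibility": "APPLICATION", "size": 10588, "timestamp": 1460990579987 } },
        { "key": "__app__.properties", "value": { "resource": "hdfs://<<name-node>>:8020/tmp/simple-project/spark-yarn.properties", "type": "FILE", "visibility": "APPLICATION", "size": <size>, "timestamp": <timestamp> } }
      ]
    },
    "environment": {
      "entry": [
        { "key": "JAVA_HOME", "value": "/usr/jdk64/jdk1.8.0_60/" },
        { "key": "SPARK_YARN_MODE", "value": "true" },
        { "key": "HDP_VERSION", "value": "2.4.0.0-169" },
        { "key": "SPARK_YARN_CACHE_FILES", "value": "hdfs://<<name-node>>:8020/tmp/simple-project/simple-project.jar#__app__.jar,..." },
        { "key": "CLASSPATH", "value": "{{PWD}}<CPS>__spark__.jar<CPS>{{PWD}}/__app__.jar<CPS>..." }
      ]
    },
    "commands": {
      "command": "java -server -Xmx1024m -Dhdp.version=2.4.0.0-169 ... org.apache.spark.deploy.yarn.ApplicationMaster ... 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
    }
  },
  "resource": {
    "memory": 1024,
    "vCores": 1
  }
}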
First, request an application ID from YARN:
curl -s -X POST -d '' \
  https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application
# {
#   "application-id": "application_1460195242962_0054",
#   "maximum-resource-capability": {
#     "memory": 8192,
#     "vCores": 3
#   }
# }
Edit the "application-id" in spark-yarn.json so that it matches the one just requested (this step can also be scripted, see the sketch below).
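One possible way to automate that edit, assuming the jq CLI is available:

# patch the freshly requested application id into spark-yarn.json (sketch, assumes jq)
APP_ID="application_1460195242962_0054"   # taken from the new-application response above
jq --arg id "$APP_ID" '.["application-id"] = $id' spark-yarn.json > spark-yarn.tmp && mv spark-yarn.tmp spark-yarn.json

Then submit the job: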
curl -s -i -X POST -H "Content-Type: application/json" ${HADOOP_RM}/ws/v1/cluster/apps \
  --data-binary @spark-yarn.json
# HTTP/1.1 100 Continue
#
# HTTP/1.1 202 Accepted
# Cache-Control: no-cache
# Expires: Sun, 10 Apr 2016 13:02:47 GMT
# Date: Sun, 10 Apr 2016 13:02:47 GMT
# Pragma: no-cache
# Expires: Sun, 10 Apr 2016 13:02:47 GMT
# Date: Sun, 10 Apr 2016 13:02:47 GMT
# Pragma: no-cache
# Content-Type: application/json
# Location: http://<<resource-manager>>:8088/ws/v1/cluster/apps/application_1460195242962_0054
# Content-Length: 0
# Server: Jetty(6.1.26.hwx)
The status of the application can then be tracked via the URL returned in the Location header:

curl -s "http://<<resource-manager>>:8088/ws/v1/cluster/apps/application_1460195242962_0054"
# {
#   "app": {
#     "id": "application_1460195242962_0054",
#     "user": "dr.who",
#     "name": "IrisApp",
#     "queue": "default",
#     "state": "FINISHED",
#     "finalStatus": "SUCCEEDED",
#     "progress": 100,
#     "trackingUI": "History",
#     "trackingUrl": "http://<<ResourceManager>>:8088/proxy/application_1460195242962_0054/",
#     "diagnostics": "",
#     "clusterId": 1460195242962,
#     "applicationType": "YARN",
#     "applicationTags": "",
#     "startedTime": 1460293367576,
#     "finishedTime": 1460293413568,
#     "elapsedTime": 45992,
#     "amContainerLogs": "http://<<node-manager>>:8042/node/containerlogs/container_e29_1460195242962_0054_01_000001/dr.who",
#     "amHostHttpAddress": "<<node-manager>>:8042",
#     "allocatedMB": -1,
#     "allocatedVCores": -1,
#     "runningContainers": -1,
#     "memorySeconds": 172346,
#     "vcoreSeconds": 112,
#     "queueUsagePercentage": 0,
#     "clusterUsagePercentage": 0,
#     "preemptedResourceMB": 0,
#     "preemptedResourceVCores": 0,
#     "numNonAMContainerPreempted": 0,
#     "numAMContainerPreempted": 0,
#     "logAggregationStatus": "SUCCEEDED"
#   }
# }
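If you want to wait for completion programmatically, a simple polling loop is enough. This is a sketch, again assuming jq and the ${APP_ID} and ${HADOOP_RM} variables from above:

# poll the application state until it reaches a terminal state
while true; do
  STATE=$(curl -s "${HADOOP_RM}/ws/v1/cluster/apps/${APP_ID}" | jq -r '.app.state')
  echo "state: ${STATE}"
  case "${STATE}" in
    FINISHED|FAILED|KILLED) break ;;
  esac
  sleep 5
done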
The whole process also works through Knox; just replace the WebHDFS and Resource Manager URLs with their Knox substitutes:
a) Resource Manager:
http://<<resource-manager>>:8088/ws/v1 ==> https://<<knox-gateway>>:8443/gateway/default/resourcemanager/v1
b) WebHDFS host:
http://<<webhdfs-host>>:50070/webhdfs/v1 ==> https://<<knox-gateway>>:8443/gateway/default/webhdfs/v1
Additionally, you need to provide Knox credentials (e.g. Basic Authentication with <<user>>:<<password>>), as shown below.
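With curl this simply means adding -u (and -k if the Knox gateway uses a self-signed certificate), for example for the new-application request from above:

curl -s -k -u <<user>>:<<password>> -X POST -d '' \
  "https://<<knox-gateway>>:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application"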
More details and a Python script to ease the whole process can be found in the Spark-Yarn-REST-API repo.
Any comments on how to make this process easier are highly appreciated ...
Created on 10-28-2017 06:44 PM
When I do the above, my job fails with a permission error: "Permission denied: user=myuser, access=EXECUTE, inode="/jobs/logs/dr.who/logs":dr.who:hadoop:drwxrwx---". My cluster is not secured but has dfs.permissions = true, and I would prefer not to disable it. Every time the job runs from the API it tries to run as user dr.who, and that user does not exist in reality. When I run jobs with spark-submit or hadoop jar, they run as the logged-in user.

I understand that "dr.who" is an HTTP static user, but how do I submit the job as some other "valid" user supplied in the API? The YARN API documentation is ambiguous as hell; I mean, what in the world does this line even mean to a non-ops person: "Please note that in order to submit an app, you must have an authentication filter setup for the HTTP interface. The functionality requires that a username is set in the HttpServletRequest. If no filter is setup, the response will be an "UNAUTHORIZED" response." Does this mean that the REST API can only be used by a Java or JVM-based client? (HttpServletRequest is a Java class)...

So please help me with how one can get around or solve the problem of the user running the application being "dr.who"?
Created on 01-26-2018 01:01 PM
@Atul Kulkarni There's little point in using dfs.permissions without security. I suspect you can set up a username/password authentication filter for the YARN RM and simply add the username/password to the curl request using -u username, entering the password interactively. If you use such a simple filter, you should probably also use SSL, otherwise the password can easily be intercepted.
I still stand by my original comment that without security enabled, dfs.permissions is basically ineffective, except in annoying cases like this. It is much better to enable Kerberization and SPNEGO, and have security that works transparently and without passwords.
Created on 01-27-2018 10:28 AM
How different is it from using Livy to do the same?
Created on 01-29-2018 03:19 PM
The default installation with Spark2 provides a TGZ instead of a jar in a similarly named folder/location in HDFS. Would that work as well as the value for __spark__.jar, or is the jar still a requirement even with Spark2?
Created on 01-29-2018 03:31 PM
There are two main differences:
1) Livy will run the job in one of its Spark sessions, whereas by submitting to YARN you are creating a stand-alone Spark job. The former will be slightly quicker to come online, but you may not be able to use all the features of the resource manager with regard to using all available resources.
2) As far as I can tell, Livy cannot yet do what Knox does with regard to bridging authentication realms. Knox allows you to authenticate against the cluster's security context using username and password, and then internally use cryptographic authentication (i.e. Kerberos). With Livy, LDAP authentication is still an open pull request, but I expect you could use Kerberos/SPNEGO authentication. For this, your workstation/CI server would have to be in the same Kerberos realm as your cluster, which often isn't the case.
Livy is nicer if you have small jobs that you want to deploy with low latency and high modularity; Knox + YARN is convenient when you're outside the cluster and have long-running jobs. You can combine both: https://community.hortonworks.com/articles/70499/adding-livy-server-as-service-to-apache-knox.html