Spark not using Yarn cluster resources

I'm trying to run a Python script using Spark (1.6.1) on a Hadoop cluster (2.4.2). The cluster was installed, configured and managed using Ambari (2.2.1.1). It has 4 nodes, each with a 40 GB disk, 8 cores and 16 GB of RAM.

My script uses the `sklearn` library, so to parallelize it on Spark I use the `spark_sklearn` library (see spark-sklearn-link).

At this point I tried to run the script with:

spark-submit spark_example.py --master yarn --deploy-mode client --num-executors 8 --num-executor-core 4 --executor-memory 2G 

but it always runs on localhost with only one executor.

4171-screenshot-2016-05-11-115329.png

The Ambari dashboard also shows that only one node of the cluster is consuming resources, and the execution time stays the same when I try different configurations (executors, cores).

This is a screenshot of the Nodes page in the YARN UI:

4170-screenshot-2016-05-11-120946.png

Any ideas? Thanks a lot

1 ACCEPTED SOLUTION

Super Guru

@Pietro Fragnito

You may also need to check your spark-env.sh file and make sure that the MASTER=yarn-client variable is set.
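To make the suggestion concrete, here is a minimal sketch of the line in question. It assumes a Spark 1.x installation, where `yarn-client` is still an accepted master value and `spark-submit` falls back to the `MASTER` environment variable when neither `--master` nor `spark.master` is set. On an Ambari-managed cluster the file is usually generated from the spark-env template in the Spark configuration, so adding the line there is probably safer than editing the file on a node by hand.

```shell
# Excerpt for spark-env.sh, which the Spark launch scripts source at startup.
# MASTER acts as a fallback when no --master option reaches spark-submit.
MASTER="yarn-client"
```

This only changes the default. A `--master` option that is correctly passed to `spark-submit` should still take precedence.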

8 REPLIES

Where can I find this file? Thanks a lot

OK, I found it, but this parameter isn't in it.

Super Guru

OK, then try to set that parameter and run again?

OK, thanks! Adding this parameter seems to work for me.

#!/usr/bin/env bash

# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

MASTER="yarn-cluster"

# Options read in YARN client mode
SPARK_EXECUTOR_INSTANCES="3" #Number of workers to start (Default: 2)
#SPARK_EXECUTOR_CORES="1" #Number of cores for the workers (Default: 1).
#SPARK_EXECUTOR_MEMORY="1G" #Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
#SPARK_DRIVER_MEMORY="512 Mb" #Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
#SPARK_YARN_APP_NAME="spark" #The name of your application (Default: Spark)
#SPARK_YARN_QUEUE="default" #The hadoop queue to use for allocation requests (Default: 'default')
#SPARK_YARN_DIST_FILES="" #Comma separated list of files to be distributed with the job.
#SPARK_YARN_DIST_ARCHIVES="" #Comma separated list of archives to be distributed with the job.


# Generic options for the daemons used in the standalone deploy mode


# Alternate conf dir. (Default: ${SPARK_HOME}/conf)
export SPARK_CONF_DIR=${SPARK_CONF_DIR:-{{spark_home}}/conf}


# Where log files are stored.(Default:${SPARK_HOME}/logs)
#export SPARK_LOG_DIR=${SPARK_HOME:-{{spark_home}}}/logs
export SPARK_LOG_DIR={{spark_log_dir}}


# Where the pid file is stored. (Default: /tmp)
export SPARK_PID_DIR={{spark_pid_dir}}


# A string representing this instance of spark.(Default: $USER)
SPARK_IDENT_STRING=$USER


# The scheduling priority for daemons. (Default: 0)
SPARK_NICENESS=0


export HADOOP_HOME=${HADOOP_HOME:-{{hadoop_home}}}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-{{hadoop_conf_dir}}}


# The java implementation to use.
export JAVA_HOME={{java_home}}


if [ -d "/etc/tez/conf/" ]; then
  export TEZ_CONF_DIR=/etc/tez/conf
else
  export TEZ_CONF_DIR=
fi

PS: it works well, but the parameters passed on the command line (e.g. --num-executors 8 --num-executor-core 4 --executor-memory 2G) do not seem to be taken into consideration. If I set the executors in the "spark-env template" field of Ambari instead, the parameters are taken into consideration. Anyway, now it works 🙂

Thanks a lot.
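A likely reason the command-line settings were ignored: in the command from the original question, the options come after `spark_example.py`. `spark-submit` expects its options before the application file and passes everything after it to the application as its own arguments, so `--master yarn` and the executor settings probably never reached Spark. In addition, the flag for cores per executor is `--executor-cores`; `--num-executor-core` is not a `spark-submit` option. Below is a minimal sketch of the reordered command, assuming the script sits in the current directory. The variable names are only for illustration, and the command is printed rather than executed so that it can be reviewed first.

```shell
# Options must come BEFORE the application file; anything after the file
# is handed to the Python script as its own arguments.
SUBMIT_OPTS="--master yarn --deploy-mode client --num-executors 8 --executor-cores 4 --executor-memory 2G"
APP_FILE="spark_example.py"   # assumed to be in the current directory

SUBMIT_CMD="spark-submit $SUBMIT_OPTS $APP_FILE"

# Print the command for review; paste it into the shell to run it.
echo "$SUBMIT_CMD"
```

With the options in this position, the MASTER entry in spark-env.sh should no longer be needed to get the job onto YARN, although keeping it as a default does no harm.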

Cloudera Employee

Can you check whether something in spark_example.py or in any spark-defaults.conf is overriding the master and deploy-mode properties?
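One quick way to run that check is to search both files for the relevant settings. This is only a sketch: the helper names are made up for illustration, the configuration path is an assumed default that may differ on your installation, and the patterns cover just the common ways of pinning the master (`spark.master` or `spark.submit.deployMode` in the properties file, `setMaster(...)` or a `master=` argument in the script).

```shell
# Print lines of a properties file that pin the master or the deploy mode.
find_conf_overrides() {
  grep -nE '^[[:space:]]*spark\.(master|submit\.deployMode)[[:space:]=]' "$1"
}

# Print lines of a PySpark script that set the master explicitly.
find_script_overrides() {
  grep -nE 'setMaster\(|master[[:space:]]*=' "$1"
}

# Assumed locations; adjust them to your installation.
CONF_FILE="${SPARK_CONF_DIR:-/etc/spark/conf}/spark-defaults.conf"
APP_FILE="spark_example.py"

if [ -f "$CONF_FILE" ]; then
  find_conf_overrides "$CONF_FILE" || echo "no master/deploy-mode override in $CONF_FILE"
fi
if [ -f "$APP_FILE" ]; then
  find_script_overrides "$APP_FILE" || echo "no explicit master in $APP_FILE"
fi
```

If neither search prints a matching line, the files are not overriding what is passed on the command line.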

This is spark-defaults.conf:

# Generated by Apache Ambari. Wed May 11 10:32:59 2016

spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.kerberos.keytab none
spark.history.kerberos.principal none
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080
spark.yarn.containerLauncherMaxThreads 25
spark.yarn.driver.memoryOverhead 384
spark.yarn.executor.memoryOverhead 384
spark.yarn.historyServer.address tesi-vm-3.cloud.ba.infn.it:18080
spark.yarn.max.executor.failures 3
spark.yarn.preserve.staging.files false
spark.yarn.queue default
spark.yarn.scheduler.heartbeat.interval-ms 5000
spark.yarn.submit.file.replication 3

And this is my spark_example.py:

from pyspark import SparkContext

import numpy as np
import pandas as pd
from sklearn import grid_search, datasets
from sklearn.svm import SVR
from spark_sklearn import GridSearchCV
import sklearn

import matplotlib.pyplot as plt
plt.switch_backend('agg')
import matplotlib
matplotlib.use('Agg')
matplotlib.style.use('ggplot')
import time
import StringIO

sc = SparkContext(appName="PythonPi")

def show(p):
  img = StringIO.StringIO()
  p.savefig(img, format='svg')
  img.seek(0)
  print "%html <div style='width:1200px'>" + img.buf + "</div>"

#hourlyElectricity = pd.read_excel('hdfs:///dataset/building_6_ALL_hourly.xlsx')
hourlyElectricity = pd.read_excel('/dataset/building_6_ALL_hourly.xlsx')

#display one dataframe
print hourlyElectricity.head()

hourlyElectricity = hourlyElectricity.set_index(['Data'])
hourlyElectricity.index.name = None
print hourlyElectricity.head()

def addHourlyTimeFeatures(df):
    df['hour'] = df.Ora
    df['weekday'] = df.index.weekday
    df['day'] = df.index.dayofyear
    df['week'] = df.index.weekofyear    
    return df

hourlyElectricity = addHourlyTimeFeatures(hourlyElectricity)
print hourlyElectricity.head()
df_hourlyelect = hourlyElectricity[['hour', 'weekday', 'day', 'week', 'CosHour', 'Occupancy', 'Power']]

hourlyelect_train = pd.DataFrame(data=df_hourlyelect, index=np.arange('2011-01-01 00:00:00', '2011-10-01 00:00:00', dtype='datetime64[h]')).dropna()
hourlyelect_test = pd.DataFrame(data=df_hourlyelect, index=np.arange('2011-10-01 00:00:00', '2011-11-01 00:00:00', dtype='datetime64[h]')).dropna()

XX_hourlyelect_train = hourlyelect_train.drop('Power', axis = 1).reset_index().drop('index', axis = 1)

XX_hourlyelect_test = hourlyelect_test.drop('Power', axis = 1).reset_index().drop('index', axis = 1)

YY_hourlyelect_train = hourlyelect_train['Power']
YY_hourlyelect_test = hourlyelect_test['Power']

# Candidate parameter ranges for the SVR grid search
gamma_range = [0.001,0.0001,0.00001,0.000001,0.0000001]
epsilon_range = [x * 0.1 for x in range(0, 2)]
C_range = range(3000, 8000, 500)

tuned_parameters = {
    'kernel': ['rbf']
    ,'C': C_range
    ,'gamma': gamma_range
    ,'epsilon': epsilon_range
    }

#start monitoring execution time
start_time = time.time()

# search for the best parameters with crossvalidation.

#svr_hourlyelect = GridSearchCV(SVR(C=5000, epsilon=0.0, gamma=1e-05), param_grid = tuned_parameters, verbose = 0)

svr_hourlyelect = GridSearchCV(sc,SVR(), param_grid = tuned_parameters, verbose = 0)

# Fit regression model
y_hourlyelect = svr_hourlyelect.fit(XX_hourlyelect_train, YY_hourlyelect_train).predict(XX_hourlyelect_test)

print("--- %s minutes ---" % ((time.time() - start_time)/60))
print 'Optimum epsilon and kernel for SVR: ', svr_hourlyelect.best_params_
print "The test score R2: ", svr_hourlyelect.score(XX_hourlyelect_test, YY_hourlyelect_test)

print("SVR mean squared error: %.2f" % np.mean((YY_hourlyelect_test - svr_hourlyelect.predict(XX_hourlyelect_test)) ** 2))

I don't think there are any overrides.

Master Guru

http://spark.apache.org/docs/latest/submitting-applications.html

--deploy-mode cluster

export HADOOP_CONF_DIR=XXX
# --deploy-mode can be "client" for client mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000