Support Questions


RandomForest causing Spark session to abort

New Contributor

I am trying to train a random forest on a dataset of around 1 million records and 218 variables (about 70 MB), with a 70% train / 30% test split, and to save the quality metrics calculated on the test set (accuracy and F1 score).
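(The train and test tables used further down are already materialized in Hive; the split itself roughly corresponds to the following sketch, using the full table defined in the code below.)

# Sketch only: how the 70/30 split could be produced from the full table
full = spark.read.table(db_table_all)
train, test = full.randomSplit([0.7, 0.3], seed=3)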

I can run the random forest if I keep the number of trees low, use fairly shallow trees, or use only a sample of the data, but as soon as I try to run a random forest with 150 trees and a higher maxDepth (tree depth) on the entire training set, I get an error: Py4JJavaError: An error occurred while calling o1945.evaluate.: java.lang.OutOfMemoryError

The error seems to be related to driver memory: the Spark web UI shows no problem and no failed jobs; the application simply stops running the next jobs, and my Spark session in CDSW can no longer communicate with the cluster and the Spark application.
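For what it's worth, the settings the running session actually picked up can be printed like this (just a sketch), to check that values such as spark.driver.memory and spark.driver.maxResultSize really apply:

# Sketch: list the driver/executor settings the live session reports
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    if key.startswith("spark.driver") or key.startswith("spark.executor"):
        print(key, "=", value)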

When I run the random forest with the same parameters on a 10% sample of the data, it works, but it takes around 20 minutes, which surprises me because it is very slow compared with an implementation using a plain R library (ranger) on a regular R server: there it takes around 15 minutes to train a random forest with 150 trees and unlimited depth (ranger's default) on the entire dataset, and less than one minute on the 10% sample, on a server with 20 cores and up to 800 GB of memory.

I tried several strategies to improve the performance with PySpark:

- tune the parameters of the Spark application: increase the number of executors and the cores/memory per executor. I discovered that it is crucial to give the executors enough memory, otherwise I run into memory problems.

- tune the number of partitions, but increasing the number didn't seem to help. It was better to keep the number of partitions around 8, because otherwise the jobs just ran more slowly and eventually the Spark session aborted (a quick way to inspect the partitioning is sketched after this list).

- tune checkpointInterval (the checkpoint interval used during tree training)

- increase the driver resources: I raised the driver memory considerably, to around 30 GB, and set spark.driver.maxResultSize to unlimited (0). That made the training of the random forest on the 10% sample possible, but it was apparently not enough for the full dataset.
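For the partitioning point above, the distribution of the training data can be inspected roughly like this (sketch, using the train DataFrame from the code below):

# Sketch: number of partitions and rows per partition of the training data
print("partitions:", train.rdd.getNumPartitions())
print("rows per partition:", train.rdd.glom().map(len).collect())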

What am I doing wrong? Does this mean that the Spark implementation of random forest is very driver-intensive? That would imply there is little advantage to using Spark for this task.
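To get a feel for how much of the fitted model ends up on the driver, I suppose one could look at the size of the ensemble after training a smaller forest; a sketch, assuming a fitted model rf_fit like the one in the code below:

# Sketch: size of the fitted ensemble (the trees are held by the driver JVM)
node_counts = [tree.numNodes for tree in rf_fit.trees]
print("trees:", len(node_counts))
print("total nodes:", sum(node_counts))
print("deepest tree:", max(tree.depth for tree in rf_fit.trees))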

I am currently using Spark version 2.4.7.7.1.7.2000-305 and I work in CDSW.

Here is my code: 

from pyspark.sql import SparkSession
from pyspark.sql import Catalog
from pyspark.sql import SQLContext

from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer, StringIndexer, StopWordsRemover, CountVectorizer, IDF, RegexTokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import time

 

############################### Settings ###############################

# Paths:
path_processed = "/datalake/arbeitsbereich/bigdata/verarbeitet/"

# Version pre-processing:
version_preprocessing = "_v1"
# "_v1": tokenize as word, no ngram, hashingTF, IDF, no add variables
# "_v2": tokenize as char with 3grams and 4grams, hashingTF, IDF, no add variables
# "_subsample": only subsample from R with split, tokenize as char with 3grams and 4grams, hashingTF, IDF, no add variables
# "_subsample_v5": only subsample from R with split and with sea_final_str, tokenize as char with 3grams and 4grams, hashingTF, no add variables

# Version model
version_model = "_v1"
type_model = "model_rf" + version_model
model_name = type_model + "_preprocessing" + version_preprocessing

comment_preprocessing = "Tokenize as char with 3grams and 4grams, hashingTF with 700 variables, add variables"
comment_model = "RF with pyspark library RandomForest"
comment_techn = "dynamic allocation, kryoserializer with 512m"
size_sample = 1

# Parameters Spark session
minExecutors = "2"
maxExecutors = "6"
driver_mem = "28g"
driver_cores = "4"
driver_maxresultsize = "0"  # 0 = unlimited
exec_memory = "4g"
num_partition = 5

# Parameters random forest
maxDepth = 30
numTrees = 150
featureSubsetStrategy = 'sqrt'
subsamplingRate = 0.8
maxMemoryInMB = 725
checkpointInterval = 10
cacheNodeIds = True

# Input tables (Hive)
db_name_output = "db_bigdata_evs_processed"
table_name_all = "dat_clean_m1_all" + version_preprocessing
table_name_train = "dat_clean_m1_train" + version_preprocessing
table_name_test = "dat_clean_m1_test" + version_preprocessing
db_table_all = ".".join([db_name_output, table_name_all])
db_table_train = ".".join([db_name_output, table_name_train])
db_table_test = ".".join([db_name_output, table_name_test])

 

spark = SparkSession.builder.appName('EVS train rf')\
    .config("spark.sql.repl.eagerEval.enabled", True)\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.minExecutors", minExecutors)\
    .config("spark.dynamicAllocation.maxExecutors", maxExecutors)\
    .config("spark.driver.memory", driver_mem)\
    .config("spark.driver.cores", driver_cores)\
    .config("spark.executor.memory", exec_memory)\
    .config("spark.driver.maxResultSize", driver_maxresultsize)\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
    .config("spark.kryoserializer.buffer.max", "512m")\
    .getOrCreate()

spark.sparkContext.setCheckpointDir('/user/lestrade-a')

 

# Begin timer
tic = time.perf_counter()

############################### Read data ###############################

# Load data from hive
train = spark.read.table(f"{db_table_train}").repartition(num_partition)
test = spark.read.table(f"{db_table_test}").repartition(num_partition)

#train = train.sample(fraction=size_sample, seed=3)
#test = test.sample(fraction=size_sample, seed=3)

 

############################### Train model ###############################

rf = RandomForestClassifier(featuresCol='features_final',
                            labelCol='label',
                            numTrees=numTrees,
                            maxDepth=maxDepth,
                            subsamplingRate=subsamplingRate,
                            maxMemoryInMB=maxMemoryInMB,
                            checkpointInterval=checkpointInterval,
                            cacheNodeIds=cacheNodeIds,
                            featureSubsetStrategy=featureSubsetStrategy)
rf_fit = rf.fit(train)

predictions = rf_fit.transform(test).persist()

 

# Accuracy check
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                              labelCol='label', metricName="accuracy")
acc = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
#print(acc)
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
#print(f1)

 

# End timer:
toc = time.perf_counter()

# Format monitoring
monitore = toc - tic
monitore = time.strftime("%Hh%Mm%Ss", time.gmtime(monitore))

 

# Format logs
columns = ["laufzeit", "accuracy", "f1_score", "model_name",
           "comment_preprocessing", "comment_model", "comment_techn",
           "size_sample", "minExecutors", "maxExecutors", "driver_mem",
           "driver_cores", "driver_maxresultsize", "exec_memory",
           "maxDepth", "numTrees", "featureSubsetStrategy",
           "subsamplingRate", "maxMemoryInMB", "checkpointInterval",
           "cacheNodeIds", "num_partition"]
log_data = [(monitore, acc, f1, model_name,
             comment_preprocessing, comment_model, comment_techn,
             size_sample, minExecutors, maxExecutors, driver_mem,
             driver_cores, driver_maxresultsize, exec_memory,
             maxDepth, numTrees, featureSubsetStrategy,
             subsamplingRate, maxMemoryInMB, checkpointInterval,
             cacheNodeIds, num_partition)]
logs = spark.sparkContext.parallelize(log_data).toDF(columns)

 

# Save the logs
db_logs = "db_bigdata_evs_logs"
table_logs = "logs_m1_" + model_name
db_table_logs = ".".join([db_logs, table_logs])
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_logs} LOCATION '/datalake/arbeitsbereich/bigdata/verarbeitet/logs/'")
logs.write.mode("overwrite")\
    .saveAsTable(db_table_logs, path='/datalake/arbeitsbereich/bigdata/verarbeitet/logs/' + db_logs + "/" + table_logs)

1 Reply

Community Manager

@loulou1601 Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our Spark experts @Bharati and @jagadeesan, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.