Created 11-02-2023 07:39 AM
I am trying to train a random forest on a dataset of around 1 million records and 218 variables (about 70 MB), using a 70% train / 30% test split, and to save the quality metrics (accuracy and F1 score) calculated on the test set.
The random forest runs fine if I keep the number of trees low, keep the trees shallow, or use only a sample of the data. But as soon as I try to train a random forest with 150 trees and a higher maxDepth (tree depth) on the entire train set, I get an error:
Py4JJavaError: An error occurred while calling o1945.evaluate.: java.lang.OutOfMemoryError
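(For a sense of scale, here is a rough, purely illustrative calculation, not taken from the job itself: the worst-case size of a binary decision tree grows exponentially with maxDepth, which is why 150 deep trees weigh far more than a few shallow ones.)
# Rough illustration only: a binary tree of depth d has at most 2**(d + 1) - 1 nodes.
# Real trees stay far below this bound, but the bound grows exponentially with depth.
num_trees = 150
for depth in (5, 10, 20, 30):
    max_nodes = 2 ** (depth + 1) - 1
    print(f"maxDepth={depth:>2}: up to {max_nodes:,} nodes per tree, "
          f"{num_trees * max_nodes:,} across {num_trees} trees")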
The error seems to be related to driver memory: the Spark web UI shows no problem and no failed jobs; it simply stops running the next jobs, and my Spark session in CDSW can no longer communicate with the cluster and the Spark application.
When I run the random forest with the same parameters on a 10% sample of the data, it works but takes around 20 minutes, which surprises me because it is very slow compared with the R library ranger on a conventional R server: there, a random forest with 150 trees and unlimited depth (the ranger default) takes around 15 minutes on the entire dataset and less than one minute on the 10% sample, using a server with 20 cores and up to 800 GB of memory.
I tried several strategies to improve the performance with PySpark:
- Tuning the parameters of the Spark application: increasing the number of executors and the cores and memory per executor. I found it crucial to give the executors enough memory, otherwise I ran into memory problems.
- Tuning the number of partitions. Increasing it did not seem to improve anything; it was better to leave the number of partitions at around 8, because otherwise the jobs just ran more slowly and eventually the Spark session aborted.
- Tuning checkpointInterval.
- Tuning the driver settings: I greatly increased the driver memory, to around 30 GB, and set spark.driver.maxResultSize to 0 (unlimited). That was enough to train the random forest on the 10% sample, but apparently not enough for the entire dataset. (A short diagnostic sketch for checking which settings actually took effect follows this list.)
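Diagnostic sketch (illustrative only, not part of the training job): a few standard PySpark calls to confirm which settings the session actually ended up with and how the train table is partitioned. The table name is the one assembled from the variables in the script below ("_v1" preprocessing), so adjust it for other versions.
from pyspark.sql import SparkSession
# Reuse (or create) the active session and print the settings that matter here.
spark = SparkSession.builder.getOrCreate()
for key in ("spark.driver.memory", "spark.driver.maxResultSize",
            "spark.executor.memory", "spark.dynamicAllocation.maxExecutors"):
    print(key, "=", spark.conf.get(key, "not set"))
# Check how the train table is actually partitioned and how many rows it has.
train = spark.read.table("db_bigdata_evs_processed.dat_clean_m1_train_v1")
print("partitions:", train.rdd.getNumPartitions())
print("rows:", train.count())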
What am I doing wrong? Does this mean that the Spark implementation of the random forest is very driver-intensive? That would imply there is little advantage to using Spark for this task.
I am currently using Spark version 2.4.7.7.1.7.2000-305 and working in CDSW.
Here is my code:
from pyspark.sql import SparkSession
from pyspark.sql import Catalog
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer,StringIndexer,StopWordsRemover,CountVectorizer,IDF, RegexTokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time
############################### Settings ###############################
# Paths:
path_processed = "/datalake/arbeitsbereich/bigdata/verarbeitet/"
# Version Pre-processing:
version_preprocessing = "_v1"
# "_v1": tokenize as word, no ngram, hashingTF, IDF, no add variables
# "_v2": tokenize as char with 3grams and 4grams, hashingTF, IDF, no add variables
# "_subsample":only subsample from R with split, tokenize as char with 3grams and 4grams, hashingTF, IDF, no add variables
# "_subsample_v5": only subsample from R with split and with sea_final_str, tokenize as char with 3grams and 4grams, hashingTF, no add variables
# Version model
version_model = "_v1"
type_model = "model_rf"+ version_model
model_name = type_model + "_preprocessing" + version_preprocessing
comment_preprocessing = "Tokenize as char with 3grams and 4grams, hashingTF with 700 variables, add variables"
comment_model = "RF with pyspark library RandomForest"
comment_techn = "dynamic allocation, kryoserializer with 512m"
size_sample= 1
# Parameter Spark session
minExecutors = "2"
maxExecutors = "6"
driver_mem = "28g"
driver_cores = "4"
driver_maxresultsize = "0"
exec_memory = "4g"
num_partition = 5
# Parameter Randomforest
maxDepth = 30
numTrees = 150
featureSubsetStrategy = 'sqrt'
subsamplingRate = 0.8
maxMemoryInMB = 725
checkpointInterval = 10
cacheNodeIds = True
db_name_output = "db_bigdata_evs_processed"
table_name_all = "dat_clean_m1_all" + version_preprocessing
table_name_train = "dat_clean_m1_train"+ version_preprocessing
table_name_test = "dat_clean_m1_test" + version_preprocessing
db_table_all = ".".join([db_name_output, table_name_all])
db_table_train = ".".join([db_name_output, table_name_train])
db_table_test = ".".join([db_name_output, table_name_test])
spark = SparkSession.builder.appName('EVS train rf')\
    .config("spark.sql.repl.eagerEval.enabled", True)\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.minExecutors", minExecutors)\
    .config("spark.dynamicAllocation.maxExecutors", maxExecutors)\
    .config("spark.driver.memory", driver_mem)\
    .config("spark.driver.cores", driver_cores)\
    .config("spark.executor.memory", exec_memory)\
    .config("spark.driver.maxResultSize", driver_maxresultsize)\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
    .config("spark.kryoserializer.buffer.max", "512m")\
    .getOrCreate()
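# Checkpoint directory for the forest training: with cacheNodeIds=True and
# checkpointInterval set, the node-ID cache is checkpointed here periodically.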
spark.sparkContext.setCheckpointDir('/user/lestrade-a')
# Begin timer
tic = time.perf_counter()
############################### Read data ###############################
# Load data from hive
train = spark.read.table(f"{db_table_train}").repartition(num_partition)
test = spark.read.table(f"{db_table_test}").repartition(num_partition)
#train = train.sample(fraction=size_sample, seed=3)
#test = test.sample(fraction=size_sample, seed=3)
############################### Train model ###############################
rf = RandomForestClassifier(featuresCol='features_final',
                            labelCol='label',
                            numTrees=numTrees,
                            maxDepth=maxDepth,
                            subsamplingRate=subsamplingRate,
                            maxMemoryInMB=maxMemoryInMB,
                            checkpointInterval=checkpointInterval,
                            cacheNodeIds=cacheNodeIds,
                            featureSubsetStrategy=featureSubsetStrategy)
rf_fit = rf.fit(train)
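# Cache the predictions so both evaluation passes (accuracy and F1) reuse them.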
predictions = rf_fit.transform(test).persist()
# Accuracy check
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                              labelCol='label',
                                              metricName="accuracy")
acc = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
#print(acc)
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
#print(f1)
# End timer:
toc = time.perf_counter()
# Format monitoring
monitore = toc - tic
monitore = time.strftime("%Hh%Mm%Ss", time.gmtime(monitore))
# Format logs
columns = ["laufzeit",
"accuracy",
"f1_score",
"model_name",
"comment_preprocessing",
"comment_model",
"comment_techn",
"size_sample",
"minExecutors",
"maxExecutors",
"driver_mem",
"driver_cores",
"driver_maxresultsize",
"exec_memory",
"maxDepth",
"numTrees",
"featureSubsetStrategy",
"subsamplingRate",
"maxMemoryInMB",
"checkpointInterval",
"cacheNodeIds",
"num_partition",]
log_data = [(monitore, acc, f1, model_name, comment_preprocessing,
             comment_model, comment_techn, size_sample, minExecutors,
             maxExecutors, driver_mem, driver_cores, driver_maxresultsize,
             exec_memory, maxDepth, numTrees, featureSubsetStrategy,
             subsamplingRate, maxMemoryInMB, checkpointInterval,
             cacheNodeIds, num_partition)]
logs = spark.sparkContext.parallelize(log_data).toDF(columns)
# Save the logs
db_logs = "db_bigdata_evs_logs"
table_logs = "logs_m1_" + model_name
db_table_logs = ".".join([db_logs, table_logs])
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_logs} LOCATION \
'/datalake/arbeitsbereich/bigdata/verarbeitet/logs/'")
logs.write.mode("overwrite")\
.saveAsTable(db_table_logs,path = '/datalake/arbeitsbereich/bigdata/verarbeitet/logs/'+db_logs+"/"+table_logs)
Created 11-02-2023 01:29 PM
@loulou1601 Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our Spark experts @Bharati and @jagadeesan, who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres,