11-02-2023 07:39 AM
I am trying to train a random forest on a dataset with around 1 million records and 218 variables (about 70 MB), using a 70/30 train/test split, and to save the quality metrics (accuracy and F1 score) computed on the test set.

The random forest runs fine if I keep the number of trees low, use shallow trees, or train on only a sample of the data. But as soon as I run a random forest with 150 trees and a higher maxDepth (tree depth) on the entire training set, I get an error:

Py4JJavaError: An error occurred while calling o1945.evaluate.: java.lang.OutOfMemoryError

The error seems to be related to driver memory: the Spark web UI shows no failed jobs, the application simply stops launching new jobs, and my Spark session in CDSW can no longer communicate with the cluster and the Spark application.

When I run the random forest with the same parameters on a 10% sample of the data it works, but it takes around 20 minutes, which surprises me because it is very slow compared with a plain R implementation (the ranger package) on an ordinary R server: there, a random forest with 150 trees and unlimited depth (ranger's default) takes around 15 minutes on the entire dataset and less than one minute on the 10% sample, on a server with 20 cores and up to 800 GB of memory.

I have tried several strategies to improve performance with PySpark:
- Tuning the Spark application parameters: increasing the number of executors and the cores/memory per executor. I discovered that it is crucial to give the executors enough memory, otherwise I run into memory problems.
- Tuning the number of partitions, but increasing it did not seem to improve anything. It was better to keep the number of partitions around 8, otherwise the jobs just ran more slowly and the Spark session eventually aborted.
- Tuning the checkpoint interval (checkpointInterval).
- Tuning the driver: I greatly increased the driver memory, to around 30 GB, and set the driver's maxResultSize to unlimited (0). That was enough to train the random forest on the 10% sample, but apparently not enough for the entire dataset.

What am I doing wrong? Does this mean that the Spark implementation of the random forest is very driver-intensive? If so, there would be little advantage in using Spark for this task. I am currently using Spark version 2.4.7.7.1.7.2000-305 and I work with CDSW.
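To make the driver-memory hypothesis concrete: the fitted forest is held on the driver, so 150 deep trees can make the model object itself very large. Below is a minimal diagnostic sketch (my own addition, not part of the original script) of how the size of the fitted model on the 10% sample could be inspected, assuming rf_fit is the fitted RandomForestClassificationModel from the code further down:

    # Hypothetical diagnostic: inspect how large the fitted forest actually is.
    # Assumes rf_fit is a pyspark.ml.classification.RandomForestClassificationModel.
    trees = rf_fit.trees
    total_nodes = sum(t.numNodes for t in trees)
    deepest = max(t.depth for t in trees)
    print(f"trees: {len(trees)}, total nodes: {total_nodes:,}, deepest tree: {deepest}")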
Here is my code:

from pyspark.sql import SparkSession
from pyspark.sql import Catalog
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer, StringIndexer, StopWordsRemover, CountVectorizer, IDF, RegexTokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

############################### Settings ###############################
# Paths:
path_processed = "/datalake/arbeitsbereich/bigdata/verarbeitet/"

# Version Pre-processing:
version_preprocessing = "_v1"
# "_v1": tokenize as word, no ngram, hashingTF, IDF, no add variables
# "_v2": tokenize as char with 3grams and 4grams, hashingTF, IDF, no add variables
# "_subsample": only subsample from R with split, tokenize as char with 3grams and 4grams, hashingTF, IDF, no add variables
# "_subsample_v5": only subsample from R with split and with sea_final_str, tokenize as char with 3grams and 4grams, hashingTF, no add variables

# Version model
version_model = "_v1"
type_model = "model_rf" + version_model
model_name = type_model + "_preprocessing" + version_preprocessing
comment_preprocessing = "Tokenize as char with 3grams and 4grams, hashingTF with 700 variables, add variables"
comment_model = "RF with pyspark library RandomForest"
comment_techn = "dynamic allocation, kryoserializer with 512m"
size_sample = 1

# Parameter Spark session
minExecutors = "2"
maxExecutors = "6"
driver_mem = "28g"
driver_cores = "4"
driver_maxresultsize = "0"
exec_memory = "4g"
num_partition = 5

# Parameter Randomforest
maxDepth = 30
numTrees = 150
featureSubsetStrategy = 'sqrt'
subsamplingRate = 0.8
maxMemoryInMB = 725
checkpointInterval = 10
cacheNodeIds = True

# Input tables
db_name_output = "db_bigdata_evs_processed"
table_name_all = "dat_clean_m1_all" + version_preprocessing
table_name_train = "dat_clean_m1_train" + version_preprocessing
table_name_test = "dat_clean_m1_test" + version_preprocessing
db_table_all = ".".join([db_name_output, table_name_all])
db_table_train = ".".join([db_name_output, table_name_train])
db_table_test = ".".join([db_name_output, table_name_test])

# Spark session
spark = SparkSession.builder.appName('EVS train rf')\
    .config("spark.sql.repl.eagerEval.enabled", True)\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.minExecutors", minExecutors)\
    .config("spark.dynamicAllocation.maxExecutors", maxExecutors)\
    .config("spark.driver.memory", driver_mem)\
    .config("spark.driver.cores", driver_cores)\
    .config("spark.executor.memory", exec_memory)\
    .config("spark.driver.maxResultSize", driver_maxresultsize)\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
    .config("spark.kryoserializer.buffer.max", "512m")\
    .getOrCreate()

# Checkpoint directory (needed because checkpointInterval/cacheNodeIds are used)
spark.sparkContext.setCheckpointDir('/user/lestrade-a')

# Begin timer
tic = time.perf_counter()

############################### Read data ###############################
# Load data from hive
train = spark.read.table(f"{db_table_train}").repartition(num_partition)
test = spark.read.table(f"{db_table_test}").repartition(num_partition)
#train = train.sample(fraction=size_sample, seed=3)
#test = test.sample(fraction=size_sample, seed=3)

############################### Train model ###############################
rf = RandomForestClassifier(featuresCol='features_final',
                            labelCol='label',
                            numTrees=numTrees,
                            maxDepth=maxDepth,
                            subsamplingRate=subsamplingRate,
                            maxMemoryInMB=maxMemoryInMB,
                            checkpointInterval=checkpointInterval,
                            cacheNodeIds=cacheNodeIds,
                            featureSubsetStrategy=featureSubsetStrategy)
rf_fit = rf.fit(train)
predictions = rf_fit.transform(test).persist()

# Accuracy check
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='label', metricName="accuracy")
acc = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
#print(acc)
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
#print(f1)

# End timer:
toc = time.perf_counter()

# Format monitoring
monitore = toc - tic
monitore = time.strftime("%Hh%Mm%Ss", time.gmtime(monitore))

# Format logs
columns = ["laufzeit", "accuracy", "f1_score", "model_name", "comment_preprocessing",
           "comment_model", "comment_techn", "size_sample", "minExecutors", "maxExecutors",
           "driver_mem", "driver_cores", "driver_maxresultsize", "exec_memory", "maxDepth",
           "numTrees", "featureSubsetStrategy", "subsamplingRate", "maxMemoryInMB",
           "checkpointInterval", "cacheNodeIds", "num_partition"]
log_data = [(monitore, acc, f1, model_name, comment_preprocessing, comment_model,
             comment_techn, size_sample, minExecutors, maxExecutors, driver_mem,
             driver_cores, driver_maxresultsize, exec_memory, maxDepth, numTrees,
             featureSubsetStrategy, subsamplingRate, maxMemoryInMB, checkpointInterval,
             cacheNodeIds, num_partition)]
logs = spark.sparkContext.parallelize(log_data).toDF(columns)

# Save the logs
db_logs = "db_bigdata_evs_logs"
table_logs = "logs_m1_" + model_name
db_table_logs = ".".join([db_logs, table_logs])
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_logs} LOCATION \
    '/datalake/arbeitsbereich/bigdata/verarbeitet/logs/'")
logs.write.mode("overwrite")\
    .saveAsTable(db_table_logs, path='/datalake/arbeitsbereich/bigdata/verarbeitet/logs/' + db_logs + "/" + table_logs)
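The script above evaluates the model but never persists it. For completeness, here is a minimal sketch (my own addition, with an illustrative path not taken from the original script) of how the fitted model could be saved next to the logs:

    # Hypothetical addition: persist the fitted model so it can be reloaded later
    # with RandomForestClassificationModel.load(model_path). The path is illustrative.
    model_path = "/datalake/arbeitsbereich/bigdata/verarbeitet/models/" + model_name
    rf_fit.write().overwrite().save(model_path)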