
Spark gets slower during an iterative process

Explorer

Hello!

I am new to Spark (PySpark, to be specific). In my task I have a DataFrame of 300 million rows cached in memory. I also have 215 files, each of which I can read into a DataFrame of 600k rows. I want to read the files one by one, find the intersection of each with the cached DataFrame on some field, and write the result to another file.

The first 10-12 iterations finish quickly and take about the same time each, 20-25 seconds; I monitor them in the Spark UI. Over the next few iterations, however, the execution time increases dramatically, to 1-2 minutes and more. To be more specific, the slowdown is in the write operation. Eventually the whole job gets stuck, so I have to kill it.

Could you please give me an idea of why this happens? Which resources do I need to release (and how) so that each iteration starts from scratch?

3 REPLIES

Explorer
count = 0
ln = len(bkg_lst)
start = 0
for i in bkg_lst[:]:  # list contains paths to files
    part_bkg = (sqlContext.read
                .option("delimiter", ";")
                .format('com.databricks.spark.csv')
                .options(header='false', inferschema='true')
                .load(i))
    # skip empty files
    if len(part_bkg.head(1)) != 0:
        # filter out unnecessary columns
        part_bkg = part_bkg.select('BKG_ID', 'PNR_ID', 'PAX_ID', 'SEG_NBR', 'PNR_CRTD_DT', 'OPRTNG_FLT_DT_UTC', 'TRAVELER_ORIG_TERMINAL', 'TRAVELER_DEST_TERMINAL', 'DEL_FLG', 'SRC_SYS_CRTD_TMSTMP')
        part_bkg.createOrReplaceTempView("bkg")
        # here goes the necessary SQL select, not heavy for sure
        res_df = sqlContext.sql("""select b.ucp_id from ...""")
        if len(res_df.head(1)) != 0:
            # intersect with the cached DataFrame and write a single output file
            (res_df.select('UCP_ID', 'TRAVELER_DEST_TERMINAL')
                   .intersect(df_rec.select('UCP_ID', 'ITEM_KEY'))
                   .select('UCP_ID')
                   .coalesce(1)
                   .write.save(path='/tmp/true_recs/' + str(count), format='csv', mode='append', sep=','))
    count += 1
    print(count, " of ", ln)

@Stanislav Nikitin did you check the GC time in the Spark UI?

Explorer

@Felix Albani

Thank you for answering! Yes, I was monitoring the progress in the Spark web UI. The number of jobs did not grow from iteration to iteration, while the time spent on those jobs did.

To be honest, I have solved my problem: I simply read the whole file and successfully joined it with the other one. However, I would still like to know why "for" loops are so inefficient in Spark.
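
For anyone who finds this thread later, here is a rough sketch of what the single-pass version could look like. It is not my exact code, only an illustration under some assumptions: `intersect_in_one_pass` and all of its parameter names are made up for this example, `spark` is assumed to be an existing SparkSession, `columns` is assumed to list a name for every column in the headerless files, and I use a left-semi join plus `distinct()` in place of `intersect()`, which is close but not identical (for instance, null keys are treated differently).

```python
def intersect_in_one_pass(spark, paths, columns, cached_df, key, out_path):
    """Read all files in `paths` as one DataFrame and keep the values of
    `key` that also occur in `cached_df`. Hypothetical helper for illustration."""
    # DataFrameReader.csv accepts a list of paths, so every file ends up in
    # one DataFrame and there is no Python loop over individual files.
    incoming = spark.read.csv(paths, sep=";", header=False, inferSchema=True)

    # Headerless files get generated column names, so assign real ones.
    # `columns` must contain exactly one name per column in the files.
    incoming = incoming.toDF(*columns)

    # A left-semi join keeps the rows of `incoming` whose key exists in
    # `cached_df`; distinct() removes duplicate keys, as intersect() would.
    matched = (
        incoming.select(key)
        .join(cached_df.select(key), on=key, how="left_semi")
        .distinct()
    )

    # Write the result once instead of once per input file.
    matched.write.csv(out_path, mode="append", sep=",")
    return matched
```

With the names from my loop above, the call would be something like `intersect_in_one_pass(spark, bkg_lst, column_names, df_rec, 'UCP_ID', '/tmp/true_recs/all')`, where `column_names` and the output path are placeholders.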