
Spark gets slower with each iteration


Hello!

I am new to Spark (PySpark, to be concrete). In my task I have a dataframe of 300 million rows cached in memory. I also have 215 files, each of which I can read into a dataframe of 600k rows. I want to iterate over the files one by one: read each file, find its intersection with the cached dataframe on some field, and write the result to another file.
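To make the task concrete, here is a plain-Python model of it (not Spark code). It assumes each file is a list of `(ucp_id, terminal)` rows and the cached dataframe is a set of `(ucp_id, item_key)` pairs; the function names and sample data are hypothetical. It shows that intersecting file by file and taking the union of the per-file outputs gives the same IDs as intersecting all files at once, because intersection distributes over union.

```python
# Plain-Python model of the task (hypothetical names and data, not Spark code).
# Each "file" is a list of (ucp_id, terminal) rows; the cached dataframe is
# modeled as a set of (ucp_id, item_key) pairs.

def ids_per_file(files, cached_pairs):
    """Loop version: intersect each file separately, then union the results."""
    result = set()
    for rows in files:
        matched = set(rows) & cached_pairs  # distinct rows present in both
        result |= {ucp_id for ucp_id, _ in matched}
    return result


def ids_single_pass(files, cached_pairs):
    """Single-pass version: combine all files first, then intersect once."""
    all_rows = {row for rows in files for row in rows}
    matched = all_rows & cached_pairs
    return {ucp_id for ucp_id, _ in matched}


cached = {(1, "LHR"), (2, "JFK"), (3, "CDG")}
files = [
    [(1, "LHR"), (4, "SVO")],
    [(2, "JFK"), (2, "JFK"), (3, "FRA")],
]
print(ids_per_file(files, cached))     # {1, 2}
print(ids_single_pass(files, cached))  # {1, 2}
```

Since both versions agree, replacing the loop with one read over all 215 files should not change the set of matching IDs, only the number of Spark jobs needed to compute it.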

The first 10-12 iterations run fast and take roughly the same time, 20-25 s each (I monitor them in the Spark UI). However, the execution time of the following iterations increases dramatically, to 1-2 min and more, specifically in the write operation. Eventually the whole job gets stuck and I have to kill it.

Could you please give me an idea of why this happens? Which resources do I need to release (and how) so that each iteration starts from scratch?


Re: Spark gets slower with each iteration

count = 0
ln = len(bkg_lst)
for i in bkg_lst:  # list contains paths to files
    part_bkg = sqlContext.read.option("delimiter", ";").format('com.databricks.spark.csv').options(header='false', inferschema='true').load(i)
    if len(part_bkg.head(1)) != 0:
        # filter out unnecessary columns
        part_bkg = part_bkg.select('BKG_ID', 'PNR_ID', 'PAX_ID', 'SEG_NBR', 'PNR_CRTD_DT', 'OPRTNG_FLT_DT_UTC', 'TRAVELER_ORIG_TERMINAL', 'TRAVELER_DEST_TERMINAL', 'DEL_FLG', 'SRC_SYS_CRTD_TMSTMP')
        part_bkg.createOrReplaceTempView("bkg")
        # here goes the necessary SQL select, not heavy for sure
        res_df = sqlContext.sql("""select b.ucp_id from ...""")
        if len(res_df.head(1)) != 0:
            # keep only (UCP_ID, terminal) pairs that also exist in the cached dataframe df_rec
            res_df.select('UCP_ID', 'TRAVELER_DEST_TERMINAL').intersect(df_rec.select('UCP_ID', 'ITEM_KEY')).select('UCP_ID').coalesce(1).write.save(path='/tmp/true_recs/' + str(count), format='csv', mode='append', sep=',')
    count += 1
    print(count, " of ", ln)
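The `intersect` call in the loop compares whole rows. Like the other set operations in Spark SQL, it matches columns by position rather than by name, so `TRAVELER_DEST_TERMINAL` is compared against `ITEM_KEY`, and the result contains distinct rows only. The plain-Python model below (not Spark code; the function name and sample rows are hypothetical) mimics that behaviour.

```python
# Plain-Python model of DataFrame.intersect semantics (not Spark code).
# Rows are tuples compared position by position; duplicates are removed.
# Spark does not guarantee output order; this model keeps left-side order.

def intersect(left_rows, right_rows):
    """Return the distinct rows that occur in both inputs."""
    right = set(right_rows)
    seen = set()
    out = []
    for row in left_rows:
        if row in right and row not in seen:
            seen.add(row)
            out.append(row)
    return out


# Left side: (UCP_ID, TRAVELER_DEST_TERMINAL); right side: (UCP_ID, ITEM_KEY).
res_rows = [(10, "JFK"), (10, "JFK"), (11, "LHR")]
rec_rows = [(10, "JFK"), (11, "CDG")]
print(intersect(res_rows, rec_rows))  # [(10, 'JFK')]
```

Because the match is positional, the two `select` calls must list the columns in corresponding order; renaming them is not required.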

Re: Spark gets slower with each iteration

@Stanislav Nikitin Did you check the GC time in the Spark UI?

Re: Spark gets slower with each iteration


@Felix Albani

Thank you for answering! Yes, I was monitoring the progress in the Spark web UI. The number of jobs did not grow from iteration to iteration, but the time spent on those jobs did.

To be honest, I have solved my problem: I just read the whole file and successfully joined it with the other one. However, I still want to know why "for" loops are so inefficient in Spark.
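One plausible contributor, offered as a rough estimate rather than a diagnosis: Spark's optimizer rewrites a distinct `intersect` into a left-semi join followed by a distinct, and such a join typically has to scan and shuffle the full 300-million-row cached dataframe each time it runs. Under that assumption, the loop processes the cached rows once per file, while a single pass processes them once in total. The arithmetic below uses the row counts from the question; the function names are hypothetical.

```python
# Rough row-count estimate. Assumption: every intersect re-reads (and shuffles)
# the whole cached dataframe; this does not model the growing per-iteration
# slowdown, only the baseline cost of looping.
CACHED_ROWS = 300_000_000  # cached dataframe
FILE_ROWS = 600_000        # rows per file
N_FILES = 215


def rows_touched_loop(n_files, file_rows, cached_rows):
    """One intersect per file: each one touches the file and the full cache."""
    return n_files * (file_rows + cached_rows)


def rows_touched_single_pass(n_files, file_rows, cached_rows):
    """One intersect over all files: the cache is touched only once."""
    return n_files * file_rows + cached_rows


loop = rows_touched_loop(N_FILES, FILE_ROWS, CACHED_ROWS)
single = rows_touched_single_pass(N_FILES, FILE_ROWS, CACHED_ROWS)
print(f"{loop:,}")           # 64,629,000,000
print(f"{single:,}")         # 429,000,000
print(round(loop / single))  # 151
```

Under this assumption the loop does about 150 times more row processing than one pass, which is consistent with the single read-and-join approach finishing where the loop did not.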
